"""Document processing utilities.

Provides parsing and format-correction helpers for common office document
formats such as PDF, Word and Excel.

Third-party parsing libraries are imported lazily: each one is only loaded
when a document of the matching type is actually processed.
"""
|
|
import os
|
|
import logging
|
|
|
|
# Logger scoped to this module.
logger = logging.getLogger(__name__)

# Import-failure messages keyed by format ("pdf", "docx", "excel"). A key being
# present means that import already failed once and is not attempted again.
_IMPORT_ERRORS: dict[str, str] = {}
|
|
|
|
|
|
def _try_import_pdf() -> bool:
    """Report whether the PDF parsing library PyPDF2 can be imported.

    A failed import is recorded in ``_IMPORT_ERRORS`` under the key ``"pdf"``;
    once that key exists, later calls return False without retrying.

    Returns:
        bool: True if the import succeeded, False otherwise.
    """
    if "pdf" in _IMPORT_ERRORS:
        # An earlier attempt already failed; do not retry.
        return False
    try:
        from PyPDF2 import PdfReader  # noqa: F401 -- imported only as a probe
    except ImportError:
        _IMPORT_ERRORS["pdf"] = "PyPDF2 未安装,无法解析 PDF"
        return False
    else:
        return True
|
|
|
|
|
|
def _try_import_docx() -> bool:
    """Report whether the Word parsing library python-docx can be imported.

    A failed import is recorded in ``_IMPORT_ERRORS`` under the key ``"docx"``;
    once that key exists, later calls return False without retrying.

    Returns:
        bool: True if the import succeeded, False otherwise.
    """
    if "docx" in _IMPORT_ERRORS:
        # An earlier attempt already failed; do not retry.
        return False
    try:
        from docx import Document  # noqa: F401 -- imported only as a probe
    except ImportError:
        _IMPORT_ERRORS["docx"] = "python-docx 未安装,无法解析 Word 文档"
        return False
    else:
        return True
|
|
|
|
|
|
def _try_import_excel() -> bool:
    """Report whether the Excel parsing library openpyxl can be imported.

    A failed import is recorded in ``_IMPORT_ERRORS`` under the key
    ``"excel"``; once that key exists, later calls return False without
    retrying.

    Returns:
        bool: True if the import succeeded, False otherwise.
    """
    if "excel" in _IMPORT_ERRORS:
        # An earlier attempt already failed; do not retry.
        return False
    try:
        import openpyxl  # noqa: F401 -- imported only as a probe
    except ImportError:
        _IMPORT_ERRORS["excel"] = "openpyxl 未安装,无法解析 Excel 文档"
        return False
    else:
        return True
|
|
|
|
|
|
# Lower-cased file extension -> parser family, used when file_type is "auto".
_EXTENSION_TYPES = {
    ".pdf": "pdf",
    ".docx": "word",
    ".doc": "word",
    ".xlsx": "excel",
    ".xls": "excel",
    ".pptx": "ppt",
    ".ppt": "ppt",
}

# Explicit file_type value (normalised) -> parser family. Anything not listed
# here is read as plain text.
_TYPE_ALIASES = {
    "pdf": "pdf",
    "word": "word",
    "docx": "word",
    "doc": "word",
    "excel": "excel",
    "xlsx": "excel",
    "xls": "excel",
    "ppt": "ppt",
    "pptx": "ppt",
}


def _resolve_file_type(file_path: str, file_type: str) -> str:
    """Map the caller's file_type (or the path's extension) to a parser family."""
    requested = str(file_type).strip().lower().lstrip(".")
    if requested == "auto":
        ext = os.path.splitext(file_path)[1].lower()
        return _EXTENSION_TYPES.get(ext, "text")
    return _TYPE_ALIASES.get(requested, "text")


def _parse_pdf(file_path: str) -> str:
    """Return the text of all PDF pages joined by newlines, or an error message."""
    if not _try_import_pdf():
        return _IMPORT_ERRORS["pdf"]
    from PyPDF2 import PdfReader

    try:
        reader = PdfReader(file_path)
        page_texts = (page.extract_text() for page in reader.pages)
        # Pages without extractable text yield an empty/None result; skip them.
        texts = [text for text in page_texts if text]
        return "\n".join(texts) if texts else "(PDF 无可提取的文本内容)"
    except Exception as e:
        logger.exception("PDF 解析失败: %s", e)
        return f"PDF 解析失败: {e}"


def _parse_word(file_path: str) -> str:
    """Return paragraph text plus table rows of a Word document, or an error message."""
    if not _try_import_docx():
        return _IMPORT_ERRORS["docx"]
    from docx import Document

    # NOTE(review): python-docx presumably only reads .docx; a legacy .doc file
    # routed here is expected to end in the parse-failure message below.
    try:
        doc = Document(file_path)
        texts = [p.text for p in doc.paragraphs if p.text.strip()]
        tables_text = []
        for table in doc.tables:
            tables_text.extend(
                " | ".join(cell.text for cell in row.cells) for row in table.rows
            )
        result = "\n".join(texts)
        if tables_text:
            result += "\n\n--- 表格内容 ---\n" + "\n".join(tables_text)
        return result or "(Word 文档无可提取的文本内容)"
    except Exception as e:
        logger.exception("Word 解析失败: %s", e)
        return f"Word 解析失败: {e}"


def _parse_excel(file_path: str) -> str:
    """Return every non-empty row of every sheet as text, or an error message."""
    if not _try_import_excel():
        return _IMPORT_ERRORS["excel"]
    import openpyxl

    # NOTE(review): openpyxl presumably only reads the .xlsx family; a legacy
    # .xls file routed here is expected to end in the parse-failure message.
    try:
        # data_only=True yields cached formula results instead of formula text.
        wb = openpyxl.load_workbook(file_path, data_only=True)
        result_parts = []
        has_rows = False
        for sheet_name in wb.sheetnames:
            ws = wb[sheet_name]
            result_parts.append(f"=== 工作表: {sheet_name} ===")
            for row in ws.iter_rows(values_only=True):
                row_text = " | ".join(str(c) if c is not None else "" for c in row)
                # A row of only separators/spaces carries no data.
                if row_text.strip(" |"):
                    result_parts.append(row_text)
                    has_rows = True
        # Sheet headers alone do not count as content; without this flag the
        # placeholder could never be returned.
        return "\n".join(result_parts) if has_rows else "(Excel 无可提取的表格内容)"
    except Exception as e:
        logger.exception("Excel 解析失败: %s", e)
        return f"Excel 解析失败: {e}"


def _read_text_file(file_path: str) -> str:
    """Read a file as text, trying UTF-8 first and GBK second."""
    # "utf-8-sig" decodes plain UTF-8 identically but drops a leading BOM, so
    # the BOM does not leak into the returned text.
    for encoding in ("utf-8-sig", "gbk"):
        try:
            with open(file_path, "r", encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError:
            continue
        except FileNotFoundError:
            return f"文件不存在: {file_path}"
        except Exception as e:
            logger.exception("文档读取失败: %s", e)
            return f"文档读取失败: {e}"
    return f"无法以文本方式读取文件: {file_path}"


def parse_document(file_path: str, file_type: str = "auto") -> str:
    """Extract the text content of an office document or text file.

    With ``file_type="auto"`` the type is derived from the file extension:
    PDF, Word and Excel files are parsed with their respective libraries,
    PowerPoint files are recognised but not supported, and everything else is
    read as plain text (UTF-8, then GBK).

    This function does not raise for parsing problems; failures are returned
    as a human-readable message string.

    Args:
        file_path: Path of the document to read.
        file_type: ``"auto"`` to detect from the extension, or an explicit
            type. Matching is case-insensitive and accepts ``pdf``,
            ``word``/``docx``/``doc``, ``excel``/``xlsx``/``xls`` and
            ``ppt``/``pptx``; any other value is treated as plain text.

    Returns:
        str: The extracted text, a placeholder when the document has no
        extractable content, or an error message.
    """
    kind = _resolve_file_type(file_path, file_type)

    if kind == "ppt":
        return "PPT 解析暂不支持,请将内容复制到 Word 或 PDF 后重试。"

    # Report a missing file the same way for every format instead of letting
    # each parser surface its own library-specific error.
    if not os.path.exists(file_path):
        return f"文件不存在: {file_path}"

    parsers = {
        "pdf": _parse_pdf,
        "word": _parse_word,
        "excel": _parse_excel,
    }
    return parsers.get(kind, _read_text_file)(file_path)
|
|
|
|
|
|
def format_correction(content: str, format_rules: str = "standard") -> str:
    """Normalise document text according to a named formatting rule set.

    The result always starts with a header line naming the rule set, followed
    by a blank line. For the ``standard`` and ``enterprise`` rules every line
    of ``content`` is stripped of surrounding whitespace and empty lines are
    dropped. The ``enterprise`` rule additionally places issuing-authority and
    classification lines directly after the header.

    Args:
        content: Raw text to normalise.
        format_rules: ``"standard"`` or ``"enterprise"``. Any other value
            applies no normalisation; the content is appended unchanged so it
            is not silently lost.

    Returns:
        str: The header followed by the formatted content.
    """
    # The header keeps its own trailing newline so that the final join leaves
    # a blank line between it and whatever follows.
    parts = [f"[格式规则: {format_rules}]\n"]

    if format_rules == "enterprise":
        parts.append("[发文机关] 企业AI平台")
        parts.append("[密级] 内部")

    if format_rules in ("standard", "enterprise"):
        stripped_lines = (line.strip() for line in content.split("\n"))
        parts.extend(line for line in stripped_lines if line)
    elif content:
        # Unknown rule set: keep the text as-is rather than returning only
        # the header.
        parts.append(content)

    return "\n".join(parts)
|
|
|
|
|
|
# Public API of this module; the underscore-prefixed import helpers stay private.
__all__ = ["parse_document", "format_correction"]
|
|
|