You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
154 lines
4.9 KiB
154 lines
4.9 KiB
import os
|
|
import logging
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
# Record of optional parsing libraries whose import failed. The _try_import_*
# helpers in this module add an entry (keys "pdf", "docx", "excel") holding the
# user-facing message, and parse_document returns that message to the caller.
_IMPORT_ERRORS: dict[str, str] = {}
|
|
|
|
|
|
def _try_import_pdf() -> bool:
    """Tell whether PyPDF2 can be imported.

    A failed import is recorded in ``_IMPORT_ERRORS["pdf"]`` together with
    the message shown to the user. While that entry is present the import
    is not attempted again and the function returns ``False`` immediately.

    Returns:
        ``True`` if ``PyPDF2.PdfReader`` imported successfully, else ``False``.
    """
    available = "pdf" not in _IMPORT_ERRORS
    if available:
        try:
            # Imported only to probe availability; the name itself is unused.
            from PyPDF2 import PdfReader  # noqa: F401
        except ImportError:
            _IMPORT_ERRORS["pdf"] = "PyPDF2 未安装,无法解析 PDF"
            available = False
    return available
|
|
|
|
|
|
def _try_import_docx() -> bool:
    """Tell whether python-docx (module ``docx``) can be imported.

    A failed import is recorded in ``_IMPORT_ERRORS["docx"]`` together with
    the message shown to the user. While that entry is present the import
    is not attempted again and the function returns ``False`` immediately.

    Returns:
        ``True`` if ``docx.Document`` imported successfully, else ``False``.
    """
    available = "docx" not in _IMPORT_ERRORS
    if available:
        try:
            # Imported only to probe availability; the name itself is unused.
            from docx import Document  # noqa: F401
        except ImportError:
            _IMPORT_ERRORS["docx"] = "python-docx 未安装,无法解析 Word 文档"
            available = False
    return available
|
|
|
|
|
|
def _try_import_excel() -> bool:
    """Tell whether openpyxl can be imported.

    A failed import is recorded in ``_IMPORT_ERRORS["excel"]`` together with
    the message shown to the user. While that entry is present the import
    is not attempted again and the function returns ``False`` immediately.

    Returns:
        ``True`` if ``openpyxl`` imported successfully, else ``False``.
    """
    available = "excel" not in _IMPORT_ERRORS
    if available:
        try:
            # Imported only to probe availability; the name itself is unused.
            import openpyxl  # noqa: F401
        except ImportError:
            _IMPORT_ERRORS["excel"] = "openpyxl 未安装,无法解析 Excel 文档"
            available = False
    return available
|
|
|
|
|
|
# Lower-cased file extension -> canonical document type, consulted when
# ``file_type`` is "auto". Any extension not listed here is read as text.
_EXTENSION_TYPES = {
    ".pdf": "pdf",
    ".docx": "word",
    ".doc": "word",
    ".xlsx": "excel",
    ".xls": "excel",
    ".pptx": "ppt",
    ".ppt": "ppt",
}

# Extension-style spellings accepted for an explicit ``file_type`` in
# addition to the canonical names "pdf", "word", "excel" and "ppt".
_FILE_TYPE_ALIASES = {
    "docx": "word",
    "doc": "word",
    "xlsx": "excel",
    "xls": "excel",
    "pptx": "ppt",
}

_PPT_UNSUPPORTED_MESSAGE = "PPT 解析暂不支持,请将内容复制到 Word 或 PDF 后重试。"


def _parse_pdf(file_path: str) -> str:
    """Return the concatenated page text of a PDF, or a failure message."""
    if not _try_import_pdf():
        return _IMPORT_ERRORS["pdf"]
    from PyPDF2 import PdfReader

    try:
        reader = PdfReader(file_path)
        # Pages whose extract_text() result is empty/falsy are skipped.
        page_texts = [
            text for page in reader.pages if (text := page.extract_text())
        ]
        if not page_texts:
            return "(PDF 无可提取的文本内容)"
        return "\n".join(page_texts)
    except Exception as exc:
        logger.error("PDF 解析失败: %s", exc)
        return f"PDF 解析失败: {exc}"


def _parse_word(file_path: str) -> str:
    """Return paragraph text plus table rows of a Word file, or a failure message."""
    # NOTE(review): ".doc" is routed here as well as ".docx"; confirm that
    # python-docx can open legacy .doc files, otherwise they end in the
    # generic failure message below.
    if not _try_import_docx():
        return _IMPORT_ERRORS["docx"]
    from docx import Document

    try:
        doc = Document(file_path)
        paragraphs = [p.text for p in doc.paragraphs if p.text.strip()]
        # One output line per table row, cells separated by " | ".
        table_rows = [
            " | ".join(cell.text for cell in row.cells)
            for table in doc.tables
            for row in table.rows
        ]
        result = "\n".join(paragraphs)
        if table_rows:
            result += "\n\n--- 表格内容 ---\n" + "\n".join(table_rows)
        return result or "(Word 文档无可提取的文本内容)"
    except Exception as exc:
        logger.error("Word 解析失败: %s", exc)
        return f"Word 解析失败: {exc}"


def _parse_excel(file_path: str) -> str:
    """Return every sheet of a workbook as " | "-separated rows, or a failure message."""
    # NOTE(review): ".xls" is routed here as well as ".xlsx"; confirm that
    # openpyxl can open legacy .xls files.
    if not _try_import_excel():
        return _IMPORT_ERRORS["excel"]
    import openpyxl

    try:
        workbook = openpyxl.load_workbook(file_path, data_only=True)
        lines = []
        # A sheet header is emitted for every sheet, so ``lines`` alone cannot
        # tell whether the workbook held any data; track that separately so
        # the "nothing to extract" placeholder is actually reachable.
        has_data = False
        for sheet_name in workbook.sheetnames:
            lines.append(f"=== 工作表: {sheet_name} ===")
            for row in workbook[sheet_name].iter_rows(values_only=True):
                row_text = " | ".join(
                    "" if cell is None else str(cell) for cell in row
                )
                # Drop rows that consist only of separators and blanks.
                if row_text.strip(" |"):
                    lines.append(row_text)
                    has_data = True
        if not has_data:
            return "(Excel 无可提取的表格内容)"
        return "\n".join(lines)
    except Exception as exc:
        logger.error("Excel 解析失败: %s", exc)
        return f"Excel 解析失败: {exc}"


def _read_text(file_path: str) -> str:
    """Read a file as text, trying UTF-8 first and GBK second."""
    try:
        # "utf-8-sig" decodes plain UTF-8 identically but strips a leading
        # byte-order mark instead of returning it as "\ufeff".
        with open(file_path, "r", encoding="utf-8-sig") as handle:
            return handle.read()
    except UnicodeDecodeError:
        try:
            with open(file_path, "r", encoding="gbk") as handle:
                return handle.read()
        except Exception:
            return f"无法以文本方式读取文件: {file_path}"
    except FileNotFoundError:
        return f"文件不存在: {file_path}"
    except Exception as exc:
        logger.error("文档读取失败: %s", exc)
        return f"文档读取失败: {exc}"


def parse_document(file_path: str, file_type: str = "auto") -> str:
    """Extract the textual content of a document.

    Failures are reported through the return value (a human-readable
    message) rather than by raising: a missing optional library, a parse
    error, an unreadable or missing file all yield a message string.

    Args:
        file_path: Path of the file to read.
        file_type: "auto" (default) picks the parser from the file
            extension. Otherwise one of "pdf", "word", "excel", "ppt";
            the extension-style spellings "docx", "doc", "xlsx", "xls" and
            "pptx" are accepted too, and matching ignores case and
            surrounding whitespace. Any other value reads the file as text.

    Returns:
        The extracted text, a placeholder when the document has no
        extractable content, or a message describing why it could not be
        read. PowerPoint files always yield a "not supported" message.
    """
    ext = os.path.splitext(file_path)[1].lower()

    # A non-string file_type never matched any branch before; keep treating
    # it as plain text.
    kind = file_type.strip().lower() if isinstance(file_type, str) else "text"
    if kind == "auto":
        kind = _EXTENSION_TYPES.get(ext, "text")
    else:
        kind = _FILE_TYPE_ALIASES.get(kind, kind)

    if kind == "pdf":
        return _parse_pdf(file_path)
    if kind == "word":
        return _parse_word(file_path)
    if kind == "excel":
        return _parse_excel(file_path)
    if kind == "ppt":
        return _PPT_UNSUPPORTED_MESSAGE
    return _read_text(file_path)
|
|
|
|
|
|
def format_correction(content: str, format_rules: str = "standard") -> str:
    """Normalise the lines of *content* under a named formatting rule.

    The result always starts with a ``[格式规则: ...]`` header followed by a
    blank line. For "standard" and "enterprise" the header is followed by
    every non-blank line of *content* with surrounding whitespace removed;
    "enterprise" additionally places two fixed metadata lines between the
    header and the content. For any other rule name only the header is
    returned and *content* is not included.

    Args:
        content: Text to normalise, split on "\\n".
        format_rules: Rule name; "standard" (default) or "enterprise".

    Returns:
        The header, optional metadata lines and cleaned lines joined by "\\n".
    """
    # The header carries its own trailing newline, which becomes the blank
    # line after it once everything is joined.
    header = f"[格式规则: {format_rules}]\n"

    metadata = []
    if format_rules == "enterprise":
        metadata = ["[发文机关] 企业AI平台", "[密级] 内部"]

    cleaned = []
    if format_rules in ("standard", "enterprise"):
        cleaned = [text for text in map(str.strip, content.split("\n")) if text]

    return "\n".join([header, *metadata, *cleaned])
|
|
|
|
|
|
# Names exported by ``from <module> import *``; the underscore-prefixed
# import helpers are intentionally left out.
__all__ = ["parse_document", "format_correction"]
|