You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
198 lines
7.3 KiB
198 lines
7.3 KiB
"""知识库模块。
|
|
|
|
提供基于 AgentScope 的企业知识库管理功能,包括文档索引、文本索引和语义检索。
|
|
支持多种文档格式(PDF、Word、Excel、纯文本)的自动解析和向量化存储。
|
|
使用 Qdrant 作为向量存储后端,OpenAI Embedding 作为向量化模型。
|
|
"""
|
|
import os
|
|
import asyncio
|
|
import logging
|
|
from agentscope.embedding import OpenAITextEmbedding
|
|
from agentscope.rag import SimpleKnowledge, QdrantStore, TextReader, PDFReader, WordReader, ExcelReader
|
|
from config import settings
|
|
|
|
# Logger scoped to this module.
logger = logging.getLogger(__name__)

# Process-wide knowledge base singleton; populated lazily by
# get_knowledge_base() and None until then.
_knowledge_base: SimpleKnowledge | None = None

# Directory holding the Qdrant vector store, resolved relative to the
# configured upload directory.
_STORE_PATH = os.path.join(
    settings.UPLOAD_DIR,
    "..",
    "data",
    "qdrant",
)

# Name of the Qdrant collection.
_COLLECTION_NAME = "enterprise_knowledge"

# Embedding vector size (standard dimension of text-embedding-3-small).
_VECTOR_DIM = 1536
|
|
|
|
|
|
def _get_embedding_model():
    """Build the OpenAI text-embedding model used by the knowledge base.

    Returns:
        OpenAITextEmbedding: A model configured with the project's LLM API
        key, the ``text-embedding-3-small`` model name and ``_VECTOR_DIM``
        output dimensions.
    """
    embedding_options = {
        "api_key": settings.LLM_API_KEY,
        "model_name": "text-embedding-3-small",
        "dimensions": _VECTOR_DIM,
    }
    return OpenAITextEmbedding(**embedding_options)
|
|
|
|
|
|
def get_knowledge_base() -> SimpleKnowledge:
    """Return the shared knowledge base, creating it on first use.

    The instance is cached in the module-level ``_knowledge_base`` variable.
    The first call creates the storage directory, a Qdrant store and the
    embedding model; later calls return the cached instance unchanged.

    Returns:
        SimpleKnowledge: The initialized knowledge base instance.
    """
    global _knowledge_base

    # Fast path: an earlier call already built the instance.
    if _knowledge_base is not None:
        return _knowledge_base

    os.makedirs(_STORE_PATH, exist_ok=True)
    vector_store = QdrantStore(
        location=_STORE_PATH,
        collection_name=_COLLECTION_NAME,
        dimensions=_VECTOR_DIM,
    )
    embedder = _get_embedding_model()
    _knowledge_base = SimpleKnowledge(
        embedding_store=vector_store,
        embedding_model=embedder,
    )
    logger.info(f"知识库已初始化: {_STORE_PATH}")
    return _knowledge_base
|
|
|
|
|
|
# Maps a lowercase file extension to the reader kind chosen when
# ``file_type`` is "auto"; extensions not listed here are read as plain text.
_EXTENSION_KINDS = {
    ".pdf": "pdf",
    ".docx": "word",
    ".doc": "word",
    ".xlsx": "excel",
    ".xls": "excel",
}


async def _read_document_chunks(file_path: str, kind: str) -> list:
    """Parse ``file_path`` into document chunks using the reader for ``kind``.

    Args:
        file_path: Path of the file to parse.
        kind: One of "pdf", "word" or "excel"; any other value makes the
            file be read as UTF-8 plain text.

    Returns:
        list: The chunks produced by the selected reader.
    """
    # The path is passed positionally so the call does not depend on the
    # keyword name each reader class uses for its path parameter.
    if kind == "pdf":
        return await PDFReader(chunk_size=1024, split_by="sentence")(file_path)
    if kind == "word":
        return await WordReader(chunk_size=1024)(file_path)
    if kind == "excel":
        return await ExcelReader(chunk_size=1024)(file_path)

    # Fallback: plain text, split by sentence for both the auto-detected and
    # the explicitly requested case.
    reader = TextReader(chunk_size=1024, split_by="sentence")
    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read()
    return await reader(text=content)


async def add_document(file_path: str, file_type: str = "auto") -> str:
    """Index a document file into the knowledge base.

    The file is parsed by the reader matching its type, split into chunks,
    and the chunks are added to the knowledge base.

    Args:
        file_path: Full path of the document file.
        file_type: "auto" picks the reader from the file extension
            (.pdf, .docx/.doc, .xlsx/.xls, anything else as text). Explicit
            values "pdf", "word" and "excel" force that reader; any other
            value reads the file as plain text.

    Returns:
        str: A message describing the indexing result, or an error message
        if indexing failed. Errors are logged and reported in the return
        value rather than raised.
    """
    try:
        if file_type == "auto":
            ext = os.path.splitext(file_path)[1].lower()
            kind = _EXTENSION_KINDS.get(ext, "text")
        else:
            kind = file_type

        kb = get_knowledge_base()
        documents = await _read_document_chunks(file_path, kind)
        await kb.add_documents(documents)

        # Exactly one file is indexed per call, so the file count follows
        # from that instead of from per-chunk metadata. The text path hands
        # the reader only the file contents, so its chunks cannot be relied
        # on to carry a file path, and a failed metadata read here would
        # report an error after the chunks were already stored.
        file_count = 1 if documents else 0
        return f"成功索引 {len(documents)} 个文档块 (来自 {file_count} 个文件)"
    except Exception as e:
        # logger.exception keeps the traceback alongside the message.
        logger.exception("文档索引失败: %s", e)
        return f"文档索引失败: {e}"
|
|
|
|
|
|
async def add_text(text: str, source: str = "manual") -> str:
    """Index a piece of plain text into the knowledge base.

    The text is split into sentence-based chunks, each chunk's metadata is
    tagged with ``source``, and the chunks are added to the knowledge base.

    Args:
        text: The text content to index.
        source: Origin label stored on every chunk's metadata; defaults to
            "manual" (manually entered).

    Returns:
        str: A message describing the indexing result, or an error message
        if indexing failed. Errors are logged and reported in the return
        value rather than raised.
    """
    try:
        knowledge = get_knowledge_base()
        splitter = TextReader(chunk_size=1024, split_by="sentence")
        chunks = await splitter(text=text)

        # Tag every chunk with its origin before storing it.
        for chunk in chunks:
            chunk.metadata.source = source

        await knowledge.add_documents(chunks)
        summary = f"成功索引 {len(chunks)} 个文档块"
        return summary
    except Exception as exc:
        logger.error(f"文本索引失败: {exc}")
        return f"文本索引失败: {exc}"
|
|
|
|
|
|
async def search(query: str, limit: int = 5, score_threshold: float = 0.3) -> list[dict]:
    """Run a semantic search against the knowledge base.

    The retrieval call is bounded by a 10 second timeout. A timeout or any
    other error is logged and results in an empty list instead of an
    exception.

    Args:
        query: The query text.
        limit: Maximum number of results to return; defaults to 5.
        score_threshold: Similarity score threshold passed to the
            retriever; defaults to 0.3.

    Returns:
        list[dict]: One dict per hit with the keys ``id``, ``content``
        (text truncated to 500 characters), ``score`` (rounded to 4
        decimals, or 0 when the score is falsy) and ``source`` (the
        metadata source, else its file path, else an empty string).
    """
    try:
        knowledge = get_knowledge_base()
        if not (knowledge and hasattr(knowledge, 'retrieve')):
            logger.warning("知识库未初始化或不可用")
            return []

        pending = knowledge.retrieve(
            query=query,
            limit=limit,
            score_threshold=score_threshold,
        )
        hits = await asyncio.wait_for(pending, timeout=10.0)

        results = []
        for hit in hits:
            entry = {
                "id": hit.id,
                "content": hit.metadata.content.get("text", "")[:500],
                "score": round(hit.score, 4) if hit.score else 0,
                "source": hit.metadata.source or hit.metadata.file_path or "",
            }
            results.append(entry)
        return results
    except asyncio.TimeoutError:
        logger.warning(f"知识检索超时 (query={query[:50]})")
        return []
    except Exception as exc:
        logger.error(f"知识检索失败: {exc}")
        return []
|
|
|
|
|
|
async def retrieve_for_agent(query: str, limit: int = 5) -> str:
    """Search the knowledge base and format the hits as readable text.

    Intended for use by an AI agent: delegates to ``search`` and renders
    each hit as a numbered entry with its relevance score and content.

    Args:
        query: The query text.
        limit: Maximum number of results to return; defaults to 5.

    Returns:
        str: The formatted results, or a fixed "no documents found" message
        when the search returns nothing.
    """
    hits = await search(query, limit=limit)
    if not hits:
        return "未找到相关文档。"

    header = "根据知识库检索到以下相关内容:"
    # Each entry starts with a newline, so joining with "\n" leaves a blank
    # line between entries.
    entries = [
        f"\n[{rank}] (相关度: {hit['score']})\n{hit['content']}"
        for rank, hit in enumerate(hits, 1)
    ]
    return "\n".join([header, *entries])
|
|
|