You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

198 lines
7.3 KiB

"""知识库模块。
提供基于 AgentScope 的企业知识库管理功能,包括文档索引、文本索引和语义检索。
支持多种文档格式(PDF、Word、Excel、纯文本)的自动解析和向量化存储。
使用 Qdrant 作为向量存储后端,OpenAI Embedding 作为向量化模型。
"""
import os
import asyncio
import logging
from agentscope.embedding import OpenAITextEmbedding
from agentscope.rag import SimpleKnowledge, QdrantStore, TextReader, PDFReader, WordReader, ExcelReader
from config import settings
logger = logging.getLogger(__name__) # 当前模块的日志记录器
_knowledge_base: SimpleKnowledge | None = None # 全局知识库单例实例
_STORE_PATH = os.path.join(settings.UPLOAD_DIR, "..", "data", "qdrant") # Qdrant 向量存储路径
_COLLECTION_NAME = "enterprise_knowledge" # Qdrant 集合名称
_VECTOR_DIM = 1536 # 向量维度(text-embedding-3-small 标准维度)
def _get_embedding_model():
"""创建并返回 OpenAI 文本 Embedding 模型实例。
Returns:
OpenAITextEmbedding: 配置好的 Embedding 模型。
"""
return OpenAITextEmbedding(
api_key=settings.LLM_API_KEY,
model_name="text-embedding-3-small",
dimensions=_VECTOR_DIM,
)
def get_knowledge_base() -> SimpleKnowledge:
"""获取或创建全局知识库实例。
采用单例模式,首次调用时初始化 Qdrant 向量存储和 Embedding 模型,
后续调用直接返回已创建的实例。
Returns:
SimpleKnowledge: 初始化好的知识库实例。
"""
global _knowledge_base
if _knowledge_base is None:
os.makedirs(_STORE_PATH, exist_ok=True)
store = QdrantStore(
location=_STORE_PATH,
collection_name=_COLLECTION_NAME,
dimensions=_VECTOR_DIM,
)
_knowledge_base = SimpleKnowledge(
embedding_store=store,
embedding_model=_get_embedding_model(),
)
logger.info(f"知识库已初始化: {_STORE_PATH}")
return _knowledge_base
async def add_document(file_path: str, file_type: str = "auto") -> str:
"""将文档文件添加到知识库中进行索引。
自动根据文件类型选择合适的解析器,将文档切分为多个文本块后
进行向量化并存储到知识库中。
Args:
file_path: 文档文件的完整路径。
file_type: 文档类型,auto 表示自动识别。
Returns:
str: 索引结果描述或错误信息。
"""
try:
ext = os.path.splitext(file_path)[1].lower()
kb = get_knowledge_base()
# 根据文件类型选择对应的解析器
if file_type == "auto":
if ext == ".pdf":
reader = PDFReader(chunk_size=1024, split_by="sentence")
documents = await reader(pdf_path=file_path)
elif ext in (".docx", ".doc"):
reader = WordReader(chunk_size=1024)
documents = await reader(file_path=file_path)
elif ext in (".xlsx", ".xls"):
reader = ExcelReader(chunk_size=1024)
documents = await reader(file_path=file_path)
else:
reader = TextReader(chunk_size=1024, split_by="sentence")
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
documents = await reader(text=content)
else:
if file_type == "pdf":
reader = PDFReader(chunk_size=1024, split_by="sentence")
documents = await reader(pdf_path=file_path)
elif file_type == "word":
reader = WordReader(chunk_size=1024)
documents = await reader(file_path=file_path)
elif file_type == "excel":
reader = ExcelReader(chunk_size=1024)
documents = await reader(file_path=file_path)
else:
reader = TextReader(chunk_size=1024)
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
documents = await reader(text=content)
await kb.add_documents(documents)
filenames = set(d.metadata.file_path for d in documents)
return f"成功索引 {len(documents)} 个文档块 (来自 {len(filenames)} 个文件)"
except Exception as e:
logger.error(f"文档索引失败: {e}")
return f"文档索引失败: {e}"
async def add_text(text: str, source: str = "manual") -> str:
"""将纯文本内容添加到知识库中进行索引。
Args:
text: 要索引的文本内容。
source: 文本来源标识,默认为 manual(手动录入)。
Returns:
str: 索引结果描述或错误信息。
"""
try:
kb = get_knowledge_base()
reader = TextReader(chunk_size=1024, split_by="sentence")
documents = await reader(text=text)
for doc in documents:
doc.metadata.source = source
await kb.add_documents(documents)
return f"成功索引 {len(documents)} 个文档块"
except Exception as e:
logger.error(f"文本索引失败: {e}")
return f"文本索引失败: {e}"
async def search(query: str, limit: int = 5, score_threshold: float = 0.3) -> list[dict]:
"""在知识库中执行语义检索。
根据查询文本的向量相似度,从知识库中检索最相关的文档片段。
Args:
query: 查询文本。
limit: 返回结果的最大数量,默认 5 条。
score_threshold: 相似度分数阈值,低于此值的结果将被过滤,默认 0.3。
Returns:
list[dict]: 检索结果列表,每项包含 id、content、score、source 字段。
"""
try:
kb = get_knowledge_base()
if not kb or not hasattr(kb, 'retrieve'):
logger.warning("知识库未初始化或不可用")
return []
docs = await asyncio.wait_for(
kb.retrieve(query=query, limit=limit, score_threshold=score_threshold),
timeout=10.0
)
return [
{
"id": doc.id,
"content": doc.metadata.content.get("text", "")[:500],
"score": round(doc.score, 4) if doc.score else 0,
"source": doc.metadata.source or doc.metadata.file_path or "",
}
for doc in docs
]
except asyncio.TimeoutError:
logger.warning(f"知识检索超时 (query={query[:50]})")
return []
except Exception as e:
logger.error(f"知识检索失败: {e}")
return []
async def retrieve_for_agent(query: str, limit: int = 5) -> str:
"""为 AI 智能体执行知识库检索并返回格式化的结果文本。
该函数专为 AgentScope 智能体调用设计,返回人类可读的检索结果。
Args:
query: 查询文本。
limit: 返回结果的最大数量,默认 5 条。
Returns:
str: 格式化的检索结果文本,包含相关度分数。
"""
results = await search(query, limit=limit)
if not results:
return "未找到相关文档。"
parts = ["根据知识库检索到以下相关内容:"]
for i, r in enumerate(results, 1):
parts.append(f"\n[{i}] (相关度: {r['score']})\n{r['content']}")
return "\n".join(parts)