From 490dced96d93bedb2c7a6bfeab2f475f2f2534a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?MSI-7950X=5C=E5=88=98=E6=B3=BD=E6=98=8E?= Date: Fri, 15 May 2026 16:12:38 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=B0=E5=BF=86=E7=AE=A1=E7=90=86=E6=9B=B4?= =?UTF-8?q?=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- PLAN8.md | 464 ++++++++++++++++++ backend/main.py | 12 +- backend/modules/chat/router.py | 5 +- backend/modules/flow_engine/engine.py | 120 ++++- backend/modules/flow_engine/router.py | 180 +++++-- backend/modules/memory/__init__.py | 4 + backend/modules/memory/manager.py | 201 ++++++++ backend/modules/memory/router.py | 30 ++ backend/modules/memory/schemas.py | 19 + backend/modules/rag/router.py | 120 ++++- frontend/src/api/index.ts | 3 + .../src/components/layout/AdminLayout.vue | 52 +- frontend/src/components/layout/MainLayout.vue | 78 +-- frontend/src/router/index.ts | 132 +++-- frontend/src/views/chat/FlowChat.vue | 68 +-- frontend/src/views/dashboard/Dashboard.vue | 4 +- frontend/src/views/monitor/EmployeeList.vue | 4 +- frontend/src/views/rag/KnowledgeBase.vue | 227 +++++++-- 18 files changed, 1382 insertions(+), 341 deletions(-) create mode 100644 PLAN8.md create mode 100644 backend/modules/memory/__init__.py create mode 100644 backend/modules/memory/manager.py create mode 100644 backend/modules/memory/router.py create mode 100644 backend/modules/memory/schemas.py diff --git a/PLAN8.md b/PLAN8.md new file mode 100644 index 0000000..214406f --- /dev/null +++ b/PLAN8.md @@ -0,0 +1,464 @@ +# PLAN8 — 记忆管理模块 & 用户画像系统 + +## 一、需求背景 + +### 1.1 核心需求对照(README L11-L16) + +| # | 需求 | 记忆/画像关联 | +|---|------|--------------| +| 1 | 用户和AI沟通工作,有现成流程给用户选择使用 | **用户-工作流对话记忆**:用户选了一个流程对话,下次回来继续聊,AI记得之前聊了什么 | +| 2 | 上司查看下属工作情况,AI综合评估 | **用户画像+行为轨迹**:积累下属的工作数据,AI才能做综合评估 | +| 3 | 上司通过AI指派任务给下属 | **跨用户上下文**:上司和AI说"给小明派个任务",AI需要知道小明是谁 | +| 4 | 流程由后台无代码搭建 | **无需在流中加记忆节点**:记忆是通用中间件,对所有流透明生效 | +| 5 | MCP服务获取/更新系统数据 | **外部数据注入记忆**:MCP返回的系统数据可选择性注入记忆上下文 | +| 6 | 知识库提升模型认知 | **知识+记忆融合检索**:RAG检索结果与历史对话记忆合并排序 | + +### 1.2 当前记忆现状与缺陷 + +``` +FlowSessionMemory (engine.py:15) + self._messages: list[dict] = [] ← 纯 Python list + 生命周期 = 单次 HTTP 请求 + 请求结束 → 内存释放 → 记忆全丢 + +ChatSession / ChatMessage (models/__init__.py:92-113) + 已有数据库表!但只被 chat/router.py 基本对话使用 + FlowEngine.execute() 完全不碰这两个表 + +UserIsolatedMemory (agentscope_integration/memory/) + 基于 agentscope MemoryBase + 但只在 AgentFactory.create_agent() 中为 agent 聊天使用 + 流执行完全不经过它 + +断链点: + router.py context 里没放 session_id + → FlowEngine 每次都生成新 uuid + → FlowSessionMemory 永远从空列表开始 + → 无任何持久化写回逻辑 +``` + +--- + +## 二、架构设计 + +### 2.1 核心设计原则(修改点) + +1. **纯 Redis 记忆存储**:对话记忆用 Redis String + List + Hash 组合存储,Redis 开启 `--appendonly yes` AOF 持久化保证不失忆。速度优先,毫秒级读写 +2. **透明通用中间件**:记忆不是流中的节点,不需要在流编辑器里配置。对所有已发布流自动生效,是 flow_engine 层面的通用钩子 +3. **用户画像存 PostgreSQL**:画像需要复杂聚合查询和结构化字段,用 PG 合适;不要求实时性,每次对话结束异步更新即可 +4. **用户-工作流隔离**:记忆以 `(user_id, flow_id, session_id)` 为最小隔离单元 +5. **独立模块**:MemoryManager 是独立服务,FlowEngine、AgentChat 统一调用 + +### 2.2 中间件触发位置(关键设计) + +``` +所有流的执行路径: + +router.py: execute_flow() / execute_flow_stream() + │ + ├── ❶ 执行前:MemoryManager.inject_memory(user_id, flow_id, session_id) + │ → 从 Redis 检索该用户-流组合的历史对话 + │ → 注入 context["_memory_context"] + │ + ├── FlowEngine.execute() 执行各节点 + │ └── LLMNodeAgent.reply() + │ ├── 从 context["_memory_context"] 获取历史消息 + │ ├── 拼入 messages 数组发给 LLM + │ └── 返回结果 + │ + ├── ❷ 执行后:MemoryManager.record_exchange(user_id, flow_id, session_id, user_msg, assistant_msg) + │ → 写入 Redis(异步,不阻塞响应) + │ + └── 返回响应给前端 +``` + +**用户无感知**:不需要在流编辑器中加任何东西,所有流自动获得记忆能力。 + +--- + +## 三、Redis 键设计 + +### 3.1 键命名空间 + +``` +# 会话消息列表(LPUSH + LTRIM 控制长度) +mem:{user_id}:{flow_id}:{session_id}:messages + → Redis List,每条是 JSON: {"role":"user","content":"...","ts":"..."} + → LTRIM 控制在 max_history * 2 条(每轮对话 user+assistant 两条) + +# 会话元数据 +mem:{user_id}:{flow_id}:{session_id}:meta + → Redis Hash + → {flow_name, created_at, last_active_at, message_count} + +# 用户的会话索引(SMEMBERS 获取该用户所有 session) +mem:{user_id}:sessions + → Redis Set,存 session_id 列表 + +# 会话摘要(超过阈值时生成,独立 Key) +mem:{user_id}:{flow_id}:{session_id}:summary + → Redis String,LLM 生成的摘要文本 + +# TTL 策略 +mem:{user_id}:{flow_id}:{session_id}:messages → 7 天(EXPIRE 604800) +mem:{user_id}:{flow_id}:{session_id}:meta → 7 天 +mem:{user_id}:sessions → 30 天 +mem:{user_id}:{flow_id}:{session_id}:summary → 30 天(摘要保留更久) +``` + +### 3.2 内存估算 + +``` +单条消息 ≈ 500 bytes(含 JSON 开销) +每轮对话 ≈ 2 条 = 1KB +30 轮/会话 × 1KB = 30KB/会话 +100 个活跃用户 × 10 个会话/用户 × 30KB = 30MB + +Redis AOF 文件预估:初始 30MB,日常增长可控 +``` + +--- + +## 四、核心代码设计 + +### 4.1 模块位置 + +``` +backend/ +└── modules/ + └── memory/ ← 新建独立模块 + ├── __init__.py → 导出 + ├── manager.py → MemoryManager 核心类 + ├── profile.py → UserProfileEngine(Phase 2) + ├── router.py → 管理 API + └── schemas.py → Pydantic 模型 +``` + +### 4.2 MemoryManager + +```python +# modules/memory/manager.py + +class MemoryManager: + """记忆管理中心 — 纯 Redis 存储""" + + KEY_PREFIX = "mem" + DEFAULT_TTL = 604800 # 7 天 + SESSION_INDEX_TTL = 2592000 # 30 天 + MAX_HISTORY = 40 # 每个会话最多保留 40 条消息(20 轮对话) + + def __init__(self, redis: Redis): + self.redis = redis + + # ===== 核心 API ===== + + async def inject_memory( + self, + user_id: str, + flow_id: str, + session_id: str, + context: dict, + ): + """执行前:将历史记忆注入 context""" + messages = await self._get_recent_messages(user_id, flow_id, session_id) + summary = await self._get_summary(user_id, flow_id, session_id) + + context["_memory_context"] = { + "recent_messages": list(reversed(messages)), + "summary": summary, + "session_id": session_id, + } + + async def record_exchange( + self, + user_id: str, + flow_id: str, + session_id: str, + user_msg: str, + assistant_msg: str, + flow_name: str = "", + ): + """执行后:异步记录本轮对话""" + key = self._msg_key(user_id, flow_id, session_id) + ts = datetime.utcnow().isoformat() + + async with self.redis.pipeline() as pipe: + pipe.lpush(key, + json.dumps({"role": "assistant", "content": assistant_msg, "ts": ts}), + json.dumps({"role": "user", "content": user_msg, "ts": ts}), + ) + pipe.ltrim(key, 0, self.MAX_HISTORY - 1) + pipe.expire(key, self.DEFAULT_TTL) + + pipe.hset(self._meta_key(user_id, flow_id, session_id), mapping={ + "flow_name": flow_name, + "last_active_at": ts, + }) + pipe.expire(self._meta_key(user_id, flow_id, session_id), self.DEFAULT_TTL) + + pipe.sadd(f"{self.KEY_PREFIX}:{user_id}:sessions", session_id) + pipe.expire(f"{self.KEY_PREFIX}:{user_id}:sessions", self.SESSION_INDEX_TTL) + + await pipe.execute() + + # 异步触发摘要检查 + asyncio.create_task(self._maybe_summarize(user_id, flow_id, session_id)) + + async def get_conversation_history( + self, user_id: str, flow_id: str, session_id: str, limit: int = 20 + ) -> list[dict]: + """获取完整历史(管理 API 用)""" + messages = await self._get_recent_messages(user_id, flow_id, session_id, limit) + return list(reversed(messages)) + + async def delete_session(self, user_id: str, session_id: str): + """清除某个会话的所有记忆""" + patterns = await self.redis.keys(f"{self.KEY_PREFIX}:{user_id}:*:{session_id}:*") + async with self.redis.pipeline() as pipe: + if patterns: + pipe.delete(*patterns) + pipe.srem(f"{self.KEY_PREFIX}:{user_id}:sessions", session_id) + await pipe.execute() + + async def list_user_sessions(self, user_id: str) -> list[dict]: + """列出用户所有会话""" + session_ids = await self.redis.smembers(f"{self.KEY_PREFIX}:{user_id}:sessions") + sessions = [] + for sid in session_ids: + keys = await self.redis.keys(f"{self.KEY_PREFIX}:{user_id}:*:{sid}:meta") + for k in keys: + meta = await self.redis.hgetall(k) + parts = k.split(":") + flow_id = parts[2] if len(parts) > 2 else "" + sessions.append({ + "session_id": sid, + "flow_id": flow_id, + "flow_name": meta.get("flow_name", ""), + "last_active_at": meta.get("last_active_at", ""), + }) + return sorted(sessions, key=lambda s: s["last_active_at"], reverse=True) + + # ===== 内部方法 ===== + + async def _get_recent_messages( + self, user_id: str, flow_id: str, session_id: str, limit: int = None + ) -> list[dict]: + limit = limit or self.MAX_HISTORY + key = self._msg_key(user_id, flow_id, session_id) + raw = await self.redis.lrange(key, 0, limit - 1) + return [json.loads(m) for m in raw] + + async def _get_summary(self, user_id: str, flow_id: str, session_id: str) -> str: + key = f"{self.KEY_PREFIX}:{user_id}:{flow_id}:{session_id}:summary" + val = await self.redis.get(key) + return val or "" + + async def _maybe_summarize(self, user_id: str, flow_id: str, session_id: str): + """消息超过阈值时异步生成摘要""" + key = self._msg_key(user_id, flow_id, session_id) + count = await self.redis.llen(key) + if count < 30: # 15 轮以上才触发摘要 + return + + recent = await self._get_recent_messages(user_id, flow_id, session_id, 20) + dialogue = "\n".join(f"{m['role']}: {m['content'][:500]}" for m in recent[:10]) + + try: + import httpx + api_base = settings.LLM_API_BASE.rstrip("/") + resp = await httpx.AsyncClient(timeout=30).post( + f"{api_base}/chat/completions", + json={ + "model": settings.LLM_MODEL, + "messages": [{ + "role": "user", + "content": f"请用一段话简要总结以下对话的关键内容。保留人名、任务、决策、时间等关键信息。\n\n{dialogue}" + }], + "max_tokens": 200, + }, + headers={"Authorization": f"Bearer {settings.LLM_API_KEY}"}, + ) + data = resp.json() + summary = data.get("choices", [{}])[0].get("message", {}).get("content", "") + + if summary: + key = f"{self.KEY_PREFIX}:{user_id}:{flow_id}:{session_id}:summary" + await self.redis.setex(key, 2592000, summary) # 30 天 + except Exception: + pass # 摘要失败不影响主流程 + + @staticmethod + def _msg_key(user_id, flow_id, session_id): + return f"mem:{user_id}:{flow_id}:{session_id}:messages" + + @staticmethod + def _meta_key(user_id, flow_id, session_id): + return f"mem:{user_id}:{flow_id}:{session_id}:meta" +``` + +### 4.3 LLMNodeAgent 透明集成 + +```python +# engine.py LLMNodeAgent.reply() 的变更 + +async def reply(self, msg: Msg, **kwargs) -> Msg: + user_text = msg.get_text_content() + context = kwargs.get("context", {}) + memory_ctx = context.get("_memory_context", {}) + + messages = [{"role": "system", "content": self.system_prompt}] + + # ★ 透明注入记忆(所有流自动生效,无需配置) + if memory_ctx: + summary = memory_ctx.get("summary", "") + recent = memory_ctx.get("recent_messages", []) + if summary: + messages.append({"role": "system", "content": f"[历史对话摘要]\n{summary}"}) + for m in recent[-10:]: # 最近 10 条 + messages.append({"role": m["role"], "content": m["content"]}) + + messages.append({"role": "user", "content": user_text}) + + # 流式/阻塞调用 LLM(已有逻辑) + ... + return Msg(self.name, res_text, "assistant") +``` + +### 4.4 flow_engine/router.py 集成 + +```python +# 统一在 execute_flow() 和 execute_flow_stream() 中调用 + +memory_manager = get_memory_manager() +context = { + "user_id": user_ctx["id"], + "username": user_ctx.get("username", ""), + "session_id": session_id, # ← 从请求 body 传入 + ... +} + +# ★ 执行前:注入记忆 +await memory_manager.inject_memory( + user_id=user_ctx["id"], + flow_id=str(flow_id), + session_id=session_id, + context=context, +) + +engine = FlowEngine(definition) +result = await engine.execute(input_msg, context) + +output = result.get_text_content() + +# ★ 执行后:记录本轮对话 +asyncio.create_task(memory_manager.record_exchange( + user_id=user_ctx["id"], + flow_id=str(flow_id), + session_id=session_id, + user_msg=input_text, + assistant_msg=output, + flow_name=f.name, +)) + +return {"code": 200, "data": {"output": output, "session_id": session_id}} +``` + +--- + +## 五、数据库设计(仅用户画像) + +### 5.1 `user_profiles` 表(PostgreSQL) + +```python +class UserProfile(Base): + __tablename__ = "user_profiles" + user_id = UUID, FK→users.id, PK + display_name = String(100) + department_name = String(100) + position = String(100) + role_tags = JSON, default=[] + total_conversations = Integer, default=0 + total_workflows_used = Integer, default=0 + favorite_workflows = JSON, default=[] # [{flow_id, flow_name, count}] + preferred_work_hours = JSON, default=[] + work_style_tags = JSON, default=[] + common_topics = JSON, default=[] + expertise_areas = JSON, default=[] + total_tasks_assigned = Integer, default=0 + total_tasks_created = Integer, default=0 + avg_task_completion_h = Float, default=0 + task_completion_rate = Float, default=0 + preferred_llm_model = String(50), default="" + preferred_language = String(20), default="zh" + communication_tone = String(50), default="" + profile_version = Integer, default=1 + last_updated_at = DateTime + last_conversation_at = DateTime + created_at = DateTime, default=utcnow +``` + +--- + +## 六、API 端点 + +| 方法 | 路径 | 说明 | +|------|------|------| +| GET | `/api/memory/sessions` | 列出当前用户的记忆会话列表 | +| GET | `/api/memory/sessions/{session_id}` | 查看某个会话的完整对话历史 | +| DELETE | `/api/memory/sessions/{session_id}` | 清除某个会话记忆 | +| GET | `/api/memory/profile` | 查看自己的用户画像 | +| GET | `/api/memory/profile/{user_id}` | 管理者查看下属画像(Phase 2) | +| POST | `/api/memory/profile/report` | 生成下属综合评估报告(Phase 2) | + +--- + +## 七、用户画像 Prompt 注入格式(Phase 2) + +``` +[用户画像] +姓名: 张三 | 职位: 技术主管 | 部门: 研发中心 +工作风格: 注重细节, 快速响应 +擅长领域: 项目管理, 数据分析 +最近话题: 任务管理(15次), 文档处理(8次) +常用流程: 任务分配(23次), 周报生成(12次) +``` + +--- + +## 八、实现阶段 + +### Phase 1 — 基础记忆持久化(本次实施)✅ + +- [ ] 创建 `modules/memory/` 模块目录 +- [ ] 实现 `MemoryManager`(纯 Redis 存储) +- [ ] 在 `flow_engine/router.py` 的 execute_flow 和 execute_flow_stream 中集成(执行前注入 + 执行后记录) +- [ ] 在 `LLMNodeAgent.reply()` 中读取 `_memory_context` +- [ ] 修复 `session_id` 传递链路 +- [ ] 实现 `router.py` API 端点 +- [ ] FlowChat.vue 增加「清空记忆」按钮 +- [ ] 注册路由到 main.py + +### Phase 2 — 用户画像系统(后续) + +- [ ] 新增 `user_profiles` 表 + 自动提取逻辑 +- [ ] 画像注入 LLM context +- [ ] 管理端画像查看 + 下属评估报告 + +### Phase 3 — 记忆检索优化(后续) + +- [ ] 长会话自动摘要压缩 +- [ ] 语义相似度检索历史 +- [ ] Redis 数据过期清理 cron + +--- + +## 九、核心优势 + +| 特性 | 说明 | +|------|------| +| **对用户透明** | 不需要在流编辑器里做任何配置,所有流自动获得记忆 | +| **对开发者透明** | LLMNodeAgent 自动从 context 读记忆,不用改每个节点 | +| **纯 Redis 存储** | 毫秒级读写,AOF 持久化保证不失忆 | +| **用户-工作流隔离** | 不同用户不同流程的记忆互不可见 | +| **7 天自动过期** | 不需要手动清理,旧记忆自动淘汰 | +| **异步记录** | `record_exchange` 用 `asyncio.create_task`,不影响响应速度 | \ No newline at end of file diff --git a/backend/main.py b/backend/main.py index ce905cf..723d341 100644 --- a/backend/main.py +++ b/backend/main.py @@ -18,6 +18,8 @@ from modules.system.router import router as system_router from modules.rag.router import router as rag_router from modules.chat.router import router as chat_router from modules.custom_tool.router import router as custom_tool_router +from modules.memory.router import router as memory_router +from modules.memory.manager import init_memory_manager from websocket_manager import ws_manager from middleware.rbac_middleware import rbac_middleware from middleware.rate_limiter import rate_limit_middleware @@ -28,8 +30,15 @@ from middleware.cache_manager import cache_manager async def lifespan(app: AgentApp): await init_db() await cache_manager.connect() + await init_memory_manager() yield await cache_manager.disconnect() + try: + from modules.memory.manager import get_memory_manager + mm = get_memory_manager() + await mm.redis.close() + except Exception: + pass await async_engine.dispose() @@ -60,4 +69,5 @@ app.include_router(notification_router) app.include_router(system_router) app.include_router(rag_router) app.include_router(chat_router) -app.include_router(custom_tool_router) \ No newline at end of file +app.include_router(custom_tool_router) +app.include_router(memory_router) \ No newline at end of file diff --git a/backend/modules/chat/router.py b/backend/modules/chat/router.py index 864d11f..da71d2a 100644 --- a/backend/modules/chat/router.py +++ b/backend/modules/chat/router.py @@ -41,8 +41,9 @@ async def chat_message( raise HTTPException(404, "流不存在或未发布") definition = flow.definition_json - if flow.published_version_id: - ver_result = await db.execute(select(FlowVersion).where(FlowVersion.id == flow.published_version_id)) + published_version_id = getattr(flow, 'published_version_id', None) + if published_version_id: + ver_result = await db.execute(select(FlowVersion).where(FlowVersion.id == published_version_id)) published = ver_result.scalar_one_or_none() if published and published.definition_json: import json diff --git a/backend/modules/flow_engine/engine.py b/backend/modules/flow_engine/engine.py index 49a9c51..2106f25 100644 --- a/backend/modules/flow_engine/engine.py +++ b/backend/modules/flow_engine/engine.py @@ -103,7 +103,7 @@ class FlowEngine: current_msg = Msg(name="user", content=f"{enriched_content}\n\n---\n{user_text}", role="user") try: - result = await agent.reply(current_msg) + result = await agent.reply(current_msg, context=context) exec_record = { "node_id": node_id, "node_type": node_type, @@ -245,6 +245,7 @@ async def _create_node_agent(node: dict, context: dict) -> AgentBase: system_prompt = config.get("system_prompt", "你是AI助手。") max_tokens = config.get("max_tokens", 2000) stream = config.get("stream", True) + stream_cb = context.get("_stream_callback") agent = LLMNodeAgent( node_id=node_id, system_prompt=system_prompt, @@ -252,6 +253,7 @@ async def _create_node_agent(node: dict, context: dict) -> AgentBase: temperature=temperature, max_tokens=max_tokens, stream=stream, + stream_callback=stream_cb, ) memory = context.get("_memory") if memory: @@ -329,7 +331,7 @@ class PassThroughAgent(AgentBase): class LLMNodeAgent(AgentBase): - def __init__(self, node_id: str, system_prompt: str, model_name: str = "", temperature: float = 0.7, max_tokens: int = 2000, stream: bool = True): + def __init__(self, node_id: str, system_prompt: str, model_name: str = "", temperature: float = 0.7, max_tokens: int = 2000, stream: bool = True, stream_callback=None): super().__init__() self.name = f"LLM_{node_id}" self.system_prompt = system_prompt @@ -337,52 +339,126 @@ class LLMNodeAgent(AgentBase): self.temperature = temperature self.max_tokens = max_tokens self.stream = stream + self.stream_callback = stream_callback self._memory = None def set_memory(self, memory): self._memory = memory async def reply(self, msg: Msg, **kwargs) -> Msg: - from agentscope_integration.factory import AgentFactory - from agentscope.formatter import OpenAIChatFormatter - - model = AgentFactory._get_model() user_text = msg.get_text_content() if hasattr(msg, 'get_text_content') else str(msg) - - formatter = OpenAIChatFormatter() - messages = [Msg("system", self.system_prompt, "system")] - - if self._memory: + context = kwargs.get("context", {}) + memory_ctx = context.get("_memory_context", {}) + + messages = [{"role": "system", "content": self.system_prompt}] + + if memory_ctx: + summary = memory_ctx.get("summary", "") + recent = memory_ctx.get("recent_messages", []) + if summary: + messages.append({"role": "system", "content": f"[历史对话摘要]\n{summary}"}) + for m in recent[-10:]: + role = m.get("role", "user") + content = m.get("content", "") + if len(content) > 2000: + content = content[:2000] + messages.append({"role": role, "content": content}) + elif self._memory: history = self._memory.get_history(limit=5) for h in history: role = h.get("role", "user") content = h.get("content", "") if len(content) > 2000: content = content[:2000] - messages.append(Msg(role, content, role)) + messages.append({"role": role, "content": content}) - messages.append(Msg("user", user_text, "user")) - prompt = formatter.format(messages) + messages.append({"role": "user", "content": user_text}) try: - res = await model(prompt) - res_text = "" - if isinstance(res, list): - res_text = res[0].get_text_content() if hasattr(res[0], 'get_text_content') else str(res[0]) - elif hasattr(res, 'get_text_content'): - res_text = res.get_text_content() + if self.stream_callback: + res_text = await self._stream_llm_call(messages) else: - res_text = str(res) + res_text = await self._blocking_llm_call(messages) except Exception as e: logger.warning(f"LLM 调用失败: {e}") res_text = f"[LLM 调用失败] 已接收输入: {user_text[:200]}" - if self._memory: + if self._memory and not memory_ctx: self._memory.add("user", user_text) self._memory.add("assistant", res_text) return Msg(self.name, res_text, "assistant") + async def _blocking_llm_call(self, messages: list[dict]) -> str: + from agentscope_integration.factory import AgentFactory + from agentscope.formatter import OpenAIChatFormatter + + model = AgentFactory._get_model() + formatter = OpenAIChatFormatter() + scope_msgs = [] + for m in messages: + scope_msgs.append(Msg(m["role"], m["content"], m["role"])) + prompt = formatter.format(scope_msgs) + + res = await model(prompt) + if isinstance(res, list): + return res[0].get_text_content() if hasattr(res[0], 'get_text_content') else str(res[0]) + elif hasattr(res, 'get_text_content'): + return res.get_text_content() + return str(res) + + async def _stream_llm_call(self, messages: list[dict]) -> str: + import httpx + import json + + api_base = settings.LLM_API_BASE.rstrip("/") + api_key = settings.LLM_API_KEY + model_name = self.model_name or settings.LLM_MODEL + + url = f"{api_base}/chat/completions" + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + } + body = { + "model": model_name, + "messages": messages, + "temperature": self.temperature, + "max_tokens": self.max_tokens, + "stream": True, + } + + accumulated = "" + try: + timeout = httpx.Timeout(60.0, connect=10.0) + async with httpx.AsyncClient(timeout=timeout) as client: + async with client.stream("POST", url, json=body, headers=headers) as response: + async for line in response.aiter_lines(): + if not line.startswith("data: "): + continue + json_str = line[6:].strip() + if json_str == "[DONE]": + break + try: + chunk = json.loads(json_str) + delta = chunk.get("choices", [{}])[0].get("delta", {}) + token = delta.get("content", "") + if token: + accumulated += token + await self.stream_callback("text_chunk", {"content": token}) + except json.JSONDecodeError: + continue + except httpx.TimeoutException: + logger.warning("LLM 流式调用超时") + if not accumulated: + accumulated = "[LLM 超时]" + except Exception as e: + logger.warning(f"LLM 流式调用失败: {e}") + if not accumulated: + accumulated = f"[LLM 调用失败: {e}]" + + return accumulated + async def observe(self, msg) -> None: pass diff --git a/backend/modules/flow_engine/router.py b/backend/modules/flow_engine/router.py index de8694f..c45956e 100644 --- a/backend/modules/flow_engine/router.py +++ b/backend/modules/flow_engine/router.py @@ -1,6 +1,7 @@ import uuid import time import json +import asyncio import hashlib import secrets from datetime import datetime @@ -30,9 +31,9 @@ def _build_flow_out(f) -> FlowDefinitionOut: id=f.id, name=f.name, description=f.description, version=f.version, status=f.status, definition_json=f.definition_json, - published_version_id=f.published_version_id, - published_to_wecom=f.published_to_wecom, - published_to_web=f.published_to_web, + published_version_id=getattr(f, 'published_version_id', None), + published_to_wecom=getattr(f, 'published_to_wecom', False), + published_to_web=getattr(f, 'published_to_web', False), created_at=f.created_at, updated_at=f.updated_at, ) @@ -226,8 +227,9 @@ def _get_definition_json(flow: FlowDefinition, db_session) -> dict: async def _get_published_definition(flow: FlowDefinition, db: AsyncSession) -> dict: - if flow.published_version_id: - result = await db.execute(select(FlowVersion).where(FlowVersion.id == flow.published_version_id)) + published_version_id = getattr(flow, 'published_version_id', None) + if published_version_id: + result = await db.execute(select(FlowVersion).where(FlowVersion.id == published_version_id)) published = result.scalar_one_or_none() if published: return json.loads(json.dumps(published.definition_json)) @@ -242,6 +244,7 @@ async def execute_flow(flow_id: uuid.UUID, request: Request, payload: dict, db: user_ctx = request.state.user input_text = payload.get("input", payload.get("message", "")) + session_id = payload.get("session_id") or str(uuid.uuid4()) definition = await _get_published_definition(f, db) await ToolNodeAgent.load_custom_tools(db) @@ -250,16 +253,43 @@ async def execute_flow(flow_id: uuid.UUID, request: Request, payload: dict, db: context = { "user_id": user_ctx["id"], "username": user_ctx.get("username", ""), + "session_id": session_id, "trigger_data": payload.get("trigger", {}), "_node_results": {}, } + try: + from modules.memory.manager import get_memory_manager + mm = get_memory_manager() + await mm.inject_memory( + user_id=user_ctx["id"], + flow_id=str(flow_id), + session_id=session_id, + context=context, + ) + except Exception as e: + logger.debug(f"记忆注入跳过: {e}") + start_time = time.time() try: result_msg = await engine.execute(input_msg, context) elapsed_ms = int((time.time() - start_time) * 1000) output_text = result_msg.get_text_content() if hasattr(result_msg, 'get_text_content') else str(result_msg) + try: + from modules.memory.manager import get_memory_manager + mm = get_memory_manager() + asyncio.create_task(mm.record_exchange( + user_id=user_ctx["id"], + flow_id=str(flow_id), + session_id=session_id, + user_msg=input_text, + assistant_msg=output_text, + flow_name=f.name, + )) + except Exception as e: + logger.debug(f"记忆记录跳过: {e}") + execution = FlowExecution( flow_id=f.id, version=_get_published_version_number(f), @@ -276,6 +306,7 @@ async def execute_flow(flow_id: uuid.UUID, request: Request, payload: dict, db: "code": 200, "data": { "output": output_text, + "session_id": session_id, "node_results": context.get("_node_results", {}), "execution_id": str(execution.id), "latency_ms": elapsed_ms, @@ -336,6 +367,7 @@ async def test_flow(flow_id: uuid.UUID, request: Request, db: AsyncSession = Dep async def execute_flow_stream(flow_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)): body = await request.json() input_text = body.get("input", body.get("message", "")) + session_id = body.get("session_id") or str(uuid.uuid4()) user_ctx = request.state.user f = await db.get(FlowDefinition, flow_id) @@ -345,63 +377,87 @@ async def execute_flow_stream(flow_id: uuid.UUID, request: Request, db: AsyncSes definition = await _get_published_definition(f, db) await ToolNodeAgent.load_custom_tools(db) + try: + from modules.memory.manager import get_memory_manager + mm_stream = get_memory_manager() + except Exception: + mm_stream = None + async def event_generator(): - import asyncio engine = FlowEngine(definition) context = { "user_id": user_ctx["id"], "username": user_ctx.get("username", ""), + "session_id": session_id, "trigger_data": body.get("trigger", {}), "_node_results": {}, "_stream_callback": None, } + + if mm_stream: + try: + await mm_stream.inject_memory( + user_id=user_ctx["id"], + flow_id=str(flow_id), + session_id=session_id, + context=context, + ) + except Exception: + pass + input_msg = Msg(name="user", content=input_text, role="user") start_time = time.time() - # bind stream callback - stream_chunks = [] + token_queue: asyncio.Queue = asyncio.Queue() async def stream_callback(event_type: str, data: dict): - chunk_data = json.dumps({"event": event_type, "data": data}, ensure_ascii=False) - stream_chunks.append(f"data: {chunk_data}\n\n") + await token_queue.put((event_type, data)) context["_stream_callback"] = stream_callback - try: - yield f"data: {json.dumps({'event': 'workflow_started', 'data': {'flow_id': str(flow_id)}}, ensure_ascii=False)}\n\n" - - # get execution order - graph = engine._build_graph() - start = engine._find_start_nodes(graph) - if start: - yield f"data: {json.dumps({'event': 'node_started', 'data': {'node_id': start[0], 'node_type': definition.get('nodes', [{}])[0].get('type', 'unknown'), 'label': definition.get('nodes', [{}])[0].get('label', '开始')}}, ensure_ascii=False)}\n\n" + yield f"data: {json.dumps({'event': 'workflow_started', 'data': {'flow_id': str(flow_id)}}, ensure_ascii=False)}\n\n" - result_msg = await asyncio.wait_for( + execution_task = asyncio.create_task( + asyncio.wait_for( engine.execute(input_msg, context), timeout=engine.FLOW_TIMEOUT_SECONDS, ) - output_text = result_msg.get_text_content() if hasattr(result_msg, 'get_text_content') else str(result_msg) - elapsed_ms = int((time.time() - start_time) * 1000) + ) - yield f"data: {json.dumps({'event': 'text_chunk', 'data': {'content': output_text}}, ensure_ascii=False)}\n\n" + result_msg = None + exec_error = None + while True: + done = asyncio.ensure_future(token_queue.get()) + try: + wait_done, _ = await asyncio.wait( + [asyncio.ensure_future(execution_task), done], + return_when=asyncio.FIRST_COMPLETED, + ) + except Exception as e: + exec_error = e + break + + if execution_task.done(): + try: + result_msg = execution_task.result() + except asyncio.TimeoutError: + exec_error = asyncio.TimeoutError("执行超时") + except Exception as e: + exec_error = e + while not token_queue.empty(): + ev_type, ev_data = token_queue.get_nowait() + chunk_data = json.dumps({"event": ev_type, "data": ev_data}, ensure_ascii=False) + yield f"data: {chunk_data}\n\n" + break + + ev_type, ev_data = done.result() + chunk_data = json.dumps({"event": ev_type, "data": ev_data}, ensure_ascii=False) + yield f"data: {chunk_data}\n\n" - yield f"data: {json.dumps({'event': 'workflow_finished', 'data': {'output': output_text, 'node_results': {k: str(v)[:200] for k, v in context.get('_node_results', {}).items()}, 'latency_ms': elapsed_ms}}, ensure_ascii=False)}\n\n" + elapsed_ms = int((time.time() - start_time) * 1000) - execution = FlowExecution( - flow_id=f.id, - version=_get_published_version_number(f), - trigger_type=body.get("trigger_type", "manual"), - trigger_user_id=uuid.UUID(user_ctx["id"]), - input_data={"input": input_text}, - output_data={"output": output_text}, - status="completed", - latency_ms=elapsed_ms, - finished_at=datetime.utcnow(), - ) - db.add(execution) - except asyncio.TimeoutError: - elapsed_ms = int((time.time() - start_time) * 1000) - yield f"data: {json.dumps({'event': 'error', 'data': {'message': '执行超时'}}, ensure_ascii=False)}\n\n" + if exec_error: + yield f"data: {json.dumps({'event': 'error', 'data': {'message': str(exec_error)}}, ensure_ascii=False)}\n\n" execution = FlowExecution( flow_id=f.id, version=_get_published_version_number(f), @@ -410,25 +466,41 @@ async def execute_flow_stream(flow_id: uuid.UUID, request: Request, db: AsyncSes input_data={"input": input_text}, status="failed", latency_ms=elapsed_ms, - error_message="执行超时", - finished_at=datetime.utcnow(), - ) - db.add(execution) - except Exception as e: - elapsed_ms = int((time.time() - start_time) * 1000) - yield f"data: {json.dumps({'event': 'error', 'data': {'message': str(e)}}, ensure_ascii=False)}\n\n" - execution = FlowExecution( - flow_id=f.id, - version=_get_published_version_number(f), - trigger_type="manual", - trigger_user_id=uuid.UUID(user_ctx["id"]), - input_data={"input": input_text}, - status="failed", - latency_ms=elapsed_ms, - error_message=str(e)[:2000], + error_message=str(exec_error)[:2000] if not isinstance(exec_error, asyncio.TimeoutError) else "执行超时", finished_at=datetime.utcnow(), ) db.add(execution) + return + + output_text = result_msg.get_text_content() if hasattr(result_msg, 'get_text_content') else str(result_msg) + + yield f"data: {json.dumps({'event': 'workflow_finished', 'data': {'output': output_text, 'session_id': session_id, 'node_results': {k: str(v)[:200] for k, v in context.get('_node_results', {}).items()}, 'latency_ms': elapsed_ms}}, ensure_ascii=False)}\n\n" + + if mm_stream: + try: + asyncio.create_task(mm_stream.record_exchange( + user_id=user_ctx["id"], + flow_id=str(flow_id), + session_id=session_id, + user_msg=input_text, + assistant_msg=output_text, + flow_name=f.name, + )) + except Exception: + pass + + execution = FlowExecution( + flow_id=f.id, + version=_get_published_version_number(f), + trigger_type=body.get("trigger_type", "manual"), + trigger_user_id=uuid.UUID(user_ctx["id"]), + input_data={"input": input_text}, + output_data={"output": output_text}, + status="completed", + latency_ms=elapsed_ms, + finished_at=datetime.utcnow(), + ) + db.add(execution) finally: yield "data: [DONE]\n\n" diff --git a/backend/modules/memory/__init__.py b/backend/modules/memory/__init__.py new file mode 100644 index 0000000..c0d3a31 --- /dev/null +++ b/backend/modules/memory/__init__.py @@ -0,0 +1,4 @@ +from .manager import MemoryManager, get_memory_manager +from .router import router + +__all__ = ["MemoryManager", "get_memory_manager", "router"] \ No newline at end of file diff --git a/backend/modules/memory/manager.py b/backend/modules/memory/manager.py new file mode 100644 index 0000000..7ae0298 --- /dev/null +++ b/backend/modules/memory/manager.py @@ -0,0 +1,201 @@ +import json +import asyncio +import logging +from datetime import datetime +from redis.asyncio import Redis +from config import settings + +logger = logging.getLogger(__name__) + +_memory_manager: "MemoryManager | None" = None + + +def get_memory_manager() -> "MemoryManager": + global _memory_manager + if _memory_manager is None: + raise RuntimeError("MemoryManager 未初始化,请先调用 init_memory_manager()") + return _memory_manager + + +async def init_memory_manager(): + global _memory_manager + redis = Redis.from_url(settings.REDIS_URL, decode_responses=True) + await redis.ping() + _memory_manager = MemoryManager(redis) + + +class MemoryManager: + KEY_PREFIX = "mem" + DEFAULT_TTL = 604800 + SESSION_INDEX_TTL = 2592000 + MAX_HISTORY = 40 + + def __init__(self, redis: Redis): + self.redis = redis + + async def inject_memory( + self, + user_id: str, + flow_id: str, + session_id: str, + context: dict, + ): + messages = await self._get_recent_messages(user_id, flow_id, session_id) + summary = await self._get_summary(user_id, flow_id, session_id) + + context["_memory_context"] = { + "recent_messages": list(reversed(messages)), + "summary": summary, + "session_id": session_id, + } + + async def record_exchange( + self, + user_id: str, + flow_id: str, + session_id: str, + user_msg: str, + assistant_msg: str, + flow_name: str = "", + ): + key = self._msg_key(user_id, flow_id, session_id) + ts = datetime.utcnow().isoformat() + + try: + async with self.redis.pipeline() as pipe: + pipe.lpush(key, + json.dumps({"role": "assistant", "content": assistant_msg, "ts": ts}, ensure_ascii=False), + json.dumps({"role": "user", "content": user_msg, "ts": ts}, ensure_ascii=False), + ) + pipe.ltrim(key, 0, self.MAX_HISTORY - 1) + pipe.expire(key, self.DEFAULT_TTL) + + pipe.hset(self._meta_key(user_id, flow_id, session_id), mapping={ + "flow_name": flow_name, + "last_active_at": ts, + }) + pipe.expire(self._meta_key(user_id, flow_id, session_id), self.DEFAULT_TTL) + + pipe.sadd(f"{self.KEY_PREFIX}:{user_id}:sessions", session_id) + pipe.expire(f"{self.KEY_PREFIX}:{user_id}:sessions", self.SESSION_INDEX_TTL) + + await pipe.execute() + except Exception as e: + logger.warning(f"记录记忆失败: {e}") + + asyncio.create_task(self._maybe_summarize(user_id, flow_id, session_id)) + + async def get_conversation_history( + self, user_id: str, flow_id: str, session_id: str, limit: int = 20 + ) -> list[dict]: + messages = await self._get_recent_messages(user_id, flow_id, session_id, limit) + return list(reversed(messages)) + + async def delete_session(self, user_id: str, session_id: str): + try: + patterns = await self.redis.keys(f"{self.KEY_PREFIX}:{user_id}:*:{session_id}:*") + async with self.redis.pipeline() as pipe: + if patterns: + pipe.delete(*patterns) + pipe.srem(f"{self.KEY_PREFIX}:{user_id}:sessions", session_id) + await pipe.execute() + except Exception as e: + logger.warning(f"清除记忆失败: {e}") + + async def list_user_sessions(self, user_id: str) -> list[dict]: + try: + session_ids = await self.redis.smembers(f"{self.KEY_PREFIX}:{user_id}:sessions") + except Exception: + return [] + + sessions = [] + for sid in session_ids: + try: + keys = await self.redis.keys(f"{self.KEY_PREFIX}:{user_id}:*:{sid}:meta") + for k in keys: + meta = await self.redis.hgetall(k) + parts = k.split(":") + flow_id = parts[2] if len(parts) > 2 else "" + sessions.append({ + "session_id": sid, + "flow_id": flow_id, + "flow_name": meta.get("flow_name", ""), + "last_active_at": meta.get("last_active_at", ""), + }) + except Exception: + continue + + return sorted(sessions, key=lambda s: s.get("last_active_at", ""), reverse=True) + + async def _get_recent_messages( + self, user_id: str, flow_id: str, session_id: str, limit: int = None + ) -> list[dict]: + limit = limit or self.MAX_HISTORY + try: + key = self._msg_key(user_id, flow_id, session_id) + raw = await self.redis.lrange(key, 0, limit - 1) + result = [] + for m in raw: + try: + result.append(json.loads(m)) + except json.JSONDecodeError: + continue + return result + except Exception: + return [] + + async def _get_summary(self, user_id: str, flow_id: str, session_id: str) -> str: + try: + key = f"{self.KEY_PREFIX}:{user_id}:{flow_id}:{session_id}:summary" + val = await self.redis.get(key) + return val or "" + except Exception: + return "" + + async def _maybe_summarize(self, user_id: str, flow_id: str, session_id: str): + try: + key = self._msg_key(user_id, flow_id, session_id) + count = await self.redis.llen(key) + if count < 30: + return + + summary_key = f"{self.KEY_PREFIX}:{user_id}:{flow_id}:{session_id}:summary" + existing = await self.redis.get(summary_key) + if existing: + return + + recent = await self._get_recent_messages(user_id, flow_id, session_id, 20) + dialogue = "\n".join( + f"{m['role']}: {m['content'][:500]}" for m in reversed(recent[:10]) + ) + + import httpx + api_base = settings.LLM_API_BASE.rstrip("/") + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.post( + f"{api_base}/chat/completions", + json={ + "model": settings.LLM_MODEL, + "messages": [{ + "role": "user", + "content": f"请用一段话简要总结以下对话的关键内容。保留人名、任务、决策、时间等关键信息。\n\n{dialogue}" + }], + "max_tokens": 200, + }, + headers={"Authorization": f"Bearer {settings.LLM_API_KEY}"}, + ) + data = resp.json() + summary = data.get("choices", [{}])[0].get("message", {}).get("content", "") + + if summary: + await self.redis.setex(summary_key, 2592000, summary) + except Exception: + pass + + @staticmethod + def _msg_key(user_id: str, flow_id: str, session_id: str) -> str: + return f"mem:{user_id}:{flow_id}:{session_id}:messages" + + @staticmethod + def _meta_key(user_id: str, flow_id: str, session_id: str) -> str: + return f"mem:{user_id}:{flow_id}:{session_id}:meta" \ No newline at end of file diff --git a/backend/modules/memory/router.py b/backend/modules/memory/router.py new file mode 100644 index 0000000..3ca3af2 --- /dev/null +++ b/backend/modules/memory/router.py @@ -0,0 +1,30 @@ +from fastapi import APIRouter, Request, Depends, HTTPException +from dependencies import get_current_user +from modules.memory.manager import get_memory_manager + +router = APIRouter(prefix="/api/memory", tags=["记忆管理"]) + + +@router.get("/sessions") +async def list_sessions(request: Request, user=Depends(get_current_user)): + mm = get_memory_manager() + sessions = await mm.list_user_sessions(str(user.id)) + return {"code": 200, "data": sessions} + + +@router.get("/sessions/{session_id}") +async def get_session(session_id: str, flow_id: str = "", request: Request, user=Depends(get_current_user)): + mm = get_memory_manager() + history = await mm.get_conversation_history( + user_id=str(user.id), + flow_id=flow_id, + session_id=session_id, + ) + return {"code": 200, "data": history} + + +@router.delete("/sessions/{session_id}") +async def clear_session(session_id: str, request: Request, user=Depends(get_current_user)): + mm = get_memory_manager() + await mm.delete_session(str(user.id), session_id) + return {"code": 200, "message": "记忆已清除"} \ No newline at end of file diff --git a/backend/modules/memory/schemas.py b/backend/modules/memory/schemas.py new file mode 100644 index 0000000..31e66d4 --- /dev/null +++ b/backend/modules/memory/schemas.py @@ -0,0 +1,19 @@ +from pydantic import BaseModel, ConfigDict +from datetime import datetime + + +class MemorySessionOut(BaseModel): + session_id: str + flow_id: str + flow_name: str + last_active_at: str + + +class ConversationMessage(BaseModel): + role: str + content: str + ts: str = "" + + +class ClearSessionRequest(BaseModel): + session_id: str \ No newline at end of file diff --git a/backend/modules/rag/router.py b/backend/modules/rag/router.py index 4a30212..235916b 100644 --- a/backend/modules/rag/router.py +++ b/backend/modules/rag/router.py @@ -6,7 +6,7 @@ import os import uuid from config import settings -from .knowledge import add_document, add_text, search, retrieve_for_agent +from .knowledge import add_document, add_text, search, retrieve_for_agent, get_knowledge_base router = APIRouter(prefix="/api/rag", tags=["rag"]) @@ -70,4 +70,120 @@ async def rag_retrieve( if not q: return {"code": 400, "message": "查询内容不能为空"} result = await retrieve_for_agent(q, limit=limit) - return {"code": 200, "data": result} \ No newline at end of file + return {"code": 200, "data": result} + + +@router.get("/documents") +async def list_documents( + request: Request, + db: AsyncSession = Depends(get_db), + current_user=Depends(get_current_user), +): + try: + kb = get_knowledge_base() + if not kb or not hasattr(kb, '_embedding_store'): + return {"code": 200, "data": [], "total": 0} + + store = kb._embedding_store + all_docs = [] + try: + if hasattr(store, 'client') and hasattr(store.client, 'get_collection'): + collection = store.client.get_collection(collection_name=store.collection_name) + total = collection.count() + offset = 0 + batch_size = 100 + while offset < total: + result = collection.get(offset=offset, limit=batch_size) + for doc_id, payload, vector in zip(result.ids, result.payloads, result.vectors): + source = (payload or {}).get("source", "") or (payload or {}).get("file_path", "") + content_preview = "" + if isinstance(payload, dict) and "content" in payload: + c = payload["content"] + content_preview = c[:200] if len(c) > 200 else c + all_docs.append({ + "id": doc_id, + "source": source, + "content_preview": content_preview, + "metadata": payload or {}, + }) + offset += batch_size + + seen_sources = {} + for d in all_docs: + src = d["source"] or "unknown" + if src not in seen_sources: + seen_sources[src] = {"source": src, "chunk_count": 0, "first_id": d["id"], "preview": d["content_preview"]} + seen_sources[src]["chunk_count"] += 1 + + return { + "code": 200, + "data": list(seen_sources.values()), + "total_chunks": total, + "total_files": len(seen_sources), + } + except Exception as e: + return {"code": 200, "data": [], "error": str(e), "total": 0} + except Exception as e: + return {"code": 500, "message": f"获取文档列表失败: {e}"} + + +@router.delete("/documents/{source}") +async def delete_document( + source: str, + request: Request, + db: AsyncSession = Depends(get_db), + current_user=Depends(get_current_user), +): + try: + kb = get_knowledge_base() + store = kb._embedding_store + + if hasattr(store, 'client') and hasattr(store.client, 'get_collection'): + collection = store.client.get_collection(collection_name=store.collection_name) + result = collection.get(where={"source": source}) + if result and result.ids: + collection.delete(ids=result.ids) + return {"code": 200, "message": f"已删除 {len(result.ids)} 个文档块", "deleted_count": len(result.ids)} + else: + return {"code": 404, "message": "未找到该来源的文档"} + return {"code": 500, "message": "无法连接向量存储"} + except Exception as e: + return {"code": 500, "message": f"删除文档失败: {e}"} + + +@router.get("/stats") +async def knowledge_stats( + request: Request, + db: AsyncSession = Depends(get_db), + current_user=Depends(get_current_user), +): + try: + kb = get_knowledge_base() + store = kb._embedding_store + stats_data = { + "status": "initialized", + "collection_name": getattr(store, 'collection_name', 'unknown'), + "dimensions": getattr(store, 'dimensions', 0), + "total_chunks": 0, + "total_files": 0, + } + + if hasattr(store, 'client') and hasattr(store.client, 'get_collection'): + try: + collection = store.client.get_collection(collection_name=store.collection_name) + stats_data["total_chunks"] = collection.count() + + result = collection.get(limit=1000) + sources = set() + for p in (result.payloads or []): + if isinstance(p, dict): + s = p.get("source", "") or p.get("file_path", "") + if s: + sources.add(s) + stats_data["total_files"] = len(sources) + except Exception: + stats_data["status"] = "collection_error" + + return {"code": 200, "data": stats_data} + except Exception as e: + return {"code": 500, "message": f"获取统计信息失败: {e}"} \ No newline at end of file diff --git a/frontend/src/api/index.ts b/frontend/src/api/index.ts index c3ed589..f53f3ef 100644 --- a/frontend/src/api/index.ts +++ b/frontend/src/api/index.ts @@ -188,4 +188,7 @@ export const ragApi = { indexText: (data: { text: string; source?: string }) => api.post('/rag/index-text', data), search: (q: string, limit?: number) => api.get('/rag/search', { params: { q, limit } }), retrieve: (q: string, limit?: number) => api.get('/rag/retrieve', { params: { q, limit } }), + getDocuments: () => api.get('/rag/documents'), + deleteDocument: (source: string) => api.delete(`/rag/documents/${encodeURIComponent(source)}`), + getStats: () => api.get('/rag/stats'), } \ No newline at end of file diff --git a/frontend/src/components/layout/AdminLayout.vue b/frontend/src/components/layout/AdminLayout.vue index 43f6171..bcd5c39 100644 --- a/frontend/src/components/layout/AdminLayout.vue +++ b/frontend/src/components/layout/AdminLayout.vue @@ -30,7 +30,7 @@ 角色列表 @@ -38,48 +38,42 @@ 流列表 流编辑器 流市场 + 自定义API工具 + MCP服务管理 - + - 服务管理 - - - - 知识库管理 + 智能体管理 + 企微机器人配置 - + 创建任务 + 员工监控 - - - 审计日志 - - - + - 系统监控 + 审计日志 + 系统监控 + 系统设置 @@ -121,7 +115,7 @@ import { ref, computed } from 'vue' import { useRoute, useRouter } from 'vue-router' import { useUserStore } from '@/stores/user' -import { Fold, User, ArrowDown, List, Connection, Search } from '@element-plus/icons-vue' +import { Fold, User, ArrowDown, Monitor, OfficeBuilding, Lock, Share, Cpu, TrendCharts, Tools } from '@element-plus/icons-vue' import PortalSwitcher from '@/components/common/PortalSwitcher.vue' const route = useRoute() @@ -134,8 +128,16 @@ const activeMenu = computed(() => { if (path.startsWith('/admin/org')) return path if (path.startsWith('/admin/role')) return path if (path.startsWith('/admin/flow')) return path + if (path.startsWith('/admin/tools')) return path + if (path.startsWith('/admin/mcp')) return path + if (path.startsWith('/admin/rag')) return path + if (path.startsWith('/admin/agent')) return path + if (path.startsWith('/admin/wecom')) return path + if (path.startsWith('/admin/task')) return path + if (path.startsWith('/admin/monitor')) return path if (path.startsWith('/admin/audit')) return path if (path.startsWith('/admin/system')) return path + if (path.startsWith('/admin/settings')) return path return path }) diff --git a/frontend/src/components/layout/MainLayout.vue b/frontend/src/components/layout/MainLayout.vue index 5297a92..83cd1d0 100644 --- a/frontend/src/components/layout/MainLayout.vue +++ b/frontend/src/components/layout/MainLayout.vue @@ -18,71 +18,28 @@ 工作台 - + - 员工列表 + 流式对话 + 文档处理 - + - 任务列表 + 任务列表 + 通知中心 - - - 智能体 - - - - - 文档管理 - - - - - 知识库 - - - - - 企微配置 - - - - - 通知中心 - - - - - 流式对话 - - - - - 流程管理 - - - - - 自定义工具 - - 个人中心 - - - - 系统配置 - @@ -123,7 +80,7 @@ import { ref, computed } from 'vue' import { useRoute, useRouter } from 'vue-router' import { useUserStore } from '@/stores/user' -import { Fold, User, ArrowDown, Tools, Search, Promotion, ChatLineSquare, SetUp } from '@element-plus/icons-vue' +import { Fold, User, ArrowDown, Cpu, Monitor } from '@element-plus/icons-vue' import PortalSwitcher from '@/components/common/PortalSwitcher.vue' const route = useRoute() @@ -133,16 +90,11 @@ const isCollapse = ref(false) const activeMenu = computed(() => { const path = route.path - if (path.startsWith('/user/monitor')) return '/user/monitor/employees' - if (path.startsWith('/user/task')) return '/user/task/list' - if (path.startsWith('/user/agent')) return '/user/agent/list' + if (path.startsWith('/user/chat')) return '/user/chat/flow' if (path.startsWith('/user/document')) return '/user/document/manager' - if (path.startsWith('/user/flow')) return '/user/flow/list' - if (path.startsWith('/user/wecom')) return '/user/wecom/config' + if (path.startsWith('/user/task')) return '/user/task/list' if (path.startsWith('/user/notification')) return '/user/notification/center' - if (path.startsWith('/user/chat')) return '/user/chat/flow' - if (path.startsWith('/user/tools')) return '/user/tools/custom' - if (path.startsWith('/user/settings')) return '/user/settings' + if (path.startsWith('/user/profile')) return '/user/profile' return path }) @@ -151,7 +103,9 @@ function can(code: string): boolean { } function handleCommand(cmd: string) { - if (cmd === 'logout') { + if (cmd === 'profile') { + router.push('/user/profile') + } else if (cmd === 'logout') { userStore.logout() router.push('/login') } diff --git a/frontend/src/router/index.ts b/frontend/src/router/index.ts index 7946dee..0177566 100644 --- a/frontend/src/router/index.ts +++ b/frontend/src/router/index.ts @@ -29,34 +29,10 @@ const router = createRouter({ meta: { title: '工作台' }, }, { - path: 'monitor/employees', - name: 'MonitorEmployees', - component: () => import('@/views/monitor/EmployeeList.vue'), - meta: { title: '员工监控', perms: ['monitor:read'] }, - }, - { - path: 'monitor/:id/dashboard', - name: 'MonitorDashboard', - component: () => import('@/views/monitor/WorkDashboard.vue'), - meta: { title: '工作看板', perms: ['monitor:read'] }, - }, - { - path: 'monitor/:id/analysis', - name: 'MonitorAnalysis', - component: () => import('@/views/monitor/AIAnalysis.vue'), - meta: { title: 'AI分析', perms: ['monitor:read'] }, - }, - { - path: 'task/list', - name: 'TaskList', - component: () => import('@/views/task/TaskList.vue'), - meta: { title: '任务列表', perms: ['task:read'] }, - }, - { - path: 'task/:id', - name: 'TaskDetail', - component: () => import('@/views/task/TaskDetail.vue'), - meta: { title: '任务详情', perms: ['task:read'] }, + path: 'chat/flow', + name: 'FlowChat', + component: () => import('@/views/chat/FlowChat.vue'), + meta: { title: '流式对话' }, }, { path: 'agent/list', @@ -77,16 +53,16 @@ const router = createRouter({ meta: { title: '文档管理' }, }, { - path: 'rag/knowledge', - name: 'KnowledgeBase', - component: () => import('@/views/rag/KnowledgeBase.vue'), - meta: { title: '知识库' }, + path: 'task/list', + name: 'TaskList', + component: () => import('@/views/task/TaskList.vue'), + meta: { title: '任务列表', perms: ['task:read'] }, }, { - path: 'wecom/config', - name: 'WecomConfig', - component: () => import('@/views/wecom/BotConfig.vue'), - meta: { title: '企微配置' }, + path: 'task/:id', + name: 'TaskDetail', + component: () => import('@/views/task/TaskDetail.vue'), + meta: { title: '任务详情', perms: ['task:read'] }, }, { path: 'notification/center', @@ -94,48 +70,12 @@ const router = createRouter({ component: () => import('@/views/notification/NotificationCenter.vue'), meta: { title: '通知中心' }, }, - { - path: 'flow/list', - name: 'UserFlowList', - component: () => import('@/views/flow/FlowList.vue'), - meta: { title: '流程管理', perms: ['flow:read'] }, - }, - { - path: 'flow/editor', - name: 'UserFlowEditor', - component: () => import('@/views/flow/FlowEditor.vue'), - meta: { title: '流编辑器', perms: ['flow:create'] }, - }, - { - path: 'flow/editor/:id', - name: 'UserFlowEditorEdit', - component: () => import('@/views/flow/FlowEditor.vue'), - meta: { title: '编辑流', perms: ['flow:update'] }, - }, { path: 'profile', name: 'Profile', component: () => import('@/views/profile/Profile.vue'), meta: { title: '个人中心' }, }, - { - path: 'chat/flow', - name: 'FlowChat', - component: () => import('@/views/chat/FlowChat.vue'), - meta: { title: '流式对话' }, - }, - { - path: 'tools/custom', - name: 'CustomToolManager', - component: () => import('@/views/tools/CustomToolManager.vue'), - meta: { title: '自定义API工具' }, - }, - { - path: 'settings', - name: 'Settings', - component: () => import('@/views/settings/Settings.vue'), - meta: { title: '系统配置' }, - }, ], }, { @@ -203,6 +143,12 @@ const router = createRouter({ component: () => import('@/views/flow/FlowDetail.vue'), meta: { title: '流详情', perms: ['flow:read'] }, }, + { + path: 'tools/custom', + name: 'AdminCustomToolManager', + component: () => import('@/views/tools/CustomToolManager.vue'), + meta: { title: '自定义API工具', perms: ['flow:create'] }, + }, { path: 'mcp/manager', name: 'AdminMcpManager', @@ -215,6 +161,42 @@ const router = createRouter({ component: () => import('@/views/rag/KnowledgeBase.vue'), meta: { title: '知识库管理', perms: ['flow:create'] }, }, + { + path: 'wecom/config', + name: 'AdminWecomConfig', + component: () => import('@/views/wecom/BotConfig.vue'), + meta: { title: '企微机器人配置', perms: ['admin:access'] }, + }, + { + path: 'agent/list', + name: 'AdminAgentList', + component: () => import('@/views/agent/AgentList.vue'), + meta: { title: '智能体管理', perms: ['admin:access'] }, + }, + { + path: 'agent/chat/:type', + name: 'AdminAgentChat', + component: () => import('@/views/agent/AgentChat.vue'), + meta: { title: '智能体对话', perms: ['admin:access'] }, + }, + { + path: 'monitor/employees', + name: 'AdminMonitorEmployees', + component: () => import('@/views/monitor/EmployeeList.vue'), + meta: { title: '员工监控', perms: ['monitor:read'] }, + }, + { + path: 'monitor/:id/dashboard', + name: 'AdminMonitorDashboard', + component: () => import('@/views/monitor/WorkDashboard.vue'), + meta: { title: '工作看板', perms: ['monitor:read'] }, + }, + { + path: 'monitor/:id/analysis', + name: 'AdminMonitorAnalysis', + component: () => import('@/views/monitor/AIAnalysis.vue'), + meta: { title: 'AI分析', perms: ['monitor:read'] }, + }, { path: 'task/create', name: 'AdminTaskCreate', @@ -233,6 +215,12 @@ const router = createRouter({ component: () => import('@/views/system/SystemMonitor.vue'), meta: { title: '系统监控', perms: ['audit:read'] }, }, + { + path: 'settings', + name: 'AdminSettings', + component: () => import('@/views/settings/Settings.vue'), + meta: { title: '系统设置', perms: ['audit:read'] }, + }, ], }, ], diff --git a/frontend/src/views/chat/FlowChat.vue b/frontend/src/views/chat/FlowChat.vue index b793a00..120c6d0 100644 --- a/frontend/src/views/chat/FlowChat.vue +++ b/frontend/src/views/chat/FlowChat.vue @@ -9,15 +9,12 @@ - - - 阻塞模式 - 流式模式 - - 清空对话 + + 清除记忆 + @@ -89,7 +86,6 @@ import { UserFilled, Cpu, Loading, Promotion } from '@element-plus/icons-vue' import api from '@/api' const selectedFlowId = ref('') -const responseMode = ref('streaming') const inputText = ref('') const messages = ref([]) const streaming = ref(false) @@ -127,6 +123,22 @@ function clearMessages() { } } +async function clearMemory() { + const sid = sessionId.value + if (sid) { + try { + await fetch(`/api/memory/sessions/${sid}`, { + method: 'DELETE', + headers: { 'Authorization': `Bearer ${localStorage.getItem('token')}` }, + }) + ElMessage.success('记忆已清除') + } catch { + ElMessage.warning('清除记忆请求失败') + } + } + clearMessages() +} + function scrollToBottom() { nextTick(() => { if (chatContainer.value) { @@ -147,43 +159,10 @@ async function sendMessage() { messages.value.push(userMsg) scrollToBottom() - if (responseMode.value === 'streaming') { - await sendStreaming() - } else { - await sendBlocking() - } + await sendStreaming() inputText.value = '' } -async function sendBlocking() { - sending.value = true - try { - const res = await api.post(`/flow/definitions/${selectedFlowId.value}/execute`, { - input: messages.value[messages.value.length - 1]?.content, - session_id: sessionId.value || undefined, - trigger_type: 'chat', - }) - const data = (res as any)?.data - const output = data?.output || data?.result || JSON.stringify(res) - messages.value.push({ - id: Date.now(), - role: 'assistant', - content: output, - nodeResults: data?.node_results, - time: new Date().toLocaleTimeString('zh-CN'), - }) - if (data?.session_id) { - sessionId.value = data.session_id - localStorage.setItem('flow_chat_session', sessionId.value) - } - } catch (e: any) { - ElMessage.error(e?.response?.data?.detail || '发送失败') - } finally { - sending.value = false - scrollToBottom() - } -} - async function sendStreaming() { streaming.value = true streamBuffer.value = '' @@ -229,15 +208,12 @@ async function sendStreaming() { if (line.startsWith('data: ')) { const jsonStr = line.slice(6).trim() if (!jsonStr) continue + if (jsonStr === '[DONE]') continue try { const event = JSON.parse(jsonStr) - if (event.event === 'workflow_started') { - // 流开始 - } else if (event.event === 'node_started') { - // 节点开始 - } else if (event.event === 'text_chunk') { + if (event.event === 'text_chunk') { streamBuffer.value += event.data?.content || '' scrollToBottom() } else if (event.event === 'node_completed') { diff --git a/frontend/src/views/dashboard/Dashboard.vue b/frontend/src/views/dashboard/Dashboard.vue index 43fdae9..7227d07 100644 --- a/frontend/src/views/dashboard/Dashboard.vue +++ b/frontend/src/views/dashboard/Dashboard.vue @@ -33,7 +33,7 @@
- + 智能体对话 @@ -42,7 +42,7 @@ 编排工作流 - + 查看监控
diff --git a/frontend/src/views/monitor/EmployeeList.vue b/frontend/src/views/monitor/EmployeeList.vue index 968a74e..d6e2e1e 100644 --- a/frontend/src/views/monitor/EmployeeList.vue +++ b/frontend/src/views/monitor/EmployeeList.vue @@ -10,8 +10,8 @@ diff --git a/frontend/src/views/rag/KnowledgeBase.vue b/frontend/src/views/rag/KnowledgeBase.vue index 843f958..cf7a417 100644 --- a/frontend/src/views/rag/KnowledgeBase.vue +++ b/frontend/src/views/rag/KnowledgeBase.vue @@ -6,52 +6,110 @@ - - - + + + - - - + + - + + +
+ 提示:删除操作会移除该来源下的所有向量分块,不可恢复 +
- - 检索 + + + -
+
-
相关度: {{ (r.score * 100).toFixed(1) }}% | 来源: {{ r.source }}
+
+ + 相关度 {{ (r.score * 100).toFixed(1) }}% + + 来源: {{ r.source }} +
{{ r.content }}
- + +
+ +

输入问题后点击检索

+
- - - 索引文本 + + + 索引文本 + + + + + + + + {{ stats.status === 'initialized' ? '正常' : '异常' }} + + + {{ stats.collection_name || '-' }} + {{ stats.dimensions || '-' }} + {{ stats.total_chunks || 0 }} + {{ stats.total_files || 0 }} + @@ -60,7 +118,8 @@ \ No newline at end of file