# PLAN8 — 记忆管理模块 & 用户画像系统 ## 一、需求背景 ### 1.1 核心需求对照(README L11-L16) | # | 需求 | 记忆/画像关联 | |---|------|--------------| | 1 | 用户和AI沟通工作,有现成流程给用户选择使用 | **用户-工作流对话记忆**:用户选了一个流程对话,下次回来继续聊,AI记得之前聊了什么 | | 2 | 上司查看下属工作情况,AI综合评估 | **用户画像+行为轨迹**:积累下属的工作数据,AI才能做综合评估 | | 3 | 上司通过AI指派任务给下属 | **跨用户上下文**:上司和AI说"给小明派个任务",AI需要知道小明是谁 | | 4 | 流程由后台无代码搭建 | **无需在流中加记忆节点**:记忆是通用中间件,对所有流透明生效 | | 5 | MCP服务获取/更新系统数据 | **外部数据注入记忆**:MCP返回的系统数据可选择性注入记忆上下文 | | 6 | 知识库提升模型认知 | **知识+记忆融合检索**:RAG检索结果与历史对话记忆合并排序 | ### 1.2 当前记忆现状与缺陷 ``` FlowSessionMemory (engine.py:15) self._messages: list[dict] = [] ← 纯 Python list 生命周期 = 单次 HTTP 请求 请求结束 → 内存释放 → 记忆全丢 ChatSession / ChatMessage (models/__init__.py:92-113) 已有数据库表!但只被 chat/router.py 基本对话使用 FlowEngine.execute() 完全不碰这两个表 UserIsolatedMemory (agentscope_integration/memory/) 基于 agentscope MemoryBase 但只在 AgentFactory.create_agent() 中为 agent 聊天使用 流执行完全不经过它 断链点: router.py context 里没放 session_id → FlowEngine 每次都生成新 uuid → FlowSessionMemory 永远从空列表开始 → 无任何持久化写回逻辑 ``` --- ## 二、架构设计 ### 2.1 核心设计原则(修改点) 1. **纯 Redis 记忆存储**:对话记忆用 Redis String + List + Hash 组合存储,Redis 开启 `--appendonly yes` AOF 持久化保证不失忆。速度优先,毫秒级读写 2. **透明通用中间件**:记忆不是流中的节点,不需要在流编辑器里配置。对所有已发布流自动生效,是 flow_engine 层面的通用钩子 3. **用户画像存 PostgreSQL**:画像需要复杂聚合查询和结构化字段,用 PG 合适;不要求实时性,每次对话结束异步更新即可 4. **用户-工作流隔离**:记忆以 `(user_id, flow_id, session_id)` 为最小隔离单元 5. **独立模块**:MemoryManager 是独立服务,FlowEngine、AgentChat 统一调用 ### 2.2 中间件触发位置(关键设计) ``` 所有流的执行路径: router.py: execute_flow() / execute_flow_stream() │ ├── ❶ 执行前:MemoryManager.inject_memory(user_id, flow_id, session_id) │ → 从 Redis 检索该用户-流组合的历史对话 │ → 注入 context["_memory_context"] │ ├── FlowEngine.execute() 执行各节点 │ └── LLMNodeAgent.reply() │ ├── 从 context["_memory_context"] 获取历史消息 │ ├── 拼入 messages 数组发给 LLM │ └── 返回结果 │ ├── ❷ 执行后:MemoryManager.record_exchange(user_id, flow_id, session_id, user_msg, assistant_msg) │ → 写入 Redis(异步,不阻塞响应) │ └── 返回响应给前端 ``` **用户无感知**:不需要在流编辑器中加任何东西,所有流自动获得记忆能力。 --- ## 三、Redis 键设计 ### 3.1 键命名空间 ``` # 会话消息列表(LPUSH + LTRIM 控制长度) mem:{user_id}:{flow_id}:{session_id}:messages → Redis List,每条是 JSON: {"role":"user","content":"...","ts":"..."} → LTRIM 控制在 max_history * 2 条(每轮对话 user+assistant 两条) # 会话元数据 mem:{user_id}:{flow_id}:{session_id}:meta → Redis Hash → {flow_name, created_at, last_active_at, message_count} # 用户的会话索引(SMEMBERS 获取该用户所有 session) mem:{user_id}:sessions → Redis Set,存 session_id 列表 # 会话摘要(超过阈值时生成,独立 Key) mem:{user_id}:{flow_id}:{session_id}:summary → Redis String,LLM 生成的摘要文本 # TTL 策略 mem:{user_id}:{flow_id}:{session_id}:messages → 7 天(EXPIRE 604800) mem:{user_id}:{flow_id}:{session_id}:meta → 7 天 mem:{user_id}:sessions → 30 天 mem:{user_id}:{flow_id}:{session_id}:summary → 30 天(摘要保留更久) ``` ### 3.2 内存估算 ``` 单条消息 ≈ 500 bytes(含 JSON 开销) 每轮对话 ≈ 2 条 = 1KB 30 轮/会话 × 1KB = 30KB/会话 100 个活跃用户 × 10 个会话/用户 × 30KB = 30MB Redis AOF 文件预估:初始 30MB,日常增长可控 ``` --- ## 四、核心代码设计 ### 4.1 模块位置 ``` backend/ └── modules/ └── memory/ ← 新建独立模块 ├── __init__.py → 导出 ├── manager.py → MemoryManager 核心类 ├── profile.py → UserProfileEngine(Phase 2) ├── router.py → 管理 API └── schemas.py → Pydantic 模型 ``` ### 4.2 MemoryManager ```python # modules/memory/manager.py class MemoryManager: """记忆管理中心 — 纯 Redis 存储""" KEY_PREFIX = "mem" DEFAULT_TTL = 604800 # 7 天 SESSION_INDEX_TTL = 2592000 # 30 天 MAX_HISTORY = 40 # 每个会话最多保留 40 条消息(20 轮对话) def __init__(self, redis: Redis): self.redis = redis # ===== 核心 API ===== async def inject_memory( self, user_id: str, flow_id: str, session_id: str, context: dict, ): """执行前:将历史记忆注入 context""" messages = await self._get_recent_messages(user_id, flow_id, session_id) summary = await self._get_summary(user_id, flow_id, session_id) context["_memory_context"] = { "recent_messages": list(reversed(messages)), "summary": summary, "session_id": session_id, } async def record_exchange( self, user_id: str, flow_id: str, session_id: str, user_msg: str, assistant_msg: str, flow_name: str = "", ): """执行后:异步记录本轮对话""" key = self._msg_key(user_id, flow_id, session_id) ts = datetime.utcnow().isoformat() async with self.redis.pipeline() as pipe: pipe.lpush(key, json.dumps({"role": "assistant", "content": assistant_msg, "ts": ts}), json.dumps({"role": "user", "content": user_msg, "ts": ts}), ) pipe.ltrim(key, 0, self.MAX_HISTORY - 1) pipe.expire(key, self.DEFAULT_TTL) pipe.hset(self._meta_key(user_id, flow_id, session_id), mapping={ "flow_name": flow_name, "last_active_at": ts, }) pipe.expire(self._meta_key(user_id, flow_id, session_id), self.DEFAULT_TTL) pipe.sadd(f"{self.KEY_PREFIX}:{user_id}:sessions", session_id) pipe.expire(f"{self.KEY_PREFIX}:{user_id}:sessions", self.SESSION_INDEX_TTL) await pipe.execute() # 异步触发摘要检查 asyncio.create_task(self._maybe_summarize(user_id, flow_id, session_id)) async def get_conversation_history( self, user_id: str, flow_id: str, session_id: str, limit: int = 20 ) -> list[dict]: """获取完整历史(管理 API 用)""" messages = await self._get_recent_messages(user_id, flow_id, session_id, limit) return list(reversed(messages)) async def delete_session(self, user_id: str, session_id: str): """清除某个会话的所有记忆""" patterns = await self.redis.keys(f"{self.KEY_PREFIX}:{user_id}:*:{session_id}:*") async with self.redis.pipeline() as pipe: if patterns: pipe.delete(*patterns) pipe.srem(f"{self.KEY_PREFIX}:{user_id}:sessions", session_id) await pipe.execute() async def list_user_sessions(self, user_id: str) -> list[dict]: """列出用户所有会话""" session_ids = await self.redis.smembers(f"{self.KEY_PREFIX}:{user_id}:sessions") sessions = [] for sid in session_ids: keys = await self.redis.keys(f"{self.KEY_PREFIX}:{user_id}:*:{sid}:meta") for k in keys: meta = await self.redis.hgetall(k) parts = k.split(":") flow_id = parts[2] if len(parts) > 2 else "" sessions.append({ "session_id": sid, "flow_id": flow_id, "flow_name": meta.get("flow_name", ""), "last_active_at": meta.get("last_active_at", ""), }) return sorted(sessions, key=lambda s: s["last_active_at"], reverse=True) # ===== 内部方法 ===== async def _get_recent_messages( self, user_id: str, flow_id: str, session_id: str, limit: int = None ) -> list[dict]: limit = limit or self.MAX_HISTORY key = self._msg_key(user_id, flow_id, session_id) raw = await self.redis.lrange(key, 0, limit - 1) return [json.loads(m) for m in raw] async def _get_summary(self, user_id: str, flow_id: str, session_id: str) -> str: key = f"{self.KEY_PREFIX}:{user_id}:{flow_id}:{session_id}:summary" val = await self.redis.get(key) return val or "" async def _maybe_summarize(self, user_id: str, flow_id: str, session_id: str): """消息超过阈值时异步生成摘要""" key = self._msg_key(user_id, flow_id, session_id) count = await self.redis.llen(key) if count < 30: # 15 轮以上才触发摘要 return recent = await self._get_recent_messages(user_id, flow_id, session_id, 20) dialogue = "\n".join(f"{m['role']}: {m['content'][:500]}" for m in recent[:10]) try: import httpx api_base = settings.LLM_API_BASE.rstrip("/") resp = await httpx.AsyncClient(timeout=30).post( f"{api_base}/chat/completions", json={ "model": settings.LLM_MODEL, "messages": [{ "role": "user", "content": f"请用一段话简要总结以下对话的关键内容。保留人名、任务、决策、时间等关键信息。\n\n{dialogue}" }], "max_tokens": 200, }, headers={"Authorization": f"Bearer {settings.LLM_API_KEY}"}, ) data = resp.json() summary = data.get("choices", [{}])[0].get("message", {}).get("content", "") if summary: key = f"{self.KEY_PREFIX}:{user_id}:{flow_id}:{session_id}:summary" await self.redis.setex(key, 2592000, summary) # 30 天 except Exception: pass # 摘要失败不影响主流程 @staticmethod def _msg_key(user_id, flow_id, session_id): return f"mem:{user_id}:{flow_id}:{session_id}:messages" @staticmethod def _meta_key(user_id, flow_id, session_id): return f"mem:{user_id}:{flow_id}:{session_id}:meta" ``` ### 4.3 LLMNodeAgent 透明集成 ```python # engine.py LLMNodeAgent.reply() 的变更 async def reply(self, msg: Msg, **kwargs) -> Msg: user_text = msg.get_text_content() context = kwargs.get("context", {}) memory_ctx = context.get("_memory_context", {}) messages = [{"role": "system", "content": self.system_prompt}] # ★ 透明注入记忆(所有流自动生效,无需配置) if memory_ctx: summary = memory_ctx.get("summary", "") recent = memory_ctx.get("recent_messages", []) if summary: messages.append({"role": "system", "content": f"[历史对话摘要]\n{summary}"}) for m in recent[-10:]: # 最近 10 条 messages.append({"role": m["role"], "content": m["content"]}) messages.append({"role": "user", "content": user_text}) # 流式/阻塞调用 LLM(已有逻辑) ... return Msg(self.name, res_text, "assistant") ``` ### 4.4 flow_engine/router.py 集成 ```python # 统一在 execute_flow() 和 execute_flow_stream() 中调用 memory_manager = get_memory_manager() context = { "user_id": user_ctx["id"], "username": user_ctx.get("username", ""), "session_id": session_id, # ← 从请求 body 传入 ... } # ★ 执行前:注入记忆 await memory_manager.inject_memory( user_id=user_ctx["id"], flow_id=str(flow_id), session_id=session_id, context=context, ) engine = FlowEngine(definition) result = await engine.execute(input_msg, context) output = result.get_text_content() # ★ 执行后:记录本轮对话 asyncio.create_task(memory_manager.record_exchange( user_id=user_ctx["id"], flow_id=str(flow_id), session_id=session_id, user_msg=input_text, assistant_msg=output, flow_name=f.name, )) return {"code": 200, "data": {"output": output, "session_id": session_id}} ``` --- ## 五、数据库设计(仅用户画像) ### 5.1 `user_profiles` 表(PostgreSQL) ```python class UserProfile(Base): __tablename__ = "user_profiles" user_id = UUID, FK→users.id, PK display_name = String(100) department_name = String(100) position = String(100) role_tags = JSON, default=[] total_conversations = Integer, default=0 total_workflows_used = Integer, default=0 favorite_workflows = JSON, default=[] # [{flow_id, flow_name, count}] preferred_work_hours = JSON, default=[] work_style_tags = JSON, default=[] common_topics = JSON, default=[] expertise_areas = JSON, default=[] total_tasks_assigned = Integer, default=0 total_tasks_created = Integer, default=0 avg_task_completion_h = Float, default=0 task_completion_rate = Float, default=0 preferred_llm_model = String(50), default="" preferred_language = String(20), default="zh" communication_tone = String(50), default="" profile_version = Integer, default=1 last_updated_at = DateTime last_conversation_at = DateTime created_at = DateTime, default=utcnow ``` --- ## 六、API 端点 | 方法 | 路径 | 说明 | |------|------|------| | GET | `/api/memory/sessions` | 列出当前用户的记忆会话列表 | | GET | `/api/memory/sessions/{session_id}` | 查看某个会话的完整对话历史 | | DELETE | `/api/memory/sessions/{session_id}` | 清除某个会话记忆 | | GET | `/api/memory/profile` | 查看自己的用户画像 | | GET | `/api/memory/profile/{user_id}` | 管理者查看下属画像(Phase 2) | | POST | `/api/memory/profile/report` | 生成下属综合评估报告(Phase 2) | --- ## 七、用户画像 Prompt 注入格式(Phase 2) ``` [用户画像] 姓名: 张三 | 职位: 技术主管 | 部门: 研发中心 工作风格: 注重细节, 快速响应 擅长领域: 项目管理, 数据分析 最近话题: 任务管理(15次), 文档处理(8次) 常用流程: 任务分配(23次), 周报生成(12次) ``` --- ## 八、实现阶段 ### Phase 1 — 基础记忆持久化(本次实施)✅ - [ ] 创建 `modules/memory/` 模块目录 - [ ] 实现 `MemoryManager`(纯 Redis 存储) - [ ] 在 `flow_engine/router.py` 的 execute_flow 和 execute_flow_stream 中集成(执行前注入 + 执行后记录) - [ ] 在 `LLMNodeAgent.reply()` 中读取 `_memory_context` - [ ] 修复 `session_id` 传递链路 - [ ] 实现 `router.py` API 端点 - [ ] FlowChat.vue 增加「清空记忆」按钮 - [ ] 注册路由到 main.py ### Phase 2 — 用户画像系统(后续) - [ ] 新增 `user_profiles` 表 + 自动提取逻辑 - [ ] 画像注入 LLM context - [ ] 管理端画像查看 + 下属评估报告 ### Phase 3 — 记忆检索优化(后续) - [ ] 长会话自动摘要压缩 - [ ] 语义相似度检索历史 - [ ] Redis 数据过期清理 cron --- ## 九、核心优势 | 特性 | 说明 | |------|------| | **对用户透明** | 不需要在流编辑器里做任何配置,所有流自动获得记忆 | | **对开发者透明** | LLMNodeAgent 自动从 context 读记忆,不用改每个节点 | | **纯 Redis 存储** | 毫秒级读写,AOF 持久化保证不失忆 | | **用户-工作流隔离** | 不同用户不同流程的记忆互不可见 | | **7 天自动过期** | 不需要手动清理,旧记忆自动淘汰 | | **异步记录** | `record_exchange` 用 `asyncio.create_task`,不影响响应速度 |