# PLAN9 — 全面升级计划:记忆增强 · 流重构 · 模型管理 · 节点扩充 > 日期:2026-05-15 > 状态:规划中,待确认后实施 --- ## 一、记忆管理升级:吸收 Agent-Memory 分层架构 ### 1.1 现状分析 **我们当前方案(PLAN8 已实施)**: - 纯 Redis 存储:List 存消息、Hash 存元数据、String 存摘要 - 透明中间件:在 `flow_engine/router.py` 的 `execute_flow` / `execute_flow_stream` 中注入记忆 - LLMNodeAgent.reply() 从 `context["_memory_context"]` 读取历史摘要 + 最近消息 - 7 天 TTL 自动过期,异步记录不阻塞响应 **Agent-Memory 项目(ali-agentscope-src/Agent-Memory)**: - TypeScript 实现,以 OpenClaw 插件形式运行 - 四层记忆金字塔:L0(原始对话) → L1(结构化原子) → L2(场景块) → L3(用户画像) - 双写机制:JSONL 持久化 + SQLite 向量检索 - 混合搜索:BM25 关键词 + Embedding 向量 + RRF 融合 - Context Offload:长任务上下文溢出时 Mermaid 符号化压缩 - Pipeline 调度:每 N 轮对话触发 L1 提取,延迟触发 L2/L3 ### 1.2 对比评估 | 维度 | 我们方案 | Agent-Memory | 评价 | |------|---------|-------------|------| | **存储** | Redis(纯内存) | SQLite + JSONL(磁盘) | 我们快但不适合海量历史;他们慢但可持久化大量数据 | | **记忆分层** | 无分层(平铺消息列表) | L0/L1/L2/L3 四层金字塔 | **最大差距**:我们没有结构化提取,全是原始消息 | | **检索方式** | 全量最近 N 条 + 摘要 | BM25 + 向量 + RRF 混合搜索 | 我们只能按时间取最近消息,无法按语义检索 | | **去重** | 无 | L1 提取后 batchDedup(store/update/merge/skip) | 我们会重复存储相同信息 | | **用户画像** | 未实现 | L3 Persona 自动生成 + 增量更新 | 他们有完整的画像系统 | | **上下文溢出** | 无处理 | Context Offload + Mermaid 压缩 | 我们长对话会直接截断,丢失信息 | | **接入方式** | Python 原生集成 | TypeScript 插件,需 OpenClaw/Hermes 宿主 | **不能直接部署接入**,语言和架构不兼容 | | **性能** | 毫秒级(Redis) | 秒级(SQLite + LLM 提取) | 我们快但浅;他们慢但深 | ### 1.3 结论 **不能直接部署接入**:Agent-Memory 是 TypeScript 项目,依赖 OpenClaw/Hermes 宿主环境,与我们的 Python/FastAPI 架构完全不兼容。 **应吸收其核心设计思想**,但存储底座需要重新选型——详见 1.4 节分析。 ### 1.4 记忆存储底座选型:Redis vs MongoDB vs PostgreSQL #### 1.4.1 当前 Redis 方案的数据结构与访问模式 ``` 当前 Redis 键结构: mem:{uid}:{fid}:{sid}:messages → List(LPUSH 写入,LRANGE 读取最近 N 条) mem:{uid}:{fid}:{sid}:meta → Hash(HSET 写入,HGETALL 读取) mem:{uid}:{fid}:{sid}:summary → String(SETEX 写入,GET 读取) mem:{uid}:sessions → Set(SADD 写入,SMEMBERS 读取) 访问模式: 写入:record_exchange() → pipeline 批量 LPUSH + HSET + SADD(每次对话 1 次) 读取:inject_memory() → LRANGE + GET(每次对话 1 次,延迟 < 1ms) 删除:delete_session() → KEYS + DEL(低频,用户主动清除) 列表:list_user_sessions() → SMEMBERS + KEYS + HGETALL(低频,管理页面) ``` #### 1.4.2 三种数据库四维度对比 **维度一:内存占用与性能** | 指标 | Redis | MongoDB | PostgreSQL | |------|-------|---------|-----------| | **写入延迟** | < 1ms(纯内存) | 2-5ms(WAL + 内存映射) | 3-8ms(WAL + fsync) | | **读取延迟** | < 1ms | 1-3ms(WiredTiger 缓存命中) | 1-5ms(shared_buffers 命中) | | **inject_memory 延迟** | ~1ms | ~5ms | ~8ms | | **内存消耗(1万会话×40条消息)** | ~800MB(纯内存,无压缩) | ~200MB(磁盘 + 缓存热数据) | ~150MB(磁盘 + shared_buffers) | | **内存消耗(100万会话)** | ~80GB(不可行,需分片) | ~20GB(缓存 + 磁盘冷数据淘汰) | ~15GB(磁盘为主,热数据缓存) | | **大规模场景性能** | 受内存上限约束,超内存即 OOM | 缓存冷热分离,可支撑 TB 级 | 磁盘为主,可支撑 PB 级 | | **向量检索性能** | 需 Redis Stack(RediSearch),10万向量 ~50ms | 原生向量索引(Atlas Vector Search),10万向量 ~30ms | pgvector 扩展,10万向量 ~40ms | | **全文检索性能** | 需 Redis Stack(FTS 模块),非标准 | 原生文本索引,成熟 | tsvector + GIN 索引,成熟 | **关键结论**: - Redis 在小规模(< 10万会话)下性能最优,但内存成本线性增长 - MongoDB 和 PostgreSQL 在大规模场景下更优,冷数据自动落盘不占内存 - 向量检索三者性能接近,但 Redis 需额外安装 Stack 模块 **维度二:数据模型适配性** | 指标 | Redis | MongoDB | PostgreSQL | |------|-------|---------|-----------| | **消息列表(L0)** | List(天然有序,但无结构化查询) | 嵌入文档数组或独立文档(灵活) | 关系表 + 时间索引(标准) | | **结构化原子(L1)** | Hash + Sorted Set(需手动管理) | 文档(天然 JSON,灵活 schema) | JSONB 列(灵活 + 可索引) | | **场景块(L2)** | Hash(需手动序列化) | 文档(嵌套结构自然表达) | JSONB 或独立表 | | **用户画像(L3)** | String(纯文本,无查询能力) | 文档(结构化,可按字段查询) | JSONB 或独立表(可按字段查询) | | **向量存储** | 需 Redis Stack(非标准部署) | 原生支持(Atlas Vector Search) | pgvector 扩展(成熟) | | **数据迁移复杂度** | 基准(当前方案) | 中(需新建 MongoDB 连接 + 集合设计) | **低**(已有 PostgreSQL 连接池,复用 SQLAlchemy) | | **Schema 演进** | 无 Schema(灵活但无约束) | 无 Schema(灵活,可加验证) | 有 Schema(需迁移 SQL,但类型安全) | **关键结论**: - MongoDB 对文档型记忆数据最自然(JSON 原生存储,无需 ORM 映射) - PostgreSQL 迁移成本最低(项目已用 SQLAlchemy + asyncpg,复用现有连接池) - Redis 对 L0 消息列表操作最自然(LPUSH/LRANGE),但对 L1/L2/L3 结构化数据支持弱 **维度三:扩展性与维护成本** | 指标 | Redis | MongoDB | PostgreSQL | |------|-------|---------|-----------| | **水平扩展** | Redis Cluster(分片复杂,跨 slot 操作受限) | 原生分片(Shard Key 自动路由) | 读写分离 + 分区表(Citus 扩展) | | **运维复杂度** | 低(单实例简单,Cluster 复杂) | 中(副本集 + 分片需监控) | **最低**(项目已有 PostgreSQL 运维) | | **新增基础设施** | 无(已部署) | **需新增 MongoDB 服务** | **无需新增**(复用现有 PostgreSQL) | | **备份恢复** | RDB/AOF(需配置持久化策略) | mongodump/oplog | pg_dump/WAL 归档(**已有**) | | **监控工具** | redis-cli INFO | MongoDB Compass/Cloud Manager | pg_stat_statements(**已有**) | | **长期维护成本** | 高(内存持续增长需扩容) | 中 | **低**(磁盘扩容便宜,运维体系成熟) | **关键结论**: - **PostgreSQL 运维成本最低**:项目已有 PostgreSQL 实例、连接池、备份策略,无需新增基础设施 - MongoDB 需要额外部署和维护一套数据库服务 - Redis 长期成本最高:内存是磁盘的 10-50 倍价格 **维度四:事务与一致性需求** | 指标 | Redis | MongoDB | PostgreSQL | |------|-------|---------|-----------| | **事务支持** | Pipeline(非原子,MULTI 仅单 key 原子) | 4.0+ 多文档事务(副本集) | 完整 ACID 事务 | | **记忆数据事务需求** | 低(单次写入可容忍部分失败) | 低 | 低 | | **模型配置事务需求** | 不涉及 | 不涉及 | 高(需与现有业务表关联) | | **数据一致性** | 最终一致(AOF 异步刷盘可能丢 1s 数据) | 强一致(副本集 w:majority) | 强一致(WAL 同步刷盘) | | **记忆丢失风险** | AOF 异步刷盘时宕机可能丢失 | 副本集保证不丢 | WAL 保证不丢 | **关键结论**: - 记忆数据对事务要求低(丢失一条消息可接受),三种数据库均满足 - 但 PostgreSQL 的强一致性在模型配置等业务数据上更有优势 - Redis AOF 异步模式下有 1 秒数据丢失窗口,对"不失忆"要求有风险 #### 1.4.3 综合评估与选型决策 | 评估维度 | Redis | MongoDB | PostgreSQL | 权重 | |---------|-------|---------|-----------|------| | 性能(小规模) | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ | 20% | | 性能(大规模) | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 25% | | 数据模型适配 | ⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 20% | | 迁移成本 | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐ | 15% | | 运维成本 | ⭐⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | 10% | | 事务/一致性 | ⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 10% | | **加权总分** | **3.35** | **3.25** | **3.95** | — | #### 1.4.4 选型结论:PostgreSQL + Redis 混合方案 **选择 PostgreSQL 作为记忆主存储,Redis 保留为热数据缓存层**。 理由: 1. **零新增基础设施**:项目已有 PostgreSQL(asyncpg + SQLAlchemy),无需部署新服务 2. **迁移成本最低**:复用现有连接池、ORM、迁移框架,新增表即可 3. **PGVector 原生支持**:向量检索无需额外模块,`CREATE EXTENSION vector` 即可 4. **JSONB 灵活 + 可索引**:L1/L2/L3 结构化数据用 JSONB 存储,支持 GIN 索引按字段查询 5. **强一致性保证不失忆**:WAL 同步刷盘,无 Redis AOF 的 1 秒丢失窗口 6. **大规模成本优势**:磁盘存储比内存便宜 10-50 倍,100 万会话仅需 ~15GB 磁盘 **Redis 保留角色**: - 热数据缓存:最近 10 条消息缓存到 Redis(inject_memory 时直接读缓存,延迟 < 1ms) - 速率限制:已有 cache_manager 的限流功能 - 会话状态:当前活跃会话的临时状态 #### 1.4.5 PostgreSQL 记忆数据表设计 ```sql -- L0: 原始对话消息 CREATE TABLE memory_messages ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), user_id UUID NOT NULL REFERENCES users(id), flow_id UUID NOT NULL REFERENCES flow_definitions(id), session_id UUID NOT NULL, role VARCHAR(20) NOT NULL, -- "user" / "assistant" content TEXT NOT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX idx_memory_messages_session ON memory_messages(user_id, flow_id, session_id, created_at DESC); CREATE INDEX idx_memory_messages_user_flow ON memory_messages(user_id, flow_id, created_at DESC); -- L1: 结构化记忆原子 CREATE TABLE memory_atoms ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), user_id UUID NOT NULL REFERENCES users(id), flow_id UUID REFERENCES flow_definitions(id), -- NULL 表示全局 atom_type VARCHAR(20) NOT NULL, -- "persona" / "episodic" / "instruction" content TEXT NOT NULL, priority SMALLINT DEFAULT 50, -- 0-100,越高越核心 source_session_id UUID, -- 来源会话 metadata JSONB DEFAULT '{}', -- 扩展元数据 embedding vector(1536), -- 向量(需 pgvector 扩展) created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX idx_memory_atoms_user ON memory_atoms(user_id, atom_type, priority DESC); CREATE INDEX idx_memory_atoms_embedding ON memory_atoms USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100); -- L2: 场景块 CREATE TABLE memory_scenes ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), user_id UUID NOT NULL REFERENCES users(id), flow_id UUID REFERENCES flow_definitions(id), scene_name VARCHAR(200) NOT NULL, summary TEXT NOT NULL, heat INTEGER DEFAULT 0, -- 热度(访问频率) content JSONB DEFAULT '{}', -- 场景完整内容 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); CREATE INDEX idx_memory_scenes_user ON memory_scenes(user_id, flow_id, heat DESC); -- L3: 用户画像 CREATE TABLE memory_personas ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), user_id UUID NOT NULL UNIQUE REFERENCES users(id), -- 每用户一条 content JSONB NOT NULL DEFAULT '{}', -- 结构化画像 raw_text TEXT DEFAULT '', -- 原始画像文本(注入 LLM 用) version INTEGER DEFAULT 1, updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); -- 会话元数据 CREATE TABLE memory_sessions ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), user_id UUID NOT NULL REFERENCES users(id), flow_id UUID NOT NULL REFERENCES flow_definitions(id), session_id UUID NOT NULL, flow_name VARCHAR(200) DEFAULT '', message_count INTEGER DEFAULT 0, last_active_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), UNIQUE(user_id, flow_id, session_id) ); CREATE INDEX idx_memory_sessions_user ON memory_sessions(user_id, last_active_at DESC); ``` #### 1.4.6 MemoryManager 改造方案 ```python class MemoryManager: def __init__(self, db_session_factory, redis: Redis): self.db_session_factory = db_session_factory # AsyncSessionLocal self.redis = redis # 热数据缓存 async def inject_memory(self, user_id, flow_id, session_id, context): # 1. 先查 Redis 缓存(最近 10 条消息) cached = await self._get_cached_messages(user_id, flow_id, session_id) if cached: recent_messages = cached else: # 2. 缓存未命中,查 PostgreSQL recent_messages = await self._query_recent_messages(user_id, flow_id, session_id) # 3. 回填 Redis 缓存 await self._cache_messages(user_id, flow_id, session_id, recent_messages) # 4. 查 L1 原子(PostgreSQL) atoms = await self._query_relevant_atoms(user_id, flow_id, context.get("input", "")) # 5. 查 L3 画像(PostgreSQL) persona = await self._query_persona(user_id) context["_memory_context"] = { "recent_messages": recent_messages, "atoms": atoms, "persona": persona, "session_id": session_id, } async def record_exchange(self, user_id, flow_id, session_id, user_msg, assistant_msg, flow_name=""): ts = datetime.utcnow() # 1. 写 PostgreSQL(主存储,保证持久化) async with self.db_session_factory() as session: session.add(MemoryMessage( user_id=user_id, flow_id=flow_id, session_id=session_id, role="user", content=user_msg, created_at=ts, )) session.add(MemoryMessage( user_id=user_id, flow_id=flow_id, session_id=session_id, role="assistant", content=assistant_msg, created_at=ts, )) await session.commit() # 2. 更新 Redis 缓存(热数据) await self._cache_append_message(user_id, flow_id, session_id, [ {"role": "user", "content": user_msg, "ts": ts.isoformat()}, {"role": "assistant", "content": assistant_msg, "ts": ts.isoformat()}, ]) # 3. 异步触发 L1 提取 asyncio.create_task(self._maybe_extract_atoms(user_id, flow_id, session_id)) ``` #### 1.4.7 迁移步骤 ``` Step 1: 创建 PostgreSQL 记忆表(init-db/03-memory-tables.sql) Step 2: 安装 pgvector 扩展(init-db/03-memory-tables.sql 中 CREATE EXTENSION IF NOT EXISTS vector) Step 3: 改造 MemoryManager,PostgreSQL 为主存储,Redis 为缓存 Step 4: 数据迁移脚本:Redis → PostgreSQL(一次性,读取 Redis 中现有记忆数据写入 PG) Step 5: 验证全链路:inject_memory / record_exchange / delete_session Step 6: 确认无误后,Redis 中的记忆键可设置短 TTL 自然过期 ``` #### 1.4.8 迁移 SQL 文件 详见 `init-db/03-memory-tables.sql`(实施时创建),包含: - `CREATE EXTENSION IF NOT EXISTS vector` - 上述 5 张表的 CREATE TABLE + INDEX - 幂等执行(IF NOT EXISTS) --- ## 二、全面升级流管理(对标 Dify) ### 2.1 现状问题 1. **节点类型不足**:当前 11 种节点(trigger/llm/tool/mcp/notify/condition/rag/loop/merge/code/output),Dify 有 15+ 种 2. **记忆不是默认行为**:虽然 PLAN8 实现了透明中间件,但缺乏 Dify 那样的"对话型 vs 工作流型"区分 3. **变量系统原始**:仅 `{{node_id.output}}` 模板,无类型、无作用域、无聚合 4. **版本管理不完善**:有 FlowVersion 快照但缺少完整的草稿/发布分离流程 ### 2.2 流类型区分(Chatflow vs Workflow) **Dify 的核心设计**:区分两种应用模式,记忆机制不同 | 模式 | 记忆 | 典型场景 | 对应我们的 | |------|------|---------|-----------| | **Chatflow(对话型)** | 自动维护对话历史,LLM 节点自动注入上下文 | 客服、助手、问答 | FlowChat.vue 使用的流 | | **Workflow(工作流型)** | 无自动记忆,每次执行独立 | 数据处理、批量任务、API 调用 | API 网关调用的流 | **改造方案**: ```python # FlowDefinition 新增字段 flow_mode = Column(String(20), default="chatflow") # "chatflow" | "workflow" # 记忆中间件逻辑调整 if f.flow_mode == "chatflow": await mm.inject_memory(...) # 注入记忆 asyncio.create_task(mm.record_exchange(...)) # 记录对话 else: pass # workflow 模式不注入记忆 ``` 前端 FlowEditor.vue 新增流类型选择(创建流时选择,创建后不可更改)。 ### 2.3 记忆默认启用策略 **核心原则**:所有 Chatflow 类型的流,记忆管理是默认行为,不需要用户在流中添加节点。 ``` 用户输入 → router.py │ ├─ Chatflow 模式: │ ├─ 执行前:inject_memory() → 检索历史 + 画像 → 注入 context │ ├─ LLM 调用:LLMNodeAgent 自动读取 _memory_context │ └─ 执行后:record_exchange() → 异步记录 + L1 提取 │ └─ Workflow 模式: └─ 直接执行,无记忆注入 ``` ### 2.4 流创建流程增强 **当前**:创建流 → 编辑画布 → 发布 **升级后**: ``` 创建流 ├─ 选择流类型(Chatflow / Workflow) ├─ 选择模板(可选,从模板市场选择) └─ 进入编辑器 编辑画布 ├─ 拖拽节点 ├─ 配置节点参数 └─ 连线 发布 ├─ 拓扑完整性检查(无孤立节点、起始/结束节点存在) ├─ 必填参数校验 ├─ 创建版本快照(不可变) └─ 更新 published_version_id 指针 ``` ### 2.5 流市场 vs 流列表的关系梳理 **当前问题**:流市场的"已上架工作流列表"和一级菜单"流列表"内容重叠,用户困惑。 **梳理方案**: | 页面 | 定位 | 数据来源 | 操作 | |------|------|---------|------| | **流列表**(管理端) | 我创建/我管理的所有流 | `GET /api/flow/definitions` | 编辑、删除、发布/下架、上架到市场 | | **流市场**(用户端) | 已上架到市场的公开流 | `GET /api/flow/market` | 查看详情、安装/使用、评价 | | **模板中心**(创建时) | 系统预置 + 社区贡献的模板 | `GET /api/flow/templates` | 一键使用模板创建新流 | **关键区分**: - 流列表 = 私有工作区,看到的是自己管理的流 - 流市场 = 公开商店,看到的是别人发布到市场的流 - 模板中心 = 快速起步,创建新流时的入口 --- ## 三、智能体管理改造:OpenAI-API-Compatible 模型管理 ### 3.1 现状问题 当前 `AgentConfig` 模型只有一个 `model` 字段(String),无法区分模型类型,无法管理 Embedding/Rerank 模型,无法支持多供应商。 ```python # 当前 AgentConfig model = Column(String(50), default="gpt-4o-mini") # 只能存一个模型名 ``` ### 3.2 Dify 的模型供应商架构 Dify 采用三层架构: ``` ModelProvider(供应商抽象层) ├── 模型类型:LLM / Embedding / Rerank / TTS / Speech-to-Text ├── 供应商实例:OpenAI / Anthropic / ZhipuAI / Ollama / OpenAI-API-Compatible └── 模型实例:gpt-4o / text-embedding-3-small / cohere-rerank ``` **OpenAI-API-Compatible 模式**:任何实现了 OpenAI API 格式的服务都能即插即用,只需配置 base_url + api_key + model_name。 ### 3.3 改造方案 #### 数据模型 ```python # 新增:模型供应商表 class ModelProvider(Base): __tablename__ = "model_providers" id = Column(UUID, primary_key=True, default=uuid.uuid4) name = Column(String(100), nullable=False) # "OpenAI" / "智谱AI" / "本地Ollama" provider_type = Column(String(50), nullable=False) # "openai" / "zhipu" / "ollama" / "openai_compatible" base_url = Column(String(500)) # API 端点 api_key = Column(Text) # 加密存储 extra_config = Column(JSON, default=dict) # 供应商特有配置 is_active = Column(Boolean, default=True) created_at = Column(DateTime, default=datetime.utcnow) # 新增:模型实例表 class ModelInstance(Base): __tablename__ = "model_instances" id = Column(UUID, primary_key=True, default=uuid.uuid4) provider_id = Column(UUID, ForeignKey("model_providers.id")) model_name = Column(String(100), nullable=False) # "gpt-4o" / "embedding-3" / "bge-rerank" model_type = Column(String(30), nullable=False) # "llm" / "embedding" / "rerank" display_name = Column(String(200)) # "GPT-4o" / "文本嵌入v3" capabilities = Column(JSON, default=dict) # {"vision": true, "function_calling": true, "max_tokens": 128000} default_params = Column(JSON, default=dict) # {"temperature": 0.7, "top_p": 1.0} is_default = Column(Boolean, default=False) # 是否为该类型的默认模型 is_active = Column(Boolean, default=True) created_at = Column(DateTime, default=datetime.utcnow) ``` #### 各模型类型的配置参数 | 模型类型 | 通用参数 | 特有参数 | |---------|---------|---------| | **LLM** | model, temperature, top_p, max_tokens, stream | vision(多模态)、function_calling、response_format | | **Embedding** | model, dimensions | encoding_format, input_type | | **Rerank** | model, top_n | query, documents, return_documents | #### API 端点设计 ``` # 供应商管理 POST /api/model-providers/ # 添加供应商 GET /api/model-providers/ # 列出供应商 PUT /api/model-providers/{id} # 更新供应商 DELETE /api/model-providers/{id} # 删除供应商 POST /api/model-providers/{id}/test # 测试连通性 # 模型实例管理 POST /api/model-providers/{id}/models/ # 添加模型 GET /api/model-instances/ # 列出所有模型(支持 ?type=llm 筛选) PUT /api/model-instances/{id} # 更新模型 DELETE /api/model-instances/{id} # 删除模型 POST /api/model-instances/{id}/test # 测试模型调用 # 默认模型设置 PUT /api/model-defaults/ # 设置各类型默认模型 GET /api/model-defaults/ # 获取各类型默认模型 ``` #### 前端页面 新增 `ModelProviderManager.vue`(管理端): - 供应商列表(卡片式,显示名称、类型、状态、模型数量) - 添加供应商表单(选择类型 → 填写 base_url/api_key → 自动检测可用模型) - 模型实例列表(按类型 Tab 分组:LLM / Embedding / Rerank) - 每个模型可设置默认参数、是否为默认模型 - 测试按钮:发送测试请求验证连通性 #### AgentConfig 关联调整 ```python # AgentConfig 改造 class AgentConfig(Base): # ...现有字段保留... model = Column(String(50), default="gpt-4o-mini") # 保留兼容 model_instance_id = Column(UUID, ForeignKey("model_instances.id"), nullable=True) # 新增:关联模型实例 embedding_model_id = Column(UUID, ForeignKey("model_instances.id"), nullable=True) # 新增:关联嵌入模型 ``` #### 引擎层适配 ```python # engine.py 中 LLMNodeAgent 改造 class LLMNodeAgent(AgentBase): async def reply(self, msg, **kwargs): context = kwargs.get("context", {}) config = self.config # 优先使用 model_instance_id 获取模型配置 model_instance_id = config.get("model_instance_id") if model_instance_id: provider, model = await self._resolve_model(model_instance_id) base_url = provider.base_url api_key = provider.api_key model_name = model.model_name else: # fallback 到旧逻辑 base_url = settings.OPENAI_BASE_URL api_key = settings.OPENAI_API_KEY model_name = config.get("model", "gpt-4o-mini") ``` ### 3.4 实施优先级 | 步骤 | 内容 | 优先级 | |------|------|--------| | Step 1 | 创建 ModelProvider + ModelInstance 数据模型 + 迁移 SQL | **P0** | | Step 2 | 实现供应商/模型 CRUD API | **P0** | | Step 3 | 前端 ModelProviderManager.vue 页面 | **P0** | | Step 4 | LLMNodeAgent 适配 model_instance_id | P1 | | Step 5 | RAGNodeAgent 适配 embedding_model_id | P1 | | Step 6 | AgentConfig 关联 model_instance_id | P1 | | Step 7 | 供应商自动检测可用模型 | P2 | --- ## 四、流编辑器节点扩充(对标 Dify) ### 4.1 现有节点 vs Dify 节点对比 | 节点 | 我们有 | Dify 有 | 差距说明 | |------|:------:|:-------:|---------| | 触发/开始 | ✅ trigger | ✅ start | 我们多了企微触发,Dify 只有输入变量 | | LLM | ✅ llm | ✅ llm | 基本对齐 | | 工具调用 | ✅ tool | ✅ tool | 基本对齐 | | MCP | ✅ mcp | ❌ | 我们独有 | | 通知 | ✅ notify | ❌ | 我们独有(企微通知) | | 条件分支 | ✅ condition | ✅ if-else | 基本对齐 | | RAG检索 | ✅ rag | ✅ knowledge-retrieval | 基本对齐 | | 循环 | ✅ loop | ✅ iteration | Dify 是数组迭代,我们是通用循环 | | 变量聚合 | ✅ merge | ✅ variable-assigner | 基本对齐 | | 代码执行 | ✅ code | ✅ code | 基本对齐 | | 输出/结束 | ✅ output | ✅ end | 基本对齐 | | **HTTP 请求** | ❌ | ✅ http-request | **缺失**:调用外部 API | | **问题分类器** | ❌ | ✅ question-classifier | **缺失**:意图路由 | | **模板转换** | ❌ | ✅ template-transform | **缺失**:Jinja2 格式化 | | **变量赋值** | ❌ | ✅ variable-assigner | **缺失**:运行时变量操作 | | **迭代** | ❌ | ✅ iteration | **缺失**:数组逐项处理(与 loop 不同) | | **问题优化** | ❌ | ✅ question-optimiser | **缺失**:检索前 query 改写 | ### 4.2 新增节点方案 #### P0 — 必须新增(使用频率高) **1. HTTP 请求节点(http_request)** ``` 功能:发送 HTTP 请求(GET/POST/PUT/DELETE/PATCH) 配置参数: - method: 请求方法 - url: 请求地址(支持变量模板) - headers: 请求头(JSON) - body: 请求体(支持变量模板) - auth_type: 认证方式(none/api_key/bearer/basic/oauth2) - auth_config: 认证配置 - timeout: 超时时间(秒) - retry_count: 重试次数 输出: - status_code: 状态码 - headers: 响应头 - body: 响应体(JSON 解析) - raw: 原始文本 前端:HttpRequestConfig.vue ``` **2. 问题分类器节点(question_classifier)** ``` 功能:基于 LLM 对用户输入进行意图分类,路由到不同分支 配置参数: - model: 使用的 LLM(可选,默认用系统默认) - categories: 分类列表 [{name, description}] - instruction: 分类指令(补充说明) 输出: - category: 分类结果 - confidence: 置信度 - 多出口:每个分类一个出口 前端:QuestionClassifierConfig.vue 引擎:调用 LLM 做分类,根据结果走不同分支(类似 condition 但基于 LLM) ``` **3. 变量赋值节点(variable_assigner)** ``` 功能:在运行时对变量进行赋值/修改操作 配置参数: - assignments: [{target_var, source_type, source_value}] - source_type: "constant" / "upstream_output" / "template" / "expression" 输出: - 变量值更新到 context 中 前端:VariableAssignerConfig.vue ``` #### P1 — 建议新增(提升体验) **4. 模板转换节点(template_transform)** ``` 功能:使用 Jinja2 模板语法对变量进行格式化/拼接/转换 配置参数: - template: Jinja2 模板字符串 - output_type: 输出类型(string/json/array) 输出: - rendered: 渲染后的文本 前端:TemplateTransformConfig.vue 引擎:使用 jinja2 库渲染模板 ``` **5. 迭代节点(iteration)** ``` 功能:对列表/数组逐项处理,内部可嵌套子工作流 与 loop 的区别: - loop:固定次数或条件循环,处理同一逻辑 - iteration:遍历数组,每次迭代处理一个元素,输出结果数组 配置参数: - input_array: 输入数组(变量引用) - output_variable: 每次迭代的输出变量名 - max_iterations: 最大迭代次数 输出: - output: 结果数组 前端:IterationConfig.vue ``` **6. 问题优化节点(question_optimiser)** ``` 功能:在 RAG 检索前对用户 query 进行改写/扩展,提升检索召回率 配置参数: - model: 使用的 LLM - strategy: 优化策略(rewrite/expand/decompose) - instruction: 自定义优化指令 输出: - optimized_query: 优化后的查询 - original_query: 原始查询 前端:QuestionOptimiserConfig.vue ``` ### 4.3 节点前端配置组件清单 | 新增节点 | 配置组件 | 复杂度 | |---------|---------|--------| | http_request | HttpRequestConfig.vue | 中 | | question_classifier | QuestionClassifierConfig.vue | 中 | | variable_assigner | VariableAssignerConfig.vue | 低 | | template_transform | TemplateTransformConfig.vue | 低 | | iteration | IterationConfig.vue | 中 | | question_optimiser | QuestionOptimiserConfig.vue | 低 | ### 4.4 引擎层改造 ```python # engine.py _create_agent() 新增分支 elif node_type == "http_request": return HttpRequestNodeAgent(...) elif node_type == "question_classifier": return QuestionClassifierNodeAgent(...) elif node_type == "variable_assigner": return VariableAssignerNodeAgent(...) elif node_type == "template_transform": return TemplateTransformNodeAgent(...) elif node_type == "iteration": return IterationNodeAgent(...) elif node_type == "question_optimiser": return QuestionOptimiserNodeAgent(...) ``` --- ## 五、版本管理与模板系统完善 ### 5.1 当前版本管理现状 - ✅ FlowVersion 模型已存在 - ✅ publish_flow / unpublish_flow / rollback_flow API 已实现 - ✅ 执行时加载 published_version 的 definition_json - ❌ 缺少前端版本管理界面(版本列表、对比、回滚按钮) - ❌ 缺少发布前的完整性校验 - ❌ 缺少草稿自动保存 ### 5.2 完善方案 **前端版本管理**: - FlowEditor.vue 工具栏增加"版本历史"按钮 - 版本历史弹窗:显示版本列表(版本号、发布时间、发布人、变更说明) - 支持查看历史版本的定义 JSON - 支持回滚到指定版本 - 发布时弹出"变更说明"输入框 **发布前校验**: ```python async def _validate_before_publish(definition: dict) -> list[str]: errors = [] nodes = definition.get("nodes", []) edges = definition.get("edges", []) # 1. 必须有起始节点 if not any(n.get("type") == "trigger" for n in nodes): errors.append("缺少触发/起始节点") # 2. Chatflow 必须有 LLM 节点 if flow_mode == "chatflow" and not any(n.get("type") == "llm" for n in nodes): errors.append("对话型流必须包含至少一个 LLM 节点") # 3. 无孤立节点 connected_ids = set() for e in edges: connected_ids.add(e["source"]) connected_ids.add(e["target"]) for n in nodes: if n["id"] not in connected_ids and len(nodes) > 1: errors.append(f"节点 '{n.get('label', n['id'])}' 未连接") return errors ``` **草稿自动保存**: - 前端 FlowEditor.vue 每 30 秒自动保存草稿到 `draft_definition_json` - 切换流时检测未保存草稿,提示用户 ### 5.3 模板系统增强 **当前**:硬编码 2 个模板(文档处理流、企微通知流) **升级后**: - 模板存储到数据库(新增 `flow_templates` 表) - 支持管理员创建模板(从已有流"另存为模板") - 模板分类:客服对话 / 文档处理 / 数据分析 / 通知推送 / 自定义 - 创建流时显示模板选择页面(可选,也可从空白创建) --- ## 六、实施路线图 ### Phase 1 — 基础架构升级(P0,2-3 周) | 任务 | 涉及文件 | 依赖 | |------|---------|------| | 1.1 创建 PostgreSQL 记忆表 + pgvector 扩展 | init-db/03-memory-tables.sql, models/\_\_init\_\_.py | 无 | | 1.2 改造 MemoryManager:PG 主存储 + Redis 缓存 | memory/manager.py | 1.1 | | 1.3 Redis → PostgreSQL 数据迁移脚本 | scripts/migrate_memory_redis_to_pg.py | 1.2 | | 1.4 创建 ModelProvider + ModelInstance 数据模型 | models/\_\_init\_\_.py, init-db/04-model-provider.sql | 无 | | 1.5 实现供应商/模型 CRUD API | modules/model_provider/router.py | 1.4 | | 1.6 前端 ModelProviderManager.vue | frontend/src/views/model/ | 1.5 | | 1.7 FlowDefinition 新增 flow_mode 字段 | models/\_\_init\_\_.py, init-db/ | 无 | | 1.8 记忆中间件按 flow_mode 区分 | flow_engine/router.py | 1.7 | | 1.9 新增 HTTP 请求节点 | engine.py, HttpRequestConfig.vue | 无 | | 1.10 新增问题分类器节点 | engine.py, QuestionClassifierConfig.vue | 无 | | 1.11 新增变量赋值节点 | engine.py, VariableAssignerConfig.vue | 无 | ### Phase 2 — 记忆分层 + 模型适配(P1,2-3 周) | 任务 | 涉及文件 | 依赖 | |------|---------|------| | 2.1 MemoryManager 增加 L1 结构化提取(PG 存储) | memory/manager.py | Phase 1 | | 2.2 MemoryManager 增加 L1 去重 | memory/manager.py | 2.1 | | 2.3 LLMNodeAgent 适配 model_instance_id | engine.py | 1.5 | | 2.4 RAGNodeAgent 适配 embedding_model_id | engine.py | 1.5 | | 2.5 AgentConfig 关联 model_instance_id(向后兼容) | models/\_\_init\_\_.py, agent_manager/ | 1.5 | | 2.6 新增模板转换节点 | engine.py, TemplateTransformConfig.vue | 无 | | 2.7 新增迭代节点 | engine.py, IterationConfig.vue | 无 | | 2.8 发布前校验 + 版本管理前端 | flow_engine/router.py, FlowEditor.vue | 无 | ### Phase 3 — 画像 + 检索优化(P2,2-3 周) | 任务 | 涉及文件 | 依赖 | |------|---------|------| | 3.1 L2 场景块提取(PG 存储) | memory/manager.py | Phase 2 | | 3.2 L3 用户画像生成(PG 存储) | memory/manager.py | 3.1 | | 3.3 混合检索(pgvector 向量 + tsvector 全文 + RRF) | memory/manager.py | Embedding 模型 | | 3.4 新增问题优化节点 | engine.py, QuestionOptimiserConfig.vue | 3.3 | | 3.5 模板系统数据库化 | flow_templates 表, router.py | 无 | | 3.6 草稿自动保存 | FlowEditor.vue, router.py | 无 | --- ## 七、向后兼容性保障 ### 7.1 AgentConfig 向后兼容 ```python # AgentConfig 改造:保留 model 字段,新增可选字段 class AgentConfig(Base): # 现有字段保留不变 model = Column(String(50), default="gpt-4o-mini") # 保留!旧数据自动兼容 # 新增可选字段(nullable=True,旧记录自动为 NULL) model_instance_id = Column(UUID, ForeignKey("model_instances.id"), nullable=True) embedding_model_id = Column(UUID, ForeignKey("model_instances.id"), nullable=True) ``` **兼容逻辑**: ```python # engine.py 中 LLMNodeAgent 获取模型配置 def _resolve_model_config(self, config: dict) -> dict: model_instance_id = config.get("model_instance_id") if model_instance_id: # 新路径:从 ModelInstance 获取完整配置 return { "base_url": provider.base_url, "api_key": provider.api_key, "model": model.model_name, "params": model.default_params, } else: # 旧路径:fallback 到 AgentConfig.model + 全局 settings return { "base_url": settings.LLM_API_BASE, "api_key": settings.LLM_API_KEY, "model": config.get("model", "gpt-4o-mini"), "params": {}, } ``` ### 7.2 记忆存储迁移兼容 ```python # MemoryManager 同时支持 Redis 和 PostgreSQL class MemoryManager: def __init__(self, db_session_factory, redis: Redis): self.db_session_factory = db_session_factory self.redis = redis async def inject_memory(self, user_id, flow_id, session_id, context): # 优先从 Redis 缓存读取(兼容旧数据) cached = await self._get_cached_messages(user_id, flow_id, session_id) if cached: recent_messages = cached else: # 缓存未命中,查 PostgreSQL recent_messages = await self._query_recent_messages(user_id, flow_id, session_id) if recent_messages: # 回填 Redis 缓存 await self._cache_messages(user_id, flow_id, session_id, recent_messages) # ...后续逻辑 ``` ### 7.3 数据库变更管理规范 所有涉及表结构改动的变更,必须: 1. 编写对应的 SQL 迁移文件到 `init-db/` 目录 2. SQL 文件使用 `IF NOT EXISTS` / `IF NOT EXISTS` 保证幂等 3. 新增列必须设置 `DEFAULT` 值或 `NULLABLE`,确保旧数据兼容 4. 迁移文件按序号命名:`01-init.sql` → `02-add-published-cols.sql` → `03-memory-tables.sql` → `04-model-provider.sql` --- ## 八、风险与注意事项 1. **pgvector 扩展安装**:需确认 Docker 镜像中的 PostgreSQL 是否包含 pgvector 扩展。如未包含,需在 Dockerfile 中添加 `RUN apt-get install postgresql-15-pgvector` 或使用 `pgvector/pgvector:pg15` 镜像 2. **Redis → PG 数据迁移**:迁移期间需短暂停写,建议在低峰期执行。迁移脚本需处理 Redis 中 JSON 序列化格式与 PG 表结构的映射 3. **LLM 调用成本**:L1 提取和去重都需要额外 LLM 调用,建议使用低成本模型(如 gpt-4o-mini)并控制触发频率 4. **模型供应商 API Key 安全**:必须加密存储,不能明文写入数据库。建议使用 `cryptography.fernet` 对称加密 5. **向后兼容**:AgentConfig.model 字段保留,model_instance_id 为可选(nullable=True),旧数据自动兼容,无需迁移 6. **节点类型注册**:新增节点需同时更新前端 nodeTypes 数组和后端 _create_agent() 分支,缺一不可 7. **流类型不可变**:flow_mode 在创建时确定,后续不可更改(Chatflow 和 Workflow 的引擎逻辑差异大) 8. **PostgreSQL 连接池**:记忆表查询复用现有连接池(pool_size=20),需监控连接数是否够用,必要时调整 9. **Redis 缓存一致性**:PG 写入成功但 Redis 缓存更新失败时,下次读取会缓存未命中走 PG 查询,自动修复,无需额外处理