You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

39 KiB

PLAN9 — 全面升级计划:记忆增强 · 流重构 · 模型管理 · 节点扩充

日期:2026-05-15 状态:规划中,待确认后实施


一、记忆管理升级:吸收 Agent-Memory 分层架构

1.1 现状分析

我们当前方案(PLAN8 已实施)

  • 纯 Redis 存储:List 存消息、Hash 存元数据、String 存摘要
  • 透明中间件:在 flow_engine/router.pyexecute_flow / execute_flow_stream 中注入记忆
  • LLMNodeAgent.reply() 从 context["_memory_context"] 读取历史摘要 + 最近消息
  • 7 天 TTL 自动过期,异步记录不阻塞响应

Agent-Memory 项目(ali-agentscope-src/Agent-Memory)

  • TypeScript 实现,以 OpenClaw 插件形式运行
  • 四层记忆金字塔:L0(原始对话) → L1(结构化原子) → L2(场景块) → L3(用户画像)
  • 双写机制:JSONL 持久化 + SQLite 向量检索
  • 混合搜索:BM25 关键词 + Embedding 向量 + RRF 融合
  • Context Offload:长任务上下文溢出时 Mermaid 符号化压缩
  • Pipeline 调度:每 N 轮对话触发 L1 提取,延迟触发 L2/L3

1.2 对比评估

维度 我们方案 Agent-Memory 评价
存储 Redis(纯内存) SQLite + JSONL(磁盘) 我们快但不适合海量历史;他们慢但可持久化大量数据
记忆分层 无分层(平铺消息列表) L0/L1/L2/L3 四层金字塔 最大差距:我们没有结构化提取,全是原始消息
检索方式 全量最近 N 条 + 摘要 BM25 + 向量 + RRF 混合搜索 我们只能按时间取最近消息,无法按语义检索
去重 L1 提取后 batchDedup(store/update/merge/skip) 我们会重复存储相同信息
用户画像 未实现 L3 Persona 自动生成 + 增量更新 他们有完整的画像系统
上下文溢出 无处理 Context Offload + Mermaid 压缩 我们长对话会直接截断,丢失信息
接入方式 Python 原生集成 TypeScript 插件,需 OpenClaw/Hermes 宿主 不能直接部署接入,语言和架构不兼容
性能 毫秒级(Redis) 秒级(SQLite + LLM 提取) 我们快但浅;他们慢但深

1.3 结论

不能直接部署接入:Agent-Memory 是 TypeScript 项目,依赖 OpenClaw/Hermes 宿主环境,与我们的 Python/FastAPI 架构完全不兼容。

应吸收其核心设计思想,但存储底座需要重新选型——详见 1.4 节分析。

1.4 记忆存储底座选型:Redis vs MongoDB vs PostgreSQL

1.4.1 当前 Redis 方案的数据结构与访问模式

当前 Redis 键结构:
  mem:{uid}:{fid}:{sid}:messages  → List(LPUSH 写入,LRANGE 读取最近 N 条)
  mem:{uid}:{fid}:{sid}:meta      → Hash(HSET 写入,HGETALL 读取)
  mem:{uid}:{fid}:{sid}:summary   → String(SETEX 写入,GET 读取)
  mem:{uid}:sessions              → Set(SADD 写入,SMEMBERS 读取)

访问模式:
  写入:record_exchange() → pipeline 批量 LPUSH + HSET + SADD(每次对话 1 次)
  读取:inject_memory()  → LRANGE + GET(每次对话 1 次,延迟 < 1ms)
  删除:delete_session() → KEYS + DEL(低频,用户主动清除)
  列表:list_user_sessions() → SMEMBERS + KEYS + HGETALL(低频,管理页面)

1.4.2 三种数据库四维度对比

维度一:内存占用与性能

指标 Redis MongoDB PostgreSQL
写入延迟 < 1ms(纯内存) 2-5ms(WAL + 内存映射) 3-8ms(WAL + fsync)
读取延迟 < 1ms 1-3ms(WiredTiger 缓存命中) 1-5ms(shared_buffers 命中)
inject_memory 延迟 ~1ms ~5ms ~8ms
内存消耗(1万会话×40条消息) ~800MB(纯内存,无压缩) ~200MB(磁盘 + 缓存热数据) ~150MB(磁盘 + shared_buffers)
内存消耗(100万会话) ~80GB(不可行,需分片) ~20GB(缓存 + 磁盘冷数据淘汰) ~15GB(磁盘为主,热数据缓存)
大规模场景性能 受内存上限约束,超内存即 OOM 缓存冷热分离,可支撑 TB 级 磁盘为主,可支撑 PB 级
向量检索性能 需 Redis Stack(RediSearch),10万向量 ~50ms 原生向量索引(Atlas Vector Search),10万向量 ~30ms pgvector 扩展,10万向量 ~40ms
全文检索性能 需 Redis Stack(FTS 模块),非标准 原生文本索引,成熟 tsvector + GIN 索引,成熟

关键结论

  • Redis 在小规模(< 10万会话)下性能最优,但内存成本线性增长
  • MongoDB 和 PostgreSQL 在大规模场景下更优,冷数据自动落盘不占内存
  • 向量检索三者性能接近,但 Redis 需额外安装 Stack 模块

维度二:数据模型适配性

指标 Redis MongoDB PostgreSQL
消息列表(L0) List(天然有序,但无结构化查询) 嵌入文档数组或独立文档(灵活) 关系表 + 时间索引(标准)
结构化原子(L1) Hash + Sorted Set(需手动管理) 文档(天然 JSON,灵活 schema) JSONB 列(灵活 + 可索引)
场景块(L2) Hash(需手动序列化) 文档(嵌套结构自然表达) JSONB 或独立表
用户画像(L3) String(纯文本,无查询能力) 文档(结构化,可按字段查询) JSONB 或独立表(可按字段查询)
向量存储 需 Redis Stack(非标准部署) 原生支持(Atlas Vector Search) pgvector 扩展(成熟)
数据迁移复杂度 基准(当前方案) 中(需新建 MongoDB 连接 + 集合设计) (已有 PostgreSQL 连接池,复用 SQLAlchemy)
Schema 演进 无 Schema(灵活但无约束) 无 Schema(灵活,可加验证) 有 Schema(需迁移 SQL,但类型安全)

关键结论

  • MongoDB 对文档型记忆数据最自然(JSON 原生存储,无需 ORM 映射)
  • PostgreSQL 迁移成本最低(项目已用 SQLAlchemy + asyncpg,复用现有连接池)
  • Redis 对 L0 消息列表操作最自然(LPUSH/LRANGE),但对 L1/L2/L3 结构化数据支持弱

维度三:扩展性与维护成本

指标 Redis MongoDB PostgreSQL
水平扩展 Redis Cluster(分片复杂,跨 slot 操作受限) 原生分片(Shard Key 自动路由) 读写分离 + 分区表(Citus 扩展)
运维复杂度 低(单实例简单,Cluster 复杂) 中(副本集 + 分片需监控) 最低(项目已有 PostgreSQL 运维)
新增基础设施 无(已部署) 需新增 MongoDB 服务 无需新增(复用现有 PostgreSQL)
备份恢复 RDB/AOF(需配置持久化策略) mongodump/oplog pg_dump/WAL 归档(已有
监控工具 redis-cli INFO MongoDB Compass/Cloud Manager pg_stat_statements(已有
长期维护成本 高(内存持续增长需扩容) (磁盘扩容便宜,运维体系成熟)

关键结论

  • PostgreSQL 运维成本最低:项目已有 PostgreSQL 实例、连接池、备份策略,无需新增基础设施
  • MongoDB 需要额外部署和维护一套数据库服务
  • Redis 长期成本最高:内存是磁盘的 10-50 倍价格

维度四:事务与一致性需求

指标 Redis MongoDB PostgreSQL
事务支持 Pipeline(非原子,MULTI 仅单 key 原子) 4.0+ 多文档事务(副本集) 完整 ACID 事务
记忆数据事务需求 低(单次写入可容忍部分失败)
模型配置事务需求 不涉及 不涉及 高(需与现有业务表关联)
数据一致性 最终一致(AOF 异步刷盘可能丢 1s 数据) 强一致(副本集 w:majority) 强一致(WAL 同步刷盘)
记忆丢失风险 AOF 异步刷盘时宕机可能丢失 副本集保证不丢 WAL 保证不丢

关键结论

  • 记忆数据对事务要求低(丢失一条消息可接受),三种数据库均满足
  • 但 PostgreSQL 的强一致性在模型配置等业务数据上更有优势
  • Redis AOF 异步模式下有 1 秒数据丢失窗口,对"不失忆"要求有风险

1.4.3 综合评估与选型决策

评估维度 Redis MongoDB PostgreSQL 权重
性能(小规模) 20%
性能(大规模) 25%
数据模型适配 20%
迁移成本 15%
运维成本 10%
事务/一致性 10%
加权总分 3.35 3.25 3.95

1.4.4 选型结论:PostgreSQL + Redis 混合方案

选择 PostgreSQL 作为记忆主存储,Redis 保留为热数据缓存层

理由:

  1. 零新增基础设施:项目已有 PostgreSQL(asyncpg + SQLAlchemy),无需部署新服务
  2. 迁移成本最低:复用现有连接池、ORM、迁移框架,新增表即可
  3. PGVector 原生支持:向量检索无需额外模块,CREATE EXTENSION vector 即可
  4. JSONB 灵活 + 可索引:L1/L2/L3 结构化数据用 JSONB 存储,支持 GIN 索引按字段查询
  5. 强一致性保证不失忆:WAL 同步刷盘,无 Redis AOF 的 1 秒丢失窗口
  6. 大规模成本优势:磁盘存储比内存便宜 10-50 倍,100 万会话仅需 ~15GB 磁盘

Redis 保留角色

  • 热数据缓存:最近 10 条消息缓存到 Redis(inject_memory 时直接读缓存,延迟 < 1ms)
  • 速率限制:已有 cache_manager 的限流功能
  • 会话状态:当前活跃会话的临时状态

1.4.5 PostgreSQL 记忆数据表设计

-- L0: 原始对话消息
CREATE TABLE memory_messages (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id     UUID NOT NULL REFERENCES users(id),
    flow_id     UUID NOT NULL REFERENCES flow_definitions(id),
    session_id  UUID NOT NULL,
    role        VARCHAR(20) NOT NULL,          -- "user" / "assistant"
    content     TEXT NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_memory_messages_session ON memory_messages(user_id, flow_id, session_id, created_at DESC);
CREATE INDEX idx_memory_messages_user_flow ON memory_messages(user_id, flow_id, created_at DESC);

-- L1: 结构化记忆原子
CREATE TABLE memory_atoms (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id     UUID NOT NULL REFERENCES users(id),
    flow_id     UUID REFERENCES flow_definitions(id),  -- NULL 表示全局
    atom_type   VARCHAR(20) NOT NULL,          -- "persona" / "episodic" / "instruction"
    content     TEXT NOT NULL,
    priority    SMALLINT DEFAULT 50,           -- 0-100,越高越核心
    source_session_id UUID,                    -- 来源会话
    metadata    JSONB DEFAULT '{}',            -- 扩展元数据
    embedding   vector(1536),                  -- 向量(需 pgvector 扩展)
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_memory_atoms_user ON memory_atoms(user_id, atom_type, priority DESC);
CREATE INDEX idx_memory_atoms_embedding ON memory_atoms USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

-- L2: 场景块
CREATE TABLE memory_scenes (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id     UUID NOT NULL REFERENCES users(id),
    flow_id     UUID REFERENCES flow_definitions(id),
    scene_name  VARCHAR(200) NOT NULL,
    summary     TEXT NOT NULL,
    heat        INTEGER DEFAULT 0,             -- 热度(访问频率)
    content     JSONB DEFAULT '{}',            -- 场景完整内容
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_memory_scenes_user ON memory_scenes(user_id, flow_id, heat DESC);

-- L3: 用户画像
CREATE TABLE memory_personas (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id     UUID NOT NULL UNIQUE REFERENCES users(id),  -- 每用户一条
    content     JSONB NOT NULL DEFAULT '{}',    -- 结构化画像
    raw_text    TEXT DEFAULT '',                 -- 原始画像文本(注入 LLM 用)
    version     INTEGER DEFAULT 1,
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- 会话元数据
CREATE TABLE memory_sessions (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    user_id     UUID NOT NULL REFERENCES users(id),
    flow_id     UUID NOT NULL REFERENCES flow_definitions(id),
    session_id  UUID NOT NULL,
    flow_name   VARCHAR(200) DEFAULT '',
    message_count INTEGER DEFAULT 0,
    last_active_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    UNIQUE(user_id, flow_id, session_id)
);

CREATE INDEX idx_memory_sessions_user ON memory_sessions(user_id, last_active_at DESC);

1.4.6 MemoryManager 改造方案

class MemoryManager:
    def __init__(self, db_session_factory, redis: Redis):
        self.db_session_factory = db_session_factory   # AsyncSessionLocal
        self.redis = redis                              # 热数据缓存

    async def inject_memory(self, user_id, flow_id, session_id, context):
        # 1. 先查 Redis 缓存(最近 10 条消息)
        cached = await self._get_cached_messages(user_id, flow_id, session_id)
        if cached:
            recent_messages = cached
        else:
            # 2. 缓存未命中,查 PostgreSQL
            recent_messages = await self._query_recent_messages(user_id, flow_id, session_id)
            # 3. 回填 Redis 缓存
            await self._cache_messages(user_id, flow_id, session_id, recent_messages)

        # 4. 查 L1 原子(PostgreSQL)
        atoms = await self._query_relevant_atoms(user_id, flow_id, context.get("input", ""))

        # 5. 查 L3 画像(PostgreSQL)
        persona = await self._query_persona(user_id)

        context["_memory_context"] = {
            "recent_messages": recent_messages,
            "atoms": atoms,
            "persona": persona,
            "session_id": session_id,
        }

    async def record_exchange(self, user_id, flow_id, session_id, user_msg, assistant_msg, flow_name=""):
        ts = datetime.utcnow()

        # 1. 写 PostgreSQL(主存储,保证持久化)
        async with self.db_session_factory() as session:
            session.add(MemoryMessage(
                user_id=user_id, flow_id=flow_id, session_id=session_id,
                role="user", content=user_msg, created_at=ts,
            ))
            session.add(MemoryMessage(
                user_id=user_id, flow_id=flow_id, session_id=session_id,
                role="assistant", content=assistant_msg, created_at=ts,
            ))
            await session.commit()

        # 2. 更新 Redis 缓存(热数据)
        await self._cache_append_message(user_id, flow_id, session_id, [
            {"role": "user", "content": user_msg, "ts": ts.isoformat()},
            {"role": "assistant", "content": assistant_msg, "ts": ts.isoformat()},
        ])

        # 3. 异步触发 L1 提取
        asyncio.create_task(self._maybe_extract_atoms(user_id, flow_id, session_id))

1.4.7 迁移步骤

Step 1: 创建 PostgreSQL 记忆表(init-db/03-memory-tables.sql)
Step 2: 安装 pgvector 扩展(init-db/03-memory-tables.sql 中 CREATE EXTENSION IF NOT EXISTS vector)
Step 3: 改造 MemoryManager,PostgreSQL 为主存储,Redis 为缓存
Step 4: 数据迁移脚本:Redis → PostgreSQL(一次性,读取 Redis 中现有记忆数据写入 PG)
Step 5: 验证全链路:inject_memory / record_exchange / delete_session
Step 6: 确认无误后,Redis 中的记忆键可设置短 TTL 自然过期

1.4.8 迁移 SQL 文件

详见 init-db/03-memory-tables.sql(实施时创建),包含:

  • CREATE EXTENSION IF NOT EXISTS vector
  • 上述 5 张表的 CREATE TABLE + INDEX
  • 幂等执行(IF NOT EXISTS)

二、全面升级流管理(对标 Dify)

2.1 现状问题

  1. 节点类型不足:当前 11 种节点(trigger/llm/tool/mcp/notify/condition/rag/loop/merge/code/output),Dify 有 15+ 种
  2. 记忆不是默认行为:虽然 PLAN8 实现了透明中间件,但缺乏 Dify 那样的"对话型 vs 工作流型"区分
  3. 变量系统原始:仅 {{node_id.output}} 模板,无类型、无作用域、无聚合
  4. 版本管理不完善:有 FlowVersion 快照但缺少完整的草稿/发布分离流程

2.2 流类型区分(Chatflow vs Workflow)

Dify 的核心设计:区分两种应用模式,记忆机制不同

模式 记忆 典型场景 对应我们的
Chatflow(对话型) 自动维护对话历史,LLM 节点自动注入上下文 客服、助手、问答 FlowChat.vue 使用的流
Workflow(工作流型) 无自动记忆,每次执行独立 数据处理、批量任务、API 调用 API 网关调用的流

改造方案

# FlowDefinition 新增字段
flow_mode = Column(String(20), default="chatflow")  # "chatflow" | "workflow"

# 记忆中间件逻辑调整
if f.flow_mode == "chatflow":
    await mm.inject_memory(...)   # 注入记忆
    asyncio.create_task(mm.record_exchange(...))  # 记录对话
else:
    pass  # workflow 模式不注入记忆

前端 FlowEditor.vue 新增流类型选择(创建流时选择,创建后不可更改)。

2.3 记忆默认启用策略

核心原则:所有 Chatflow 类型的流,记忆管理是默认行为,不需要用户在流中添加节点。

用户输入 → router.py
  │
  ├─ Chatflow 模式:
  │   ├─ 执行前:inject_memory() → 检索历史 + 画像 → 注入 context
  │   ├─ LLM 调用:LLMNodeAgent 自动读取 _memory_context
  │   └─ 执行后:record_exchange() → 异步记录 + L1 提取
  │
  └─ Workflow 模式:
      └─ 直接执行,无记忆注入

2.4 流创建流程增强

当前:创建流 → 编辑画布 → 发布

升级后

创建流
  ├─ 选择流类型(Chatflow / Workflow)
  ├─ 选择模板(可选,从模板市场选择)
  └─ 进入编辑器

编辑画布
  ├─ 拖拽节点
  ├─ 配置节点参数
  └─ 连线

发布
  ├─ 拓扑完整性检查(无孤立节点、起始/结束节点存在)
  ├─ 必填参数校验
  ├─ 创建版本快照(不可变)
  └─ 更新 published_version_id 指针

2.5 流市场 vs 流列表的关系梳理

当前问题:流市场的"已上架工作流列表"和一级菜单"流列表"内容重叠,用户困惑。

梳理方案

页面 定位 数据来源 操作
流列表(管理端) 我创建/我管理的所有流 GET /api/flow/definitions 编辑、删除、发布/下架、上架到市场
流市场(用户端) 已上架到市场的公开流 GET /api/flow/market 查看详情、安装/使用、评价
模板中心(创建时) 系统预置 + 社区贡献的模板 GET /api/flow/templates 一键使用模板创建新流

关键区分

  • 流列表 = 私有工作区,看到的是自己管理的流
  • 流市场 = 公开商店,看到的是别人发布到市场的流
  • 模板中心 = 快速起步,创建新流时的入口

三、智能体管理改造:OpenAI-API-Compatible 模型管理

3.1 现状问题

当前 AgentConfig 模型只有一个 model 字段(String),无法区分模型类型,无法管理 Embedding/Rerank 模型,无法支持多供应商。

# 当前 AgentConfig
model = Column(String(50), default="gpt-4o-mini")  # 只能存一个模型名

3.2 Dify 的模型供应商架构

Dify 采用三层架构:

ModelProvider(供应商抽象层)
  ├── 模型类型:LLM / Embedding / Rerank / TTS / Speech-to-Text
  ├── 供应商实例:OpenAI / Anthropic / ZhipuAI / Ollama / OpenAI-API-Compatible
  └── 模型实例:gpt-4o / text-embedding-3-small / cohere-rerank

OpenAI-API-Compatible 模式:任何实现了 OpenAI API 格式的服务都能即插即用,只需配置 base_url + api_key + model_name。

3.3 改造方案

数据模型

# 新增:模型供应商表
class ModelProvider(Base):
    __tablename__ = "model_providers"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    name = Column(String(100), nullable=False)          # "OpenAI" / "智谱AI" / "本地Ollama"
    provider_type = Column(String(50), nullable=False)   # "openai" / "zhipu" / "ollama" / "openai_compatible"
    base_url = Column(String(500))                       # API 端点
    api_key = Column(Text)                               # 加密存储
    extra_config = Column(JSON, default=dict)            # 供应商特有配置
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)


# 新增:模型实例表
class ModelInstance(Base):
    __tablename__ = "model_instances"

    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    provider_id = Column(UUID, ForeignKey("model_providers.id"))
    model_name = Column(String(100), nullable=False)     # "gpt-4o" / "embedding-3" / "bge-rerank"
    model_type = Column(String(30), nullable=False)      # "llm" / "embedding" / "rerank"
    display_name = Column(String(200))                   # "GPT-4o" / "文本嵌入v3"
    capabilities = Column(JSON, default=dict)            # {"vision": true, "function_calling": true, "max_tokens": 128000}
    default_params = Column(JSON, default=dict)           # {"temperature": 0.7, "top_p": 1.0}
    is_default = Column(Boolean, default=False)           # 是否为该类型的默认模型
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)

各模型类型的配置参数

模型类型 通用参数 特有参数
LLM model, temperature, top_p, max_tokens, stream vision(多模态)、function_calling、response_format
Embedding model, dimensions encoding_format, input_type
Rerank model, top_n query, documents, return_documents

API 端点设计

# 供应商管理
POST   /api/model-providers/              # 添加供应商
GET    /api/model-providers/              # 列出供应商
PUT    /api/model-providers/{id}          # 更新供应商
DELETE /api/model-providers/{id}          # 删除供应商
POST   /api/model-providers/{id}/test     # 测试连通性

# 模型实例管理
POST   /api/model-providers/{id}/models/  # 添加模型
GET    /api/model-instances/              # 列出所有模型(支持 ?type=llm 筛选)
PUT    /api/model-instances/{id}          # 更新模型
DELETE /api/model-instances/{id}          # 删除模型
POST   /api/model-instances/{id}/test     # 测试模型调用

# 默认模型设置
PUT    /api/model-defaults/               # 设置各类型默认模型
GET    /api/model-defaults/               # 获取各类型默认模型

前端页面

新增 ModelProviderManager.vue(管理端):

  • 供应商列表(卡片式,显示名称、类型、状态、模型数量)
  • 添加供应商表单(选择类型 → 填写 base_url/api_key → 自动检测可用模型)
  • 模型实例列表(按类型 Tab 分组:LLM / Embedding / Rerank)
  • 每个模型可设置默认参数、是否为默认模型
  • 测试按钮:发送测试请求验证连通性

AgentConfig 关联调整

# AgentConfig 改造
class AgentConfig(Base):
    # ...现有字段保留...
    model = Column(String(50), default="gpt-4o-mini")      # 保留兼容
    model_instance_id = Column(UUID, ForeignKey("model_instances.id"), nullable=True)  # 新增:关联模型实例
    embedding_model_id = Column(UUID, ForeignKey("model_instances.id"), nullable=True)  # 新增:关联嵌入模型

引擎层适配

# engine.py 中 LLMNodeAgent 改造
class LLMNodeAgent(AgentBase):
    async def reply(self, msg, **kwargs):
        context = kwargs.get("context", {})
        config = self.config

        # 优先使用 model_instance_id 获取模型配置
        model_instance_id = config.get("model_instance_id")
        if model_instance_id:
            provider, model = await self._resolve_model(model_instance_id)
            base_url = provider.base_url
            api_key = provider.api_key
            model_name = model.model_name
        else:
            # fallback 到旧逻辑
            base_url = settings.OPENAI_BASE_URL
            api_key = settings.OPENAI_API_KEY
            model_name = config.get("model", "gpt-4o-mini")

3.4 实施优先级

步骤 内容 优先级
Step 1 创建 ModelProvider + ModelInstance 数据模型 + 迁移 SQL P0
Step 2 实现供应商/模型 CRUD API P0
Step 3 前端 ModelProviderManager.vue 页面 P0
Step 4 LLMNodeAgent 适配 model_instance_id P1
Step 5 RAGNodeAgent 适配 embedding_model_id P1
Step 6 AgentConfig 关联 model_instance_id P1
Step 7 供应商自动检测可用模型 P2

四、流编辑器节点扩充(对标 Dify)

4.1 现有节点 vs Dify 节点对比

节点 我们有 Dify 有 差距说明
触发/开始 trigger start 我们多了企微触发,Dify 只有输入变量
LLM llm llm 基本对齐
工具调用 tool tool 基本对齐
MCP mcp 我们独有
通知 notify 我们独有(企微通知)
条件分支 condition if-else 基本对齐
RAG检索 rag knowledge-retrieval 基本对齐
循环 loop iteration Dify 是数组迭代,我们是通用循环
变量聚合 merge variable-assigner 基本对齐
代码执行 code code 基本对齐
输出/结束 output end 基本对齐
HTTP 请求 http-request 缺失:调用外部 API
问题分类器 question-classifier 缺失:意图路由
模板转换 template-transform 缺失:Jinja2 格式化
变量赋值 variable-assigner 缺失:运行时变量操作
迭代 iteration 缺失:数组逐项处理(与 loop 不同)
问题优化 question-optimiser 缺失:检索前 query 改写

4.2 新增节点方案

P0 — 必须新增(使用频率高)

1. HTTP 请求节点(http_request)

功能:发送 HTTP 请求(GET/POST/PUT/DELETE/PATCH)
配置参数:
  - method: 请求方法
  - url: 请求地址(支持变量模板)
  - headers: 请求头(JSON)
  - body: 请求体(支持变量模板)
  - auth_type: 认证方式(none/api_key/bearer/basic/oauth2)
  - auth_config: 认证配置
  - timeout: 超时时间(秒)
  - retry_count: 重试次数
输出:
  - status_code: 状态码
  - headers: 响应头
  - body: 响应体(JSON 解析)
  - raw: 原始文本
前端:HttpRequestConfig.vue

2. 问题分类器节点(question_classifier)

功能:基于 LLM 对用户输入进行意图分类,路由到不同分支
配置参数:
  - model: 使用的 LLM(可选,默认用系统默认)
  - categories: 分类列表 [{name, description}]
  - instruction: 分类指令(补充说明)
输出:
  - category: 分类结果
  - confidence: 置信度
  - 多出口:每个分类一个出口
前端:QuestionClassifierConfig.vue
引擎:调用 LLM 做分类,根据结果走不同分支(类似 condition 但基于 LLM)

3. 变量赋值节点(variable_assigner)

功能:在运行时对变量进行赋值/修改操作
配置参数:
  - assignments: [{target_var, source_type, source_value}]
    - source_type: "constant" / "upstream_output" / "template" / "expression"
输出:
  - 变量值更新到 context 中
前端:VariableAssignerConfig.vue

P1 — 建议新增(提升体验)

4. 模板转换节点(template_transform)

功能:使用 Jinja2 模板语法对变量进行格式化/拼接/转换
配置参数:
  - template: Jinja2 模板字符串
  - output_type: 输出类型(string/json/array)
输出:
  - rendered: 渲染后的文本
前端:TemplateTransformConfig.vue
引擎:使用 jinja2 库渲染模板

5. 迭代节点(iteration)

功能:对列表/数组逐项处理,内部可嵌套子工作流
与 loop 的区别:
  - loop:固定次数或条件循环,处理同一逻辑
  - iteration:遍历数组,每次迭代处理一个元素,输出结果数组
配置参数:
  - input_array: 输入数组(变量引用)
  - output_variable: 每次迭代的输出变量名
  - max_iterations: 最大迭代次数
输出:
  - output: 结果数组
前端:IterationConfig.vue

6. 问题优化节点(question_optimiser)

功能:在 RAG 检索前对用户 query 进行改写/扩展,提升检索召回率
配置参数:
  - model: 使用的 LLM
  - strategy: 优化策略(rewrite/expand/decompose)
  - instruction: 自定义优化指令
输出:
  - optimized_query: 优化后的查询
  - original_query: 原始查询
前端:QuestionOptimiserConfig.vue

4.3 节点前端配置组件清单

新增节点 配置组件 复杂度
http_request HttpRequestConfig.vue
question_classifier QuestionClassifierConfig.vue
variable_assigner VariableAssignerConfig.vue
template_transform TemplateTransformConfig.vue
iteration IterationConfig.vue
question_optimiser QuestionOptimiserConfig.vue

4.4 引擎层改造

# engine.py _create_agent() 新增分支
elif node_type == "http_request":
    return HttpRequestNodeAgent(...)
elif node_type == "question_classifier":
    return QuestionClassifierNodeAgent(...)
elif node_type == "variable_assigner":
    return VariableAssignerNodeAgent(...)
elif node_type == "template_transform":
    return TemplateTransformNodeAgent(...)
elif node_type == "iteration":
    return IterationNodeAgent(...)
elif node_type == "question_optimiser":
    return QuestionOptimiserNodeAgent(...)

五、版本管理与模板系统完善

5.1 当前版本管理现状

  • FlowVersion 模型已存在
  • publish_flow / unpublish_flow / rollback_flow API 已实现
  • 执行时加载 published_version 的 definition_json
  • 缺少前端版本管理界面(版本列表、对比、回滚按钮)
  • 缺少发布前的完整性校验
  • 缺少草稿自动保存

5.2 完善方案

前端版本管理

  • FlowEditor.vue 工具栏增加"版本历史"按钮
  • 版本历史弹窗:显示版本列表(版本号、发布时间、发布人、变更说明)
  • 支持查看历史版本的定义 JSON
  • 支持回滚到指定版本
  • 发布时弹出"变更说明"输入框

发布前校验

async def _validate_before_publish(definition: dict) -> list[str]:
    errors = []
    nodes = definition.get("nodes", [])
    edges = definition.get("edges", [])

    # 1. 必须有起始节点
    if not any(n.get("type") == "trigger" for n in nodes):
        errors.append("缺少触发/起始节点")

    # 2. Chatflow 必须有 LLM 节点
    if flow_mode == "chatflow" and not any(n.get("type") == "llm" for n in nodes):
        errors.append("对话型流必须包含至少一个 LLM 节点")

    # 3. 无孤立节点
    connected_ids = set()
    for e in edges:
        connected_ids.add(e["source"])
        connected_ids.add(e["target"])
    for n in nodes:
        if n["id"] not in connected_ids and len(nodes) > 1:
            errors.append(f"节点 '{n.get('label', n['id'])}' 未连接")

    return errors

草稿自动保存

  • 前端 FlowEditor.vue 每 30 秒自动保存草稿到 draft_definition_json
  • 切换流时检测未保存草稿,提示用户

5.3 模板系统增强

当前:硬编码 2 个模板(文档处理流、企微通知流)

升级后

  • 模板存储到数据库(新增 flow_templates 表)
  • 支持管理员创建模板(从已有流"另存为模板")
  • 模板分类:客服对话 / 文档处理 / 数据分析 / 通知推送 / 自定义
  • 创建流时显示模板选择页面(可选,也可从空白创建)

六、实施路线图

Phase 1 — 基础架构升级(P0,2-3 周)

任务 涉及文件 依赖
1.1 创建 PostgreSQL 记忆表 + pgvector 扩展 init-db/03-memory-tables.sql, models/__init__.py
1.2 改造 MemoryManager:PG 主存储 + Redis 缓存 memory/manager.py 1.1
1.3 Redis → PostgreSQL 数据迁移脚本 scripts/migrate_memory_redis_to_pg.py 1.2
1.4 创建 ModelProvider + ModelInstance 数据模型 models/__init__.py, init-db/04-model-provider.sql
1.5 实现供应商/模型 CRUD API modules/model_provider/router.py 1.4
1.6 前端 ModelProviderManager.vue frontend/src/views/model/ 1.5
1.7 FlowDefinition 新增 flow_mode 字段 models/__init__.py, init-db/
1.8 记忆中间件按 flow_mode 区分 flow_engine/router.py 1.7
1.9 新增 HTTP 请求节点 engine.py, HttpRequestConfig.vue
1.10 新增问题分类器节点 engine.py, QuestionClassifierConfig.vue
1.11 新增变量赋值节点 engine.py, VariableAssignerConfig.vue

Phase 2 — 记忆分层 + 模型适配(P1,2-3 周)

任务 涉及文件 依赖
2.1 MemoryManager 增加 L1 结构化提取(PG 存储) memory/manager.py Phase 1
2.2 MemoryManager 增加 L1 去重 memory/manager.py 2.1
2.3 LLMNodeAgent 适配 model_instance_id engine.py 1.5
2.4 RAGNodeAgent 适配 embedding_model_id engine.py 1.5
2.5 AgentConfig 关联 model_instance_id(向后兼容) models/__init__.py, agent_manager/ 1.5
2.6 新增模板转换节点 engine.py, TemplateTransformConfig.vue
2.7 新增迭代节点 engine.py, IterationConfig.vue
2.8 发布前校验 + 版本管理前端 flow_engine/router.py, FlowEditor.vue

Phase 3 — 画像 + 检索优化(P2,2-3 周)

任务 涉及文件 依赖
3.1 L2 场景块提取(PG 存储) memory/manager.py Phase 2
3.2 L3 用户画像生成(PG 存储) memory/manager.py 3.1
3.3 混合检索(pgvector 向量 + tsvector 全文 + RRF) memory/manager.py Embedding 模型
3.4 新增问题优化节点 engine.py, QuestionOptimiserConfig.vue 3.3
3.5 模板系统数据库化 flow_templates 表, router.py
3.6 草稿自动保存 FlowEditor.vue, router.py

七、向后兼容性保障

7.1 AgentConfig 向后兼容

# AgentConfig 改造:保留 model 字段,新增可选字段
class AgentConfig(Base):
    # 现有字段保留不变
    model = Column(String(50), default="gpt-4o-mini")      # 保留!旧数据自动兼容

    # 新增可选字段(nullable=True,旧记录自动为 NULL)
    model_instance_id = Column(UUID, ForeignKey("model_instances.id"), nullable=True)
    embedding_model_id = Column(UUID, ForeignKey("model_instances.id"), nullable=True)

兼容逻辑

# engine.py 中 LLMNodeAgent 获取模型配置
def _resolve_model_config(self, config: dict) -> dict:
    model_instance_id = config.get("model_instance_id")
    if model_instance_id:
        # 新路径:从 ModelInstance 获取完整配置
        return {
            "base_url": provider.base_url,
            "api_key": provider.api_key,
            "model": model.model_name,
            "params": model.default_params,
        }
    else:
        # 旧路径:fallback 到 AgentConfig.model + 全局 settings
        return {
            "base_url": settings.LLM_API_BASE,
            "api_key": settings.LLM_API_KEY,
            "model": config.get("model", "gpt-4o-mini"),
            "params": {},
        }

7.2 记忆存储迁移兼容

# MemoryManager 同时支持 Redis 和 PostgreSQL
class MemoryManager:
    def __init__(self, db_session_factory, redis: Redis):
        self.db_session_factory = db_session_factory
        self.redis = redis

    async def inject_memory(self, user_id, flow_id, session_id, context):
        # 优先从 Redis 缓存读取(兼容旧数据)
        cached = await self._get_cached_messages(user_id, flow_id, session_id)
        if cached:
            recent_messages = cached
        else:
            # 缓存未命中,查 PostgreSQL
            recent_messages = await self._query_recent_messages(user_id, flow_id, session_id)
            if recent_messages:
                # 回填 Redis 缓存
                await self._cache_messages(user_id, flow_id, session_id, recent_messages)

        # ...后续逻辑

7.3 数据库变更管理规范

所有涉及表结构改动的变更,必须:

  1. 编写对应的 SQL 迁移文件到 init-db/ 目录
  2. SQL 文件使用 IF NOT EXISTS / IF NOT EXISTS 保证幂等
  3. 新增列必须设置 DEFAULT 值或 NULLABLE,确保旧数据兼容
  4. 迁移文件按序号命名:01-init.sql02-add-published-cols.sql03-memory-tables.sql04-model-provider.sql

八、风险与注意事项

  1. pgvector 扩展安装:需确认 Docker 镜像中的 PostgreSQL 是否包含 pgvector 扩展。如未包含,需在 Dockerfile 中添加 RUN apt-get install postgresql-15-pgvector 或使用 pgvector/pgvector:pg15 镜像
  2. Redis → PG 数据迁移:迁移期间需短暂停写,建议在低峰期执行。迁移脚本需处理 Redis 中 JSON 序列化格式与 PG 表结构的映射
  3. LLM 调用成本:L1 提取和去重都需要额外 LLM 调用,建议使用低成本模型(如 gpt-4o-mini)并控制触发频率
  4. 模型供应商 API Key 安全:必须加密存储,不能明文写入数据库。建议使用 cryptography.fernet 对称加密
  5. 向后兼容:AgentConfig.model 字段保留,model_instance_id 为可选(nullable=True),旧数据自动兼容,无需迁移
  6. 节点类型注册:新增节点需同时更新前端 nodeTypes 数组和后端 _create_agent() 分支,缺一不可
  7. 流类型不可变:flow_mode 在创建时确定,后续不可更改(Chatflow 和 Workflow 的引擎逻辑差异大)
  8. PostgreSQL 连接池:记忆表查询复用现有连接池(pool_size=20),需监控连接数是否够用,必要时调整
  9. Redis 缓存一致性:PG 写入成功但 Redis 缓存更新失败时,下次读取会缓存未命中走 PG 查询,自动修复,无需额外处理