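PLAN7: Custom API Tool Import + Frontend EventSource Streaming Chat Component

1. Current State and Gaps

With PLAN6 complete, the system already provides:

  • Version snapshots (FlowVersion)
  • SSE streaming output (backend)
  • Unified API gateway (/v1/chat-messages, /v1/workflows/run)
  • API key authentication
  • Standardized tool schemas (built-in tools)
  • Flow node memory
  • ParallelMergeNodeAgent + Loop array iteration

The remaining 15% gap:

  1. Custom API tool import: users cannot yet enter a third-party OpenAPI/Swagger URL in the frontend and have the system parse it into tool schemas and register them with ToolNodeAgent
  2. Frontend EventSource chat component: the frontend has no SSE-capable chat UI, so streamed output cannot be viewed in real time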

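2. Feature 1: Custom API Tool Import (OpenAPI/Swagger Parsing)

2.1 Goal

The user enters the OpenAPI/Swagger URL of a third-party API in the frontend (for example https://api.example.com/openapi.json), and the backend automatically:

  1. Downloads and parses the OpenAPI document
  2. Extracts the method, path, parameters, and description of each endpoint
  3. Converts them into the OpenAI Function Calling schema format
  4. Registers them as custom tools available to Flows
  5. Invokes them dynamically through httpx at execution time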
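
Steps 2 and 3 can be sketched as a small pure function. This is a minimal sketch, assuming each parsed endpoint is a dict with "name", "description", and "parameters" keys and that the consumer expects the OpenAI `tools` entry shape; `to_openai_tool` is a hypothetical helper name, not part of the existing code base.

```python
def to_openai_tool(endpoint: dict) -> dict:
    """Wrap one parsed endpoint in the OpenAI function-calling `tools` entry shape.

    `endpoint` is assumed to carry "name", "description" and a JSON Schema
    object under "parameters"; routing fields such as path/method are dropped.
    """
    return {
        "type": "function",
        "function": {
            "name": endpoint["name"],
            "description": endpoint.get("description", ""),
            "parameters": endpoint.get("parameters", {"type": "object", "properties": {}}),
        },
    }


example = to_openai_tool({
    "name": "getPetById",
    "description": "Find pet by ID",
    "parameters": {
        "type": "object",
        "properties": {"petId": {"type": "integer", "description": "ID of pet"}},
        "required": ["petId"],
    },
    "path": "/pet/{petId}",
    "method": "GET",
})
```

The routing fields (path, method) stay outside the schema because they are only needed by the executor, not by the model that chooses the tool.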

2.2 Data Model

# Added to models/__init__.py (Base is the declarative base already defined there).
# Imports are listed for completeness; taking UUID from the PostgreSQL dialect is an assumption.
import uuid
from datetime import datetime

from sqlalchemy import JSON, Boolean, Column, DateTime, ForeignKey, String, Text
from sqlalchemy.dialects.postgresql import UUID

class CustomTool(Base):
    __tablename__ = "custom_tools"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(100), nullable=False)           # Tool name
    description = Column(Text)                            # Tool description
    schema_json = Column(JSON, nullable=False)            # OpenAI Function Calling schema
    endpoint_url = Column(String(500), nullable=False)    # Base URL
    method = Column(String(10), default="GET")            # HTTP method
    path = Column(String(500))                            # API path
    headers_json = Column(JSON, default=dict)             # Fixed request headers
    auth_type = Column(String(20), default="none")        # none / api_key / bearer
    auth_config = Column(JSON, default=dict)              # Auth configuration
    created_by = Column(UUID(as_uuid=True), ForeignKey("users.id"))
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

2.3 Schema

# Added to schemas/__init__.py
class CustomToolCreate(BaseModel):
    name: str
    description: str | None = None
    openapi_url: str | None = None          # Either an OpenAPI URL or the manual fields below
    endpoint_url: str | None = None
    method: str = "GET"
    path: str = ""
    headers: dict = {}
    auth_type: str = "none"
    auth_config: dict = {}
    schema_json: dict | None = None         # Schema supplied manually

class CustomToolOut(BaseModel):
    id: uuid.UUID
    name: str
    description: str | None = None
    schema_json: dict
    endpoint_url: str
    method: str
    path: str
    auth_type: str
    is_active: bool
    created_at: datetime | None = None

class OpenAPIImportRequest(BaseModel):
    openapi_url: str
    base_url_override: str | None = None    # Optional override for base_url

2.4 Backend Implementation

Module: backend/modules/custom_tool/

parser.py: OpenAPI parser

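import re
from typing import Any

HTTP_METHODS = ("get", "post", "put", "delete", "patch")


class OpenAPIParser:
    """Turns an OpenAPI 3.x / Swagger 2.0 document into tool definitions.

    Limitations: "$ref" pointers and Swagger 2.0 `in: body` parameters are not expanded.
    """

    def __init__(self, spec: dict):
        self.spec = spec
        self.base_url = self._detect_base_url(spec)

    @staticmethod
    def _detect_base_url(spec: dict) -> str:
        # OpenAPI 3.x: first entry of "servers" (the list may be missing or empty)
        servers = spec.get("servers") or [{}]
        if servers[0].get("url"):
            return servers[0]["url"]
        # Swagger 2.0: scheme + host + basePath
        if spec.get("host"):
            scheme = (spec.get("schemes") or ["https"])[0]
            return f"{scheme}://{spec['host']}{spec.get('basePath', '')}"
        return ""

    def parse_tools(self) -> list[dict]:
        tools = []
        for path, methods in self.spec.get("paths", {}).items():
            for method, operation in methods.items():
                # Skips non-operation keys such as path-level "parameters"
                if method in HTTP_METHODS:
                    tool = self._parse_endpoint(path, method, operation)
                    if tool:
                        tools.append(tool)
        return tools

    def _parse_endpoint(self, path: str, method: str, operation: dict) -> dict | None:
        # Fallback names must be identifier-like, so "/", "{" and "}" become "_"
        fallback = re.sub(r"[^a-zA-Z0-9_]+", "_", f"{method}_{path}").strip("_")
        name = operation.get("operationId") or fallback
        description = operation.get("summary") or operation.get("description") or f"{method.upper()} {path}"
        properties, required = self._parse_parameters(operation)
        return {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": properties,
                "required": required,
            },
            "path": path,
            "method": method.upper(),
        }

    def _parse_parameters(self, operation: dict) -> tuple[dict[str, Any], list[str]]:
        props: dict[str, Any] = {}
        required: list[str] = []
        for param in operation.get("parameters", []):
            if "name" not in param:  # e.g. an unresolved {"$ref": ...} entry
                continue
            schema = param.get("schema", {})
            props[param["name"]] = {
                # Swagger 2.0 puts "type" and "enum" on the parameter itself
                "type": schema.get("type") or param.get("type", "string"),
                "description": param.get("description", ""),
            }
            enum = schema.get("enum") or param.get("enum")
            if enum:
                props[param["name"]]["enum"] = enum
            if param.get("required"):
                required.append(param["name"])
        # JSON request body (OpenAPI 3.x): flatten its top-level properties
        body = operation.get("requestBody", {}).get("content", {}).get("application/json", {}).get("schema", {})
        for name, prop in body.get("properties", {}).items():
            props[name] = {"type": prop.get("type", "string"), "description": prop.get("description", "")}
        required.extend(n for n in body.get("required", []) if n not in required)
        return props, required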
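
Real-world specs usually define request bodies and shared parameters through "$ref" pointers, which the parser above reads as plain dicts. A minimal sketch of local reference resolution follows, assuming only in-document pointers ("#/...") need support; `resolve_ref` is a hypothetical helper, not an existing function in this module.

```python
from typing import Any


def resolve_ref(spec: dict, node: Any, _depth: int = 0) -> Any:
    """Return `node` with a top-level local "$ref" replaced by its target.

    Only local JSON pointers ("#/...") are handled; remote references are
    returned unchanged. `_depth` guards against reference cycles.
    """
    if not isinstance(node, dict) or "$ref" not in node or _depth > 20:
        return node
    ref = node["$ref"]
    if not ref.startswith("#/"):
        return node
    target: Any = spec
    for part in ref[2:].split("/"):
        # JSON Pointer escapes: "~1" means "/", "~0" means "~"
        target = target[part.replace("~1", "/").replace("~0", "~")]
    return resolve_ref(spec, target, _depth + 1)


spec = {
    "components": {
        "schemas": {
            "Pet": {"type": "object", "properties": {"name": {"type": "string"}}}
        }
    }
}
resolved = resolve_ref(spec, {"$ref": "#/components/schemas/Pet"})
```

Calling such a helper on each parameter and on the request body schema before reading "type" or "properties" would be one way to cover specs like the Petstore example.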

executor.py: dynamic executor

import json
import re
from urllib.parse import quote

import httpx


class CustomToolExecutor:
    def __init__(self, tool: dict):
        self.tool = tool
        self.endpoint_url = tool["endpoint_url"]
        self.method = (tool.get("method") or "GET").upper()
        self.path = tool.get("path") or ""
        self.headers = tool.get("headers_json") or {}
        self.auth_type = tool.get("auth_type") or "none"
        self.auth_config = tool.get("auth_config") or {}

    async def execute(self, params: dict) -> str:
        params = dict(params)  # copy so the caller's dict is never mutated
        headers = dict(self.headers)
        query: dict = {}

        # Fill {placeholders} in the path (e.g. /pet/{petId}) from params
        def fill(match: re.Match) -> str:
            key = match.group(1)
            return quote(str(params.pop(key)), safe="") if key in params else match.group(0)

        path = re.sub(r"\{(\w+)\}", fill, self.path)
        url = f"{self.endpoint_url.rstrip('/')}/{path.lstrip('/')}"

        if self.auth_type == "api_key":
            key = self.auth_config.get("key", "")
            loc = self.auth_config.get("location", "header")  # header / query
            name = self.auth_config.get("name", "X-API-Key")
            if loc == "header":
                headers[name] = key
            else:
                query[name] = key  # always sent in the query string, even for POST

        elif self.auth_type == "bearer":
            headers["Authorization"] = f"Bearer {self.auth_config.get('token', '')}"

        async with httpx.AsyncClient(timeout=30) as client:
            if self.method == "GET":
                resp = await client.get(url, params={**params, **query}, headers=headers)
            else:
                resp = await client.request(self.method, url, params=query, json=params, headers=headers)

        # Output is capped at 2000 characters, so long JSON may be cut mid-structure
        try:
            return json.dumps(resp.json(), ensure_ascii=False, indent=2)[:2000]
        except ValueError:
            return resp.text[:2000]
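
The authentication branch of the executor can be isolated as a pure function, which makes it easy to unit-test without any network access. This is a sketch under the assumption that auth_config uses the "key", "location", "name", and "token" fields read by the executor; `build_auth` is a hypothetical name.

```python
def build_auth(auth_type: str, auth_config: dict) -> tuple[dict, dict]:
    """Return (headers, query) to merge into the outgoing request."""
    headers: dict = {}
    query: dict = {}
    if auth_type == "api_key":
        name = auth_config.get("name", "X-API-Key")
        # The key goes either into a header or into the query string
        target = headers if auth_config.get("location", "header") == "header" else query
        target[name] = auth_config.get("key", "")
    elif auth_type == "bearer":
        headers["Authorization"] = f"Bearer {auth_config.get('token', '')}"
    return headers, query


header_auth = build_auth("api_key", {"key": "secret"})
query_auth = build_auth("api_key", {"key": "secret", "location": "query", "name": "api_key"})
bearer_auth = build_auth("bearer", {"token": "abc"})
```

Returning the query parameters separately keeps the API key out of the JSON body for non-GET requests.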

router.py: CRUD and import endpoints

import uuid

import httpx
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models import CustomTool
from schemas import CustomToolCreate, CustomToolOut, OpenAPIImportRequest
from .parser import OpenAPIParser
from .executor import CustomToolExecutor

router = APIRouter(prefix="/api/custom-tools", tags=["custom_tools"])

@router.post("/import-openapi")
async def import_openapi(req: OpenAPIImportRequest, db: AsyncSession = Depends(get_db)):
    # Download the spec; unreachable URLs and non-JSON bodies become a 400
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.get(req.openapi_url)
            resp.raise_for_status()
            spec = resp.json()
    except (httpx.HTTPError, ValueError) as exc:
        raise HTTPException(400, f"Failed to fetch OpenAPI document: {exc}")

    parser = OpenAPIParser(spec)
    tools = parser.parse_tools()
    base_url = req.base_url_override or parser.base_url
    if not base_url:
        raise HTTPException(400, "No base URL in the document; set base_url_override")

    created = []
    for t in tools:
        tool = CustomTool(
            name=t["name"],
            description=t["description"],
            schema_json=t["parameters"],
            endpoint_url=base_url,
            method=t["method"],
            path=t["path"],
        )
        db.add(tool)
        created.append(t["name"])
    # flush only; this assumes get_db commits when the request finishes
    await db.flush()
    return {"code": 200, "message": f"Imported {len(created)} tools", "data": {"tools": created}}

@router.post("/", response_model=CustomToolOut)
async def create_custom_tool(req: CustomToolCreate, db: AsyncSession = Depends(get_db)):
    tool = CustomTool(
        name=req.name,
        description=req.description,
        schema_json=req.schema_json or {},
        endpoint_url=req.endpoint_url or "",
        method=req.method,
        path=req.path,
        headers_json=req.headers,
        auth_type=req.auth_type,
        auth_config=req.auth_config,
    )
    db.add(tool)
    await db.flush()
    return tool

@router.get("/", response_model=list[CustomToolOut])
async def list_custom_tools(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(CustomTool).where(CustomTool.is_active.is_(True)))
    return result.scalars().all()

@router.post("/{tool_id}/test")
async def test_custom_tool(tool_id: uuid.UUID, params: dict, db: AsyncSession = Depends(get_db)):
    tool = await db.get(CustomTool, tool_id)
    if not tool:
        raise HTTPException(404, "Tool not found")
    executor = CustomToolExecutor({
        "endpoint_url": tool.endpoint_url,
        "method": tool.method,
        "path": tool.path,
        "headers_json": tool.headers_json,
        "auth_type": tool.auth_type,
        "auth_config": tool.auth_config,
    })
    result = await executor.execute(params)
    return {"code": 200, "data": {"result": result}}
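
Because the import endpoint makes the server fetch a user-supplied URL, it may be worth rejecting obviously unsafe targets before the request is sent. The following is a rough sketch, not a complete defense: `is_safe_spec_url` is a hypothetical helper, and it only inspects the URL text without resolving DNS names.

```python
import ipaddress
from urllib.parse import urlparse


def is_safe_spec_url(url: str) -> bool:
    """Accept only http(s) URLs whose host is not a literal private/loopback IP.

    Hostnames are not resolved here, so this check alone does not stop a DNS
    name that points at an internal address.
    """
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        return False
    if parsed.hostname == "localhost":
        return False
    try:
        ip = ipaddress.ip_address(parsed.hostname)
    except ValueError:
        return True  # a DNS name, not an IP literal
    return not (ip.is_private or ip.is_loopback or ip.is_link_local)
```

A check like this could run at the top of import_openapi and return a 400 when it fails.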

2.5 ToolNodeAgent Integration

Modify _init_registry in engine.py to load CustomTool:

@classmethod
def _init_registry(cls):
    if cls._TOOL_REGISTRY:
        return
    # ... existing built-in tool registration ...
    # Load custom tools
    try:
        from sqlalchemy import select
        from database import SessionLocal
        from models import CustomTool
        # Note: this needs a sync session, or the initialization must become async
    except ImportError:
        pass

A better approach: load custom tools asynchronously when FlowEngine is initialized:

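async def _load_custom_tools(self, db: AsyncSession):
    from sqlalchemy import select
    from models import CustomTool
    result = await db.execute(select(CustomTool).where(CustomTool.is_active.is_(True)))
    fields = ("endpoint_url", "method", "path", "headers_json", "auth_type", "auth_config")
    for tool in result.scalars().all():
        # CustomToolExecutor expects a plain dict, not the ORM object
        executor = CustomToolExecutor({f: getattr(tool, f) for f in fields})
        # The registered callable returns a coroutine, so callers must await it
        ToolNodeAgent._TOOL_REGISTRY[tool.name] = executor.execute
        ToolNodeAgent._TOOL_SCHEMAS[tool.name] = tool.schema_json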
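
The registry assignments above store async callables, so whatever dispatches a tool has to await the result. A self-contained sketch of that pattern is shown here; `TOOL_REGISTRY`, `register_tool`, and `call_tool` are hypothetical stand-ins for the ToolNodeAgent internals, and the HTTP call is replaced by a placeholder.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical stand-in for ToolNodeAgent._TOOL_REGISTRY
TOOL_REGISTRY: dict[str, Callable[[dict], Awaitable[str]]] = {}


def register_tool(name: str, endpoint: str) -> None:
    # Passing `endpoint` as an argument gives each tool its own value; a closure
    # over a loop variable would make every tool see the last one.
    async def run(params: dict) -> str:
        await asyncio.sleep(0)  # placeholder for the real HTTP call
        return f"{endpoint} called with {sorted(params)}"

    TOOL_REGISTRY[name] = run


async def call_tool(name: str, params: dict) -> str:
    # Registry entries are coroutine functions, so the result must be awaited.
    return await TOOL_REGISTRY[name](params)


for tool_name, url in [
    ("get_pet", "https://api.example.com/pet"),
    ("list_pets", "https://api.example.com/pets"),
]:
    register_tool(tool_name, url)

result = asyncio.run(call_tool("get_pet", {"petId": 1}))
```

If the existing built-in tools are synchronous functions, the dispatcher would need to handle both kinds, for example by checking the return value with inspect.isawaitable.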

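2.6 Frontend Pages

frontend/src/views/tools/CustomToolManager.vue

  • Table: lists all custom tools (name, method, path, auth type)
  • Import button: opens a dialog where the user enters an OpenAPI URL and clicks Import
  • Test button: fill in parameters → call the test endpoint → show the result
  • Manual creation: a form for name/endpoint/method/path/schema

frontend/src/views/flow/node-configs/ToolConfig.vue enhancements:

  • Add a "Custom tools" group to the tool selection dropdown
  • After a custom tool is selected, generate the parameter form dynamically from schema_json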

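3. Feature 2: Frontend EventSource Streaming Chat Component

3.1 Goal

Create a standalone chat page/component that supports:

  1. Connecting to the backend SSE endpoint with EventSource-style streaming
  2. Displaying workflow_started, node_started, text_chunk, and workflow_finished events in real time
  3. Selecting a published Flow
  4. Showing node execution progress and intermediate results
  5. Multi-turn conversations (persisted session_id)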
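
The event stream the component consumes can be illustrated with a small parser, which is also handy for backend integration tests. This is a minimal sketch, assuming each event arrives as a `data: <json>` line with "event" and "data" fields and that the stream ends with `data: [DONE]`; `iter_sse_events` is a hypothetical helper name.

```python
import codecs
import json
from typing import Iterable, Iterator


def iter_sse_events(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield JSON payloads from `data:` lines, stopping at the [DONE] marker."""
    decoder = codecs.getincrementaldecoder("utf-8")()  # safe for split multi-byte characters
    pending = ""
    for chunk in chunks:
        pending += decoder.decode(chunk)
        *lines, pending = pending.split("\n")  # keep the unfinished last line
        for line in lines:
            if not line.startswith("data: "):
                continue
            data = line[len("data: "):].strip()
            if data == "[DONE]":
                return
            yield json.loads(data)


# Network chunks may split an event in the middle of a line
chunks = [
    b'data: {"event": "workflow_started"}\n\ndata: {"event": "text_ch',
    b'unk", "data": {"content": "Hi"}}\n\n',
    b'data: {"event": "workflow_finished", "data": {"session_id": "s1"}}\n\ndata: [DONE]\n\n',
]
events = list(iter_sse_events(chunks))
text = "".join(e["data"]["content"] for e in events if e["event"] == "text_chunk")
```

The key detail is the `pending` buffer: a chunk boundary can fall inside a line, so only complete lines are parsed and the remainder is carried over to the next chunk.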

3.2 Component Design

frontend/src/views/chat/FlowChat.vue

<template>
  <div class="flow-chat-page">
    <el-page-header @back="$router.back()" content="Streaming Chat" />
    
    <el-card style="margin-top: 20px">
      <el-form inline>
        <el-form-item label="Flow">
          <el-select v-model="selectedFlowId" placeholder="Select a published flow">
            <el-option v-for="f in publishedFlows" :key="f.id" :label="f.name" :value="f.id" />
          </el-select>
        </el-form-item>
        <el-form-item label="模式">
          <el-radio-group v-model="responseMode">
            <el-radio-button label="blocking">Blocking</el-radio-button>
            <el-radio-button label="streaming">Streaming</el-radio-button>
          </el-radio-group>
        </el-form-item>
      </el-form>
    </el-card>

    <el-card style="margin-top: 20px; min-height: 400px">
      <div ref="chatContainer" class="chat-container">
        <div v-for="msg in messages" :key="msg.id" :class="['message', msg.role]">
          <div class="message-content" v-html="renderMarkdown(msg.content)" />
          <div v-if="msg.nodeResults" class="node-results">
            <el-collapse>
              <el-collapse-item title="Node execution details">
                <pre>{{ JSON.stringify(msg.nodeResults, null, 2) }}</pre>
              </el-collapse-item>
            </el-collapse>
          </div>
        </div>
        <div v-if="streaming" class="message assistant streaming">
          <div class="message-content">{{ streamBuffer }}</div>
        </div>
      </div>
    </el-card>

    <el-card style="margin-top: 20px">
      <el-input
        v-model="inputText"
        type="textarea"
        :rows="3"
        placeholder="Type a message..."
        @keydown.enter.prevent="sendMessage"
      />
      <el-button type="primary" @click="sendMessage" :loading="sending">
        Send
      </el-button>
    </el-card>
  </div>
</template>

<script setup lang="ts">
import { ref, onMounted } from 'vue'
import { ElMessage } from 'element-plus'
import { flowApi } from '@/api'

const selectedFlowId = ref('')
const responseMode = ref('streaming')
const inputText = ref('')
const messages = ref<any[]>([])
const streaming = ref(false)
const streamBuffer = ref('')
const sending = ref(false)
const publishedFlows = ref<any[]>([])
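const sessionId = ref(localStorage.getItem('flow_chat_session') || '')

// Placeholder for the renderMarkdown helper the template calls: it only escapes HTML and
// keeps line breaks. Swap in a real Markdown renderer plus sanitization before release.
function renderMarkdown(text: string): string {
  const escaped = text.replace(/&/g, '&amp;').replace(/</g, '&lt;').replace(/>/g, '&gt;')
  return escaped.replace(/\n/g, '<br>')
}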

onMounted(async () => {
  const res = await flowApi.getPublishedFlows()
  publishedFlows.value = res.data?.data || []
})

async function sendMessage() {
  if (!selectedFlowId.value || !inputText.value.trim()) return
  
  const userMsg = { id: Date.now(), role: 'user', content: inputText.value }
  messages.value.push(userMsg)
  
  if (responseMode.value === 'streaming') {
    await sendStreaming()
  } else {
    await sendBlocking()
  }
  inputText.value = ''
}

async function sendStreaming() {
  streaming.value = true
  streamBuffer.value = ''
  sending.value = true

  try {
    // EventSource only supports GET, so the POST request is streamed with fetch + ReadableStream
    const response = await fetch(`/api/flow/definitions/${selectedFlowId.value}/stream`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        input: inputText.value,
        session_id: sessionId.value,
      }),
    })
    if (!response.ok || !response.body) throw new Error(`HTTP ${response.status}`)

    const reader = response.body.getReader()
    const decoder = new TextDecoder()
    let pending = '' // holds a partial line that was split across network chunks

    while (true) {
      const { done, value } = await reader.read()
      if (done) break

      pending += decoder.decode(value, { stream: true })
      const lines = pending.split('\n')
      pending = lines.pop() ?? ''

      for (const line of lines) {
        if (!line.startsWith('data: ')) continue
        const data = line.slice(6).trim()
        if (data === '[DONE]') return // the finally block below finishes the message
        try {
          const event = JSON.parse(data)
          if (event.event === 'text_chunk') {
            streamBuffer.value += event.data?.content || ''
          } else if (event.event === 'workflow_finished') {
            sessionId.value = event.data?.session_id || sessionId.value
            localStorage.setItem('flow_chat_session', sessionId.value)
          }
        } catch {
          // ignore payloads that are not valid JSON
        }
      }
    }
  } catch (e) {
    ElMessage.error('Failed to send')
  } finally {
    // Runs on [DONE], on a stream that ends without it, and on errors
    if (streamBuffer.value) {
      messages.value.push({ id: Date.now(), role: 'assistant', content: streamBuffer.value })
    }
    streaming.value = false
    sending.value = false
  }
}

async function sendBlocking() {
  sending.value = true
  try {
    const res = await flowApi.executeFlow(selectedFlowId.value, {
      input: inputText.value,
      session_id: sessionId.value,
    })
    messages.value.push({
      id: Date.now(),
      role: 'assistant',
      content: res.data?.data?.output || 'No output',
      nodeResults: res.data?.data?.node_results,
    })
  } catch (e) {
    ElMessage.error('Failed to send')
  } finally {
    sending.value = false
  }
}
</script>

3.3 API Wrappers

Added to frontend/src/api/index.ts:

// Streaming execution uses fetch rather than axios
executeFlowStream: (id: string, data: any) => {
  return fetch(`/api/flow/definitions/${id}/stream`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(data),
  })
},

getPublishedFlows: () => api.get('/flow/market'),

3.4 Route Registration

Added to frontend/src/router/index.ts:

{
  path: '/chat/flow',
  name: 'FlowChat',
  component: () => import('@/views/chat/FlowChat.vue'),
  meta: { title: 'Streaming Chat', requiresAuth: true },
},

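4. Implementation Plan

Phase  Task                                        Est. hours  Priority
1      Custom tool data model + Schema             2h          P0
2      OpenAPI parser (parser.py)                  4h          P0
3      Custom tool executor (executor.py)          3h          P0
4      Custom tool CRUD routes                     3h          P0
5      ToolNodeAgent custom tool integration       2h          P0
6      Frontend CustomToolManager page             4h          P1
7      ToolConfig.vue dynamic parameter form       3h          P1
8      Frontend FlowChat.vue EventSource component 5h          P1
9      Frontend routes + API wrappers              2h          P1
10     Integration testing                         4h          P2

Total: about 32 hours (4 days)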


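5. Acceptance Criteria

Custom API tools

  • Entering https://petstore.swagger.io/v2/swagger.json imports all endpoints successfully
  • Imported tools appear in the ToolConfig.vue dropdown
  • After a custom tool is selected, the parameter form is generated dynamically from the schema
  • During Flow execution, custom tools are called correctly through httpx and return results
  • API Key and Bearer Token authentication are supported

Frontend EventSource chat

  • Open the /chat/flow page and select a published Flow
  • In streaming mode, text appears incrementally in real time after a message is sent
  • In blocking mode, the complete result is shown at once after waiting
  • Node execution details are shown (collapsible panel)
  • session_id survives a page refresh, enabling multi-turn conversations
  • session_id is reset after switching Flows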