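PLAN7: Custom API Tool Import + Frontend EventSource Streaming Chat Component
1. Current State and Gaps
With PLAN6 complete, the system already provides:
- ✅ Version snapshots (FlowVersion)
- ✅ SSE streaming output (backend)
- ✅ Unified API gateway (/v1/chat-messages, /v1/workflows/run)
- ✅ API Key authentication
- ✅ Standardized tool Schema (built-in tools)
- ✅ Flow node Memory
- ✅ ParallelMergeNodeAgent + Loop array iteration
The remaining 15% gap:
- Custom API tool import: users cannot yet enter a third-party OpenAPI/Swagger URL in the frontend and have the system parse it into tool Schemas and register them with ToolNodeAgent
- Frontend EventSource chat component: the frontend has no chat interface that supports SSE, so streamed output cannot be viewed in real time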
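2. Feature 1: Custom API Tool Import (OpenAPI/Swagger Parsing)
2.1 Goal
The user enters the OpenAPI/Swagger URL of a third-party API in the frontend (e.g. https://api.example.com/openapi.json), and the backend automatically:
- Downloads and parses the OpenAPI document
- Extracts each endpoint's method, path, parameters, and description
- Converts them to the OpenAI Function Calling Schema format
- Registers them as custom tools available to Flows
- Invokes them dynamically via httpx at execution time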
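To make the conversion step concrete, here is a minimal sketch of what one OpenAPI operation might look like once turned into an OpenAI-style function tool. The helper name `operation_to_tool` and the sample operation are hypothetical and exist only for illustration. The name sanitizing reflects the constraint that function names may contain only letters, digits, underscores, and dashes, up to 64 characters.

```python
import json
import re
from typing import Any


def operation_to_tool(path: str, method: str, operation: dict[str, Any]) -> dict[str, Any]:
    """Convert one OpenAPI operation into an OpenAI-style function tool (illustrative)."""
    raw_name = operation.get("operationId") or f"{method}_{path}"
    # Replace every character that is not a letter, digit, '_' or '-', then cap the length.
    name = re.sub(r"[^a-zA-Z0-9_-]", "_", raw_name).strip("_")[:64]

    properties: dict[str, Any] = {}
    required: list[str] = []
    for param in operation.get("parameters", []):
        schema = param.get("schema", {})
        properties[param["name"]] = {
            "type": schema.get("type", "string"),
            "description": param.get("description", ""),
        }
        if param.get("required"):
            required.append(param["name"])

    return {
        "type": "function",
        "function": {
            "name": name,
            "description": operation.get("summary") or f"{method.upper()} {path}",
            "parameters": {"type": "object", "properties": properties, "required": required},
        },
    }


# Hypothetical operation, shaped like an entry under "paths" in an OpenAPI 3 document.
sample_operation = {
    "summary": "Find pet by ID",
    "parameters": [
        {
            "name": "petId",
            "in": "path",
            "required": True,
            "schema": {"type": "integer"},
            "description": "ID of the pet",
        },
    ],
}

tool = operation_to_tool("/pet/{petId}", "get", sample_operation)
print(json.dumps(tool, indent=2))
```

Because the sample has no operationId, the name is derived from the method and path, which is why the braces and slashes need to be replaced before the tool is handed to a model.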
2.2 Data Model
# Added to models/__init__.py
class CustomTool(Base):
    __tablename__ = "custom_tools"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(100), nullable=False)            # tool name
    description = Column(Text)                            # tool description
    schema_json = Column(JSON, nullable=False)            # OpenAI Function Calling Schema
    endpoint_url = Column(String(500), nullable=False)    # base URL
    method = Column(String(10), default="GET")            # HTTP method
    path = Column(String(500))                            # API path
    headers_json = Column(JSON, default=dict)             # fixed request headers
    auth_type = Column(String(20), default="none")        # none / api_key / bearer
    auth_config = Column(JSON, default=dict)              # authentication settings
    created_by = Column(UUID(as_uuid=True), ForeignKey("users.id"))
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
2.3 Schema
# Added to schemas/__init__.py
class CustomToolCreate(BaseModel):
    name: str
    description: str | None = None
    openapi_url: str | None = None   # one of the two: a URL, or manual configuration
    endpoint_url: str | None = None
    method: str = "GET"
    path: str = ""
    headers: dict = {}
    auth_type: str = "none"
    auth_config: dict = {}
    schema_json: dict | None = None  # Schema supplied manually


class CustomToolOut(BaseModel):
    id: uuid.UUID
    name: str
    description: str | None = None
    schema_json: dict
    endpoint_url: str
    method: str
    path: str
    auth_type: str
    is_active: bool
    created_at: datetime | None = None


class OpenAPIImportRequest(BaseModel):
    openapi_url: str
    base_url_override: str | None = None  # optional override for base_url
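The "one of the two" rule on CustomToolCreate is only stated in a comment, so the request handler would have to enforce it. A minimal sketch of such a check, written against a plain dict so it stays independent of the Pydantic version in use (the function name `check_tool_source` is hypothetical):

```python
from typing import Any


def check_tool_source(payload: dict[str, Any]) -> str:
    """Return "openapi" or "manual"; raise ValueError when neither or both sources are given."""
    has_openapi = bool(payload.get("openapi_url"))
    has_manual = bool(payload.get("endpoint_url"))
    if has_openapi == has_manual:
        # Either both fields are set or both are missing.
        raise ValueError("provide exactly one of openapi_url or endpoint_url")
    return "openapi" if has_openapi else "manual"


print(check_tool_source({"name": "pets", "openapi_url": "https://api.example.com/openapi.json"}))
print(check_tool_source({"name": "pets", "endpoint_url": "https://api.example.com"}))
```

The same logic could live in a model-level validator instead; the plain function is used here only to keep the sketch self-contained.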
2.4 Backend Implementation
Module: backend/modules/custom_tool/
parser.py: OpenAPI parser
from typing import Any

HTTP_METHODS = ("get", "post", "put", "delete", "patch")


class OpenAPIParser:
    def __init__(self, spec: dict):
        self.spec = spec
        servers = spec.get("servers") or []
        if servers:
            # OpenAPI 3.x: use the first declared server.
            self.base_url = servers[0].get("url", "")
        elif spec.get("host"):
            # Swagger 2.0 has no "servers"; build the base URL from schemes/host/basePath.
            scheme = (spec.get("schemes") or ["https"])[0]
            self.base_url = f"{scheme}://{spec['host']}{spec.get('basePath', '')}"
        else:
            self.base_url = ""

    def parse_tools(self) -> list[dict]:
        tools = []
        paths = self.spec.get("paths", {})
        for path, methods in paths.items():
            for method, operation in methods.items():
                # Skip non-operation keys such as path-level "parameters".
                if method in HTTP_METHODS:
                    tool = self._parse_endpoint(path, method, operation)
                    if tool:
                        tools.append(tool)
        return tools

    def _parse_endpoint(self, path: str, method: str, operation: dict) -> dict | None:
        name = operation.get("operationId", f"{method}_{path.replace('/', '_').strip('_')}")
        description = operation.get("summary", operation.get("description", f"{method.upper()} {path}"))
        parameters = self._parse_parameters(operation)
        return {
            "name": name,
            "description": description,
            "parameters": {
                "type": "object",
                "properties": parameters,
                "required": [p["name"] for p in operation.get("parameters", []) if p.get("required")],
            },
            "path": path,
            "method": method.upper(),
        }

    def _parse_parameters(self, operation: dict) -> dict[str, Any]:
        # Note: "$ref" schemas are not resolved here; referenced types fall back to "string".
        props = {}
        for param in operation.get("parameters", []):
            schema = param.get("schema", {})
            props[param["name"]] = {
                # Swagger 2.0 puts "type" on the parameter itself rather than under "schema".
                "type": schema.get("type", param.get("type", "string")),
                "description": param.get("description", ""),
            }
            if "enum" in schema:
                props[param["name"]]["enum"] = schema["enum"]
        # requestBody (OpenAPI 3.x, JSON bodies only)
        body = operation.get("requestBody", {}).get("content", {}).get("application/json", {}).get("schema", {})
        if body:
            for name, prop in body.get("properties", {}).items():
                props[name] = {"type": prop.get("type", "string"), "description": prop.get("description", "")}
        return props
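Real-world OpenAPI documents usually describe request bodies through "$ref" pointers, which the parser above does not follow. As a hedged sketch of how local references could be resolved before the properties are read, the helper below walks a "#/..." pointer through the spec. The name `resolve_ref` and the depth limit are assumptions for illustration and are not part of the plan.

```python
from typing import Any


def resolve_ref(spec: dict[str, Any], node: dict[str, Any], max_depth: int = 20) -> dict[str, Any]:
    """Follow local "$ref" pointers until a concrete schema is reached."""
    for _ in range(max_depth):
        ref = node.get("$ref")
        if ref is None:
            return node
        if not ref.startswith("#/"):
            raise ValueError(f"only local references are supported: {ref}")
        target: Any = spec
        for part in ref[2:].split("/"):
            # JSON Pointer escapes: "~1" stands for "/", "~0" stands for "~".
            target = target[part.replace("~1", "/").replace("~0", "~")]
        node = target
    raise ValueError("reference chain too deep (possible cycle)")


# Hypothetical spec fragment with one alias pointing at a concrete schema.
sample_spec = {
    "components": {
        "schemas": {
            "Pet": {"type": "object", "properties": {"name": {"type": "string"}}},
            "PetAlias": {"$ref": "#/components/schemas/Pet"},
        }
    }
}

resolved = resolve_ref(sample_spec, {"$ref": "#/components/schemas/PetAlias"})
print(resolved["properties"])
```

Calling this on the requestBody schema before iterating over its properties would let referenced bodies contribute parameters. Remote references (other files or URLs) are deliberately rejected in this sketch.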
executor.py: dynamic executor
import json
from urllib.parse import quote

import httpx


class CustomToolExecutor:
    def __init__(self, tool: dict):
        self.tool = tool
        self.endpoint_url = tool["endpoint_url"]
        self.method = tool["method"].upper()
        self.path = tool.get("path") or ""
        self.headers = tool.get("headers_json") or {}
        self.auth_type = tool.get("auth_type") or "none"
        self.auth_config = tool.get("auth_config") or {}

    async def execute(self, params: dict) -> str:
        params = dict(params)  # work on a copy so the caller's dict is not mutated
        headers = dict(self.headers)
        query: dict = {}

        # Fill path placeholders such as /pet/{petId} from params.
        path = self.path
        for key in list(params):
            placeholder = "{" + key + "}"
            if placeholder in path:
                path = path.replace(placeholder, quote(str(params.pop(key)), safe=""))
        url = f"{self.endpoint_url.rstrip('/')}/{path.lstrip('/')}"

        if self.auth_type == "api_key":
            key = self.auth_config.get("key", "")
            loc = self.auth_config.get("location", "header")  # header / query
            name = self.auth_config.get("name", "X-API-Key")
            if loc == "header":
                headers[name] = key
            else:
                query[name] = key  # sent in the query string, also for non-GET requests
        elif self.auth_type == "bearer":
            headers["Authorization"] = f"Bearer {self.auth_config.get('token', '')}"

        async with httpx.AsyncClient(timeout=30) as client:
            if self.method == "GET":
                resp = await client.get(url, params={**params, **query}, headers=headers)
            else:
                resp = await client.request(self.method, url, params=query, json=params, headers=headers)

        # Return at most 2000 characters so large responses do not flood the Flow context.
        try:
            return json.dumps(resp.json(), ensure_ascii=False, indent=2)[:2000]
        except ValueError:
            return resp.text[:2000]
router.py: CRUD + import endpoints
import uuid

import httpx
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models import CustomTool
from schemas import CustomToolCreate, CustomToolOut, OpenAPIImportRequest
from .parser import OpenAPIParser
from .executor import CustomToolExecutor

router = APIRouter(prefix="/api/custom-tools", tags=["custom_tools"])


@router.post("/import-openapi")
async def import_openapi(req: OpenAPIImportRequest, db: AsyncSession = Depends(get_db)):
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.get(req.openapi_url)
            resp.raise_for_status()
            spec = resp.json()
    except (httpx.HTTPError, ValueError) as exc:
        raise HTTPException(400, f"Failed to fetch or parse the OpenAPI document: {exc}")

    parser = OpenAPIParser(spec)
    tools = parser.parse_tools()
    base_url = req.base_url_override or parser.base_url
    created = []
    for t in tools:
        tool = CustomTool(
            name=t["name"],
            description=t["description"],
            schema_json=t["parameters"],
            endpoint_url=base_url,
            method=t["method"],
            path=t["path"],
        )
        db.add(tool)
        created.append(t["name"])
    await db.flush()
    return {"code": 200, "message": f"Imported {len(created)} tools", "data": {"tools": created}}


@router.post("/", response_model=CustomToolOut)
async def create_custom_tool(req: CustomToolCreate, db: AsyncSession = Depends(get_db)):
    tool = CustomTool(
        name=req.name,
        description=req.description,
        schema_json=req.schema_json or {},
        endpoint_url=req.endpoint_url or "",
        method=req.method,
        path=req.path,
        headers_json=req.headers,
        auth_type=req.auth_type,
        auth_config=req.auth_config,
    )
    db.add(tool)
    await db.flush()
    return tool


@router.get("/", response_model=list[CustomToolOut])
async def list_custom_tools(db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(CustomTool).where(CustomTool.is_active.is_(True)))
    return result.scalars().all()


@router.post("/{tool_id}/test")
async def test_custom_tool(tool_id: uuid.UUID, params: dict, db: AsyncSession = Depends(get_db)):
    tool = await db.get(CustomTool, tool_id)
    if not tool:
        raise HTTPException(404, "Tool not found")
    executor = CustomToolExecutor({
        "endpoint_url": tool.endpoint_url,
        "method": tool.method,
        "path": tool.path,
        "headers_json": tool.headers_json,
        "auth_type": tool.auth_type,
        "auth_config": tool.auth_config,
    })
    result = await executor.execute(params)
    return {"code": 200, "data": {"result": result}}
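Since the import endpoint makes the backend fetch a URL chosen by the user, it may be worth rejecting obviously internal targets before the request is sent. The following is only a sketch under that assumption: the function name `is_allowed_import_url` is hypothetical, and the check covers URL schemes and literal IP addresses only. It does not resolve DNS names, so it is not a complete defense.

```python
import ipaddress
from urllib.parse import urlsplit


def is_allowed_import_url(url: str) -> bool:
    """Accept only http(s) URLs whose host is not localhost or a non-public literal IP."""
    parts = urlsplit(url)
    if parts.scheme not in ("http", "https") or not parts.hostname:
        return False
    host = parts.hostname.lower()
    if host == "localhost" or host.endswith(".localhost"):
        return False
    try:
        addr = ipaddress.ip_address(host)
    except ValueError:
        # A DNS name: resolving it and re-checking the address is left out of this sketch.
        return True
    return addr.is_global


for candidate in (
    "https://petstore.swagger.io/v2/swagger.json",
    "http://127.0.0.1:8000/openapi.json",
    "http://169.254.169.254/latest/meta-data",
    "file:///etc/passwd",
):
    print(candidate, is_allowed_import_url(candidate))
```

A handler could call this first and answer with HTTP 400 when it returns False.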
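2.5 ToolNodeAgent Integration
Modify _init_registry in engine.py to load CustomTool:
@classmethod
def _init_registry(cls):
    if cls._TOOL_REGISTRY:
        return
    # ... existing built-in tool registration ...
    # Load custom tools
    try:
        from sqlalchemy import select
        from database import SessionLocal
        from models import CustomTool
        # Note: this needs a sync session, or initialization must become async
    except ImportError:
        pass
A better approach: load custom tools asynchronously when FlowEngine initializes:
async def _load_custom_tools(self, db: AsyncSession):
    from sqlalchemy import select
    from models import CustomTool
    result = await db.execute(select(CustomTool).where(CustomTool.is_active.is_(True)))
    fields = ("endpoint_url", "method", "path", "headers_json", "auth_type", "auth_config")
    for tool in result.scalars().all():
        # CustomToolExecutor expects a dict, so copy the needed columns out of the ORM row.
        config = {f: getattr(tool, f) for f in fields}
        ToolNodeAgent._TOOL_REGISTRY[tool.name] = lambda params, cfg=config: CustomToolExecutor(cfg).execute(params)
        ToolNodeAgent._TOOL_SCHEMAS[tool.name] = tool.schema_json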
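The default argument on the registered lambda matters: Python closures capture variables, not values, so without it every registry entry would end up calling the last tool in the loop. A small self-contained sketch of the pattern, with a stand-in coroutine `fake_call` instead of a real HTTP executor (all names here are illustrative):

```python
import asyncio
from typing import Awaitable, Callable

Registry = dict[str, Callable[[dict], Awaitable[str]]]


async def fake_call(tool_name: str, params: dict) -> str:
    """Stand-in for an HTTP call; reports which tool would be invoked."""
    return f"{tool_name}:{sorted(params)}"


def build_registry(tool_names: list[str]) -> Registry:
    registry: Registry = {}
    for name in tool_names:
        # `n=name` freezes the current value; a plain `lambda p: fake_call(name, p)`
        # would see only the final `name` once the loop has finished.
        registry[name] = lambda params, n=name: fake_call(n, params)
    return registry


registry = build_registry(["get_pet", "add_pet"])
result = asyncio.run(registry["get_pet"]({"petId": 1}))
print(result)
```

Each registry entry returns a coroutine, so the caller has to await it, which matches an executor whose execute method is async.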
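2.6 Frontend Pages
frontend/src/views/tools/CustomToolManager.vue
- Table: lists all custom tools (name, method, path, auth type)
- Import button: opens a dialog; enter an OpenAPI URL, then click Import
- Test button: fill in parameters, call the test endpoint, and show the result
- Manual creation: a form for name/endpoint/method/path/schema
Enhancements to frontend/src/views/flow/node-configs/ToolConfig.vue:
- Add a "Custom tools" group to the tool selection dropdown
- After a custom tool is selected, generate the parameter form dynamically from schema_json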
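3. Feature 2: Frontend EventSource Streaming Chat Component
3.1 Goal
Create a standalone chat page/component that:
- Connects to the backend SSE endpoint via EventSource
- Displays workflow_started → node_started → text_chunk → workflow_finished events in real time
- Supports selecting a published Flow
- Shows node execution progress and intermediate results
- Supports multi-turn conversations (persisted session_id)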
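The wire format the component has to consume can be sketched in a few lines. The example below, in Python for consistency with the backend code, frames events as SSE "data:" lines and parses them incrementally, keeping any incomplete trailing line until the next chunk arrives. The event names and the `[DONE]` terminator follow this plan; the helper names `format_sse` and `SSEBuffer` are hypothetical.

```python
import json
from typing import Iterator


def format_sse(event: str, data: dict) -> str:
    """Serialize one event as a single SSE `data:` frame terminated by a blank line."""
    payload = json.dumps({"event": event, "data": data}, ensure_ascii=False)
    return f"data: {payload}\n\n"


class SSEBuffer:
    """Accumulates raw text chunks and yields complete `data:` payloads."""

    def __init__(self) -> None:
        self._pending = ""

    def feed(self, chunk: str) -> Iterator[str]:
        self._pending += chunk
        # Only lines terminated by "\n" are complete; keep the tail for the next chunk.
        *lines, self._pending = self._pending.split("\n")
        for line in lines:
            if line.startswith("data: "):
                yield line[len("data: "):]


stream = (
    format_sse("workflow_started", {})
    + format_sse("text_chunk", {"content": "Hel"})
    + format_sse("text_chunk", {"content": "lo"})
    + format_sse("workflow_finished", {"session_id": "s-1"})
    + "data: [DONE]\n\n"
)

buffer = SSEBuffer()
text = ""
done = False
# Feed the stream in 7-character pieces to mimic arbitrary network chunking.
for start in range(0, len(stream), 7):
    for payload in buffer.feed(stream[start:start + 7]):
        if payload == "[DONE]":
            done = True
            continue
        event = json.loads(payload)
        if event["event"] == "text_chunk":
            text += event["data"]["content"]

print(text, done)
```

The same buffering idea applies on the browser side: a chunk boundary can fall in the middle of a line, so splitting each chunk on newlines without carrying over the remainder can drop or corrupt events.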
3.2 Component Design
frontend/src/views/chat/FlowChat.vue
<template>
  <div class="flow-chat-page">
    <el-page-header @back="$router.back()" content="Streaming Chat" />
    <el-card style="margin-top: 20px">
      <el-form inline>
        <el-form-item label="Flow">
          <el-select v-model="selectedFlowId" placeholder="Select a published Flow">
            <el-option v-for="f in publishedFlows" :key="f.id" :label="f.name" :value="f.id" />
          </el-select>
        </el-form-item>
        <el-form-item label="Mode">
          <el-radio-group v-model="responseMode">
            <el-radio-button label="blocking">Blocking</el-radio-button>
            <el-radio-button label="streaming">Streaming</el-radio-button>
          </el-radio-group>
        </el-form-item>
      </el-form>
    </el-card>
    <el-card style="margin-top: 20px; min-height: 400px">
      <div ref="chatContainer" class="chat-container">
        <div v-for="msg in messages" :key="msg.id" :class="['message', msg.role]">
          <div class="message-content" v-html="renderMarkdown(msg.content)" />
          <div v-if="msg.nodeResults" class="node-results">
            <el-collapse>
              <el-collapse-item title="Node execution details">
                <pre>{{ JSON.stringify(msg.nodeResults, null, 2) }}</pre>
              </el-collapse-item>
            </el-collapse>
          </div>
        </div>
        <div v-if="streaming" class="message assistant streaming">
          <div class="message-content">{{ streamBuffer }}</div>
        </div>
      </div>
    </el-card>
    <el-card style="margin-top: 20px">
      <el-input
        v-model="inputText"
        type="textarea"
        :rows="3"
        placeholder="Type a message..."
        @keydown.enter.prevent="sendMessage"
      />
      <el-button type="primary" @click="sendMessage" :loading="sending">
        Send
      </el-button>
    </el-card>
  </div>
</template>
<script setup lang="ts">
import { ref, watch, onMounted } from 'vue'
import { ElMessage } from 'element-plus'
// Assumption: marked and dompurify are installed; any sanitizing Markdown renderer would do.
import { marked } from 'marked'
import DOMPurify from 'dompurify'
import { flowApi } from '@/api'

const selectedFlowId = ref('')
const responseMode = ref('streaming')
const inputText = ref('')
const messages = ref<any[]>([])
const streaming = ref(false)
const streamBuffer = ref('')
const sending = ref(false)
const publishedFlows = ref<any[]>([])
const sessionId = ref(localStorage.getItem('flow_chat_session') || '')

onMounted(async () => {
  const res = await flowApi.getPublishedFlows()
  publishedFlows.value = res.data?.data || []
})

// Reset the session when the user switches from one Flow to another.
watch(selectedFlowId, (_newId, oldId) => {
  if (!oldId) return // first selection after a page load keeps the stored session
  sessionId.value = ''
  localStorage.removeItem('flow_chat_session')
})

// The template renders msg.content through v-html, so sanitize the generated HTML.
function renderMarkdown(text: string): string {
  return DOMPurify.sanitize(marked.parse(text ?? '') as string)
}

async function sendMessage() {
  const text = inputText.value.trim()
  if (!selectedFlowId.value || !text || sending.value) return
  messages.value.push({ id: Date.now(), role: 'user', content: text })
  inputText.value = ''
  if (responseMode.value === 'streaming') {
    await sendStreaming(text)
  } else {
    await sendBlocking(text)
  }
}

async function sendStreaming(text: string) {
  streaming.value = true
  streamBuffer.value = ''
  sending.value = true
  try {
    // The native EventSource API only issues GET requests, so the POST stream
    // is read with fetch + ReadableStream instead.
    const response = await fetch(`/api/flow/definitions/${selectedFlowId.value}/stream`, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      credentials: 'include',
      body: JSON.stringify({
        input: text,
        session_id: sessionId.value,
      }),
    })
    if (!response.ok || !response.body) throw new Error(`HTTP ${response.status}`)
    const reader = response.body.getReader()
    const decoder = new TextDecoder()
    let pending = ''
    let finished = false
    while (!finished) {
      const { done, value } = await reader.read()
      if (done) break
      // stream: true keeps multi-byte characters intact across chunk boundaries.
      pending += decoder.decode(value, { stream: true })
      const lines = pending.split('\n')
      pending = lines.pop() ?? '' // keep the incomplete last line for the next chunk
      for (const line of lines) {
        if (!line.startsWith('data: ')) continue
        const data = line.slice(6).trim()
        if (data === '[DONE]') {
          finished = true
          break
        }
        try {
          const event = JSON.parse(data)
          if (event.event === 'text_chunk') {
            streamBuffer.value += event.data?.content || ''
          } else if (event.event === 'workflow_finished') {
            sessionId.value = event.data?.session_id || sessionId.value
            localStorage.setItem('flow_chat_session', sessionId.value)
          }
        } catch {
          // Ignore frames that are not valid JSON.
        }
      }
    }
    messages.value.push({
      id: Date.now(),
      role: 'assistant',
      content: streamBuffer.value,
    })
  } catch (e) {
    ElMessage.error('Failed to send')
  } finally {
    // Always leave the streaming state, even if the stream ends without [DONE].
    streaming.value = false
    sending.value = false
  }
}

async function sendBlocking(text: string) {
  sending.value = true
  try {
    const res = await flowApi.executeFlow(selectedFlowId.value, {
      input: text,
      session_id: sessionId.value,
    })
    messages.value.push({
      id: Date.now(),
      role: 'assistant',
      content: res.data?.data?.output || 'No output',
      nodeResults: res.data?.data?.node_results,
    })
  } catch (e) {
    ElMessage.error('Failed to send')
  } finally {
    sending.value = false
  }
}
</script>
3.3 API Wrappers
Add to frontend/src/api/index.ts:
// Streaming execution uses fetch rather than axios
executeFlowStream: (id: string, data: any) => {
  return fetch(`/api/flow/definitions/${id}/stream`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(data),
  })
},
getPublishedFlows: () => api.get('/flow/market'),
3.4 Route Registration
Add to frontend/src/router/index.ts:
{
  path: '/chat/flow',
  name: 'FlowChat',
  component: () => import('@/views/chat/FlowChat.vue'),
  meta: { title: 'Streaming Chat', requiresAuth: true },
},
4. Implementation Plan
| Phase | Task | Estimated effort | Priority |
|---|---|---|---|
| 1 | Custom tool data model + Schema | 2h | P0 |
| 2 | OpenAPI parser (parser.py) | 4h | P0 |
| 3 | Custom tool executor (executor.py) | 3h | P0 |
| 4 | Custom tool CRUD routes | 3h | P0 |
| 5 | ToolNodeAgent integration of custom tools | 2h | P0 |
| 6 | Frontend CustomToolManager page | 4h | P1 |
| 7 | ToolConfig.vue dynamic parameter form | 3h | P1 |
| 8 | Frontend FlowChat.vue EventSource component | 5h | P1 |
| 9 | Frontend routes + API wrappers | 2h | P1 |
| 10 | Integration tests | 4h | P2 |
Total: about 32 hours (4 days)
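5. Acceptance Criteria
Custom API tools
- Entering https://petstore.swagger.io/v2/swagger.json imports all endpoints successfully
- Imported tools appear in the ToolConfig.vue dropdown
- After a custom tool is selected, the parameter form is generated dynamically from the Schema
- During Flow execution, custom tools are called correctly via httpx and return results
- API Key and Bearer Token authentication are supported
Frontend EventSource chat
- Open the /chat/flow page and select a published Flow
- In streaming mode, text appears incrementally in real time after a message is sent
- In blocking mode, the complete result is shown at once after a message is sent
- Node execution details are shown (collapsible panel)
- session_id survives a page refresh, so multi-turn conversations continue
- session_id is reset after switching to a different Flow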