"""流程定义管理路由。 提供流程的 CRUD、发布/下架、版本管理、执行、SSE 流式执行、 API Key 管理、执行历史查询以及市场模板等功能。 """ import uuid import time import json import asyncio import hashlib import secrets from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, Request, Query from fastapi.responses import StreamingResponse from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from database import get_db from models import FlowDefinition, FlowVersion, FlowApiKey, FlowExecution, User, MemoryMessage, FlowTemplate from schemas import ( FlowDefinitionCreate, FlowDefinitionUpdate, FlowDefinitionOut, FlowVersionOut, FlowApiKeyCreate, FlowApiKeyOut, FlowExecuteRequest, FlowChatMessageRequest, ) from modules.flow_engine.engine import FlowEngine, ToolNodeAgent from agentscope.message import Msg from dependencies import get_current_user import logging logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/flow", tags=["flow"]) def _build_flow_out(f) -> FlowDefinitionOut: return FlowDefinitionOut( id=f.id, name=f.name, description=f.description, version=f.version, status=f.status, definition_json=f.definition_json, published_version_id=getattr(f, 'published_version_id', None), published_to_wecom=getattr(f, 'published_to_wecom', False), published_to_web=getattr(f, 'published_to_web', False), flow_mode=getattr(f, 'flow_mode', 'chatflow') or 'chatflow', created_at=f.created_at, updated_at=f.updated_at, ) # ============================== CRUD ============================== @router.get("/definitions", response_model=list[FlowDefinitionOut]) async def list_flows(request: Request, db: AsyncSession = Depends(get_db)): result = await db.execute( select(FlowDefinition).order_by(FlowDefinition.updated_at.desc()) ) return [_build_flow_out(f) for f in result.scalars().all()] @router.get("/definitions/{flow_id}", response_model=FlowDefinitionOut) async def get_flow(flow_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)): f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") return _build_flow_out(f) @router.post("/definitions", response_model=FlowDefinitionOut) async def create_flow(req: FlowDefinitionCreate, request: Request, db: AsyncSession = Depends(get_db)): user_ctx = request.state.user definition_json = { "nodes": [n.model_dump() for n in req.nodes], "edges": [e.model_dump() for e in req.edges], "trigger": req.trigger, } flow = FlowDefinition( name=req.name, description=req.description, definition_json=definition_json, draft_definition_json=definition_json, creator_id=uuid.UUID(user_ctx["id"]), flow_mode=req.flow_mode, ) db.add(flow) await db.flush() return _build_flow_out(flow) @router.put("/definitions/{flow_id}", response_model=FlowDefinitionOut) async def update_flow(flow_id: uuid.UUID, req: FlowDefinitionUpdate, request: Request, db: AsyncSession = Depends(get_db)): f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") if req.name is not None: f.name = req.name if req.description is not None: f.description = req.description if req.nodes is not None and req.edges is not None: new_def = { "nodes": [n.model_dump() for n in req.nodes], "edges": [e.model_dump() for e in req.edges], "trigger": req.trigger or f.definition_json.get("trigger", {}), } f.version += 1 f.draft_definition_json = new_def f.definition_json = new_def return _build_flow_out(f) @router.delete("/definitions/{flow_id}") async def delete_flow(flow_id: uuid.UUID, db: AsyncSession = Depends(get_db)): f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") await db.delete(f) return {"code": 200, "message": "已删除"} # ============================== 发布 (创建快照) ============================== async def _snapshot_publish(flow: FlowDefinition, db: AsyncSession, user_id: str, publish_wecom: bool = False, publish_web: bool = False, changelog: str = ""): new_version = FlowVersion( flow_id=flow.id, version=flow.version, definition_json=json.loads(json.dumps(flow.definition_json)), changelog=changelog, published_by=uuid.UUID(user_id), published_to_wecom=publish_wecom, published_to_web=publish_web, ) db.add(new_version) await db.flush() flow.published_version_id = new_version.id flow.status = "published" if publish_wecom: flow.published_to_wecom = True if publish_web: flow.published_to_web = True return new_version def validate_flow_definition(definition: dict, flow_mode: str = "chatflow") -> list[str]: errors = [] nodes = definition.get("nodes", []) edges = definition.get("edges", []) if not nodes: errors.append("流定义中没有节点") return errors if not any(n.get("type") == "trigger" for n in nodes): errors.append("缺少触发/起始节点 (trigger)") if flow_mode == "chatflow" and not any(n.get("type") == "llm" for n in nodes): errors.append("对话型流必须包含至少一个 LLM 节点") node_ids = {n["id"] for n in nodes} connected_ids = set() for e in edges: connected_ids.add(e.get("source", "")) connected_ids.add(e.get("target", "")) for n in nodes: if n["id"] not in connected_ids and len(nodes) > 1: errors.append(f"节点 '{n.get('label', n['id'])}' 未连接") return errors @router.post("/definitions/{flow_id}/validate") async def validate_flow(flow_id: uuid.UUID, db: AsyncSession = Depends(get_db)): f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") flow_mode = getattr(f, 'flow_mode', 'chatflow') or 'chatflow' errors = validate_flow_definition(f.definition_json, flow_mode) return {"code": 200, "valid": len(errors) == 0, "errors": errors} @router.post("/definitions/{flow_id}/publish") async def publish_flow(flow_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)): f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") nodes = f.definition_json.get("nodes", []) if not nodes: raise HTTPException(400, "流定义中没有节点") flow_mode = getattr(f, 'flow_mode', 'chatflow') or 'chatflow' errors = validate_flow_definition(f.definition_json, flow_mode) if errors: raise HTTPException(400, f"流校验失败: {'; '.join(errors)}") user_ctx = request.state.user body = {} try: body = await request.json() except Exception: pass changelog = body.get("changelog", "") await _snapshot_publish(f, db, user_ctx["id"], publish_wecom=True, changelog=changelog) return {"code": 200, "message": "流已上架到企微", "data": {"status": "published", "version": f.version}} @router.post("/definitions/{flow_id}/publish-web") async def publish_flow_to_web(flow_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)): f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") user_ctx = request.state.user prev_version_id = f.published_version_id await _snapshot_publish(f, db, user_ctx["id"], publish_web=True) return {"code": 200, "message": "流已上架到网页", "data": {"status": "published", "version": f.version}} @router.post("/definitions/{flow_id}/unpublish") async def unpublish_flow(flow_id: uuid.UUID, db: AsyncSession = Depends(get_db)): f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") f.status = "draft" f.published_to_wecom = False f.published_version_id = None return {"code": 200, "message": "流已下架"} @router.post("/definitions/{flow_id}/unpublish-web") async def unpublish_flow_from_web(flow_id: uuid.UUID, db: AsyncSession = Depends(get_db)): f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") f.published_to_web = False if not f.published_to_wecom: f.status = "draft" f.published_version_id = None return {"code": 200, "message": "流已从网页下架"} # ============================== 版本管理 ============================== @router.get("/definitions/{flow_id}/versions", response_model=list[FlowVersionOut]) async def list_versions(flow_id: uuid.UUID, db: AsyncSession = Depends(get_db)): result = await db.execute( select(FlowVersion) .where(FlowVersion.flow_id == flow_id) .order_by(FlowVersion.version.desc()) .limit(50) ) return [FlowVersionOut( id=v.id, flow_id=v.flow_id, version=v.version, definition_json=v.definition_json, changelog=v.changelog or "", published_to_wecom=v.published_to_wecom, published_to_web=v.published_to_web, published_by=v.published_by, created_at=v.created_at, ) for v in result.scalars().all()] @router.post("/definitions/{flow_id}/rollback/{version_id}") async def rollback_flow(flow_id: uuid.UUID, version_id: uuid.UUID, db: AsyncSession = Depends(get_db)): f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") target = await db.get(FlowVersion, version_id) if not target or str(target.flow_id) != str(flow_id): raise HTTPException(404, "版本不存在") f.definition_json = json.loads(json.dumps(target.definition_json)) f.draft_definition_json = f.definition_json f.published_version_id = target.id f.published_to_wecom = target.published_to_wecom f.published_to_web = target.published_to_web f.status = "published" if (target.published_to_wecom or target.published_to_web) else "draft" f.version = target.version return {"code": 200, "message": f"已回滚到版本 v{target.version}", "data": {"version": target.version}} # ============================== 执行 (加版本快照) ============================== def _get_definition_json(flow: FlowDefinition, db_session) -> dict: """优先加载 published_version 快照,不存在则使用当前 definition_json""" return flow.definition_json async def _get_published_definition(flow: FlowDefinition, db: AsyncSession) -> dict: published_version_id = getattr(flow, 'published_version_id', None) if published_version_id: result = await db.execute(select(FlowVersion).where(FlowVersion.id == published_version_id)) published = result.scalar_one_or_none() if published: return json.loads(json.dumps(published.definition_json)) return flow.definition_json @router.post("/definitions/{flow_id}/execute") async def execute_flow(flow_id: uuid.UUID, request: Request, payload: dict, db: AsyncSession = Depends(get_db)): f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") user_ctx = request.state.user input_text = payload.get("input", payload.get("message", "")) session_id = payload.get("session_id") or str(uuid.uuid4()) definition = await _get_published_definition(f, db) await ToolNodeAgent.load_custom_tools(db) engine = FlowEngine(definition) input_msg = Msg(name="user", content=input_text, role="user") context = { "user_id": user_ctx["id"], "username": user_ctx.get("username", ""), "session_id": session_id, "trigger_data": payload.get("trigger", {}), "_node_results": {}, } try: from modules.memory.manager import get_memory_manager mm = get_memory_manager() flow_mode = getattr(f, 'flow_mode', 'chatflow') or 'chatflow' if flow_mode == 'chatflow': await mm.inject_memory( user_id=user_ctx["id"], flow_id=str(flow_id), session_id=session_id, context=context, ) except Exception as e: logger.debug(f"记忆注入跳过: {e}") start_time = time.time() try: result_msg = await engine.execute(input_msg, context) elapsed_ms = int((time.time() - start_time) * 1000) output_text = result_msg.get_text_content() if hasattr(result_msg, 'get_text_content') else str(result_msg) try: from modules.memory.manager import get_memory_manager mm = get_memory_manager() if flow_mode == 'chatflow': asyncio.create_task(mm.record_exchange( user_id=user_ctx["id"], flow_id=str(flow_id), session_id=session_id, user_msg=input_text, assistant_msg=output_text, flow_name=f.name, )) except Exception as e: logger.debug(f"记忆记录跳过: {e}") execution = FlowExecution( flow_id=f.id, version=_get_published_version_number(f), trigger_type=payload.get("trigger_type", "manual"), trigger_user_id=uuid.UUID(user_ctx["id"]), input_data={"input": input_text}, output_data={"output": output_text}, status="completed", latency_ms=elapsed_ms, finished_at=datetime.utcnow(), ) db.add(execution) return { "code": 200, "data": { "output": output_text, "session_id": session_id, "node_results": context.get("_node_results", {}), "execution_id": str(execution.id), "latency_ms": elapsed_ms, }, } except Exception as e: elapsed_ms = int((time.time() - start_time) * 1000) execution = FlowExecution( flow_id=f.id, version=_get_published_version_number(f), trigger_type="manual", trigger_user_id=uuid.UUID(user_ctx["id"]), input_data={"input": input_text}, status="failed", latency_ms=elapsed_ms, error_message=str(e)[:2000], finished_at=datetime.utcnow(), ) db.add(execution) raise HTTPException(500, f"流执行失败: {str(e)}") def _get_published_version_number(flow: FlowDefinition) -> int | None: return flow.version @router.post("/definitions/{flow_id}/test") async def test_flow(flow_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)): f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") definition = await _get_published_definition(f, db) nodes = definition.get("nodes", []) edges = definition.get("edges", []) validation = {"valid": True, "node_count": len(nodes), "edge_count": len(edges), "node_types": list(set(n.get("type", "unknown") for n in nodes)), "issues": []} node_ids = {n["id"] for n in nodes} for edge in edges: s = edge.get("source") or edge.get("from") t = edge.get("target") or edge.get("to") if s and s not in node_ids: validation["issues"].append(f"边源节点 {s} 不存在") if t and t not in node_ids: validation["issues"].append(f"边目标节点 {t} 不存在") if validation["issues"]: validation["valid"] = False if not any(n.get("type") == "trigger" for n in nodes): validation["issues"].append("流缺少触发节点") return {"code": 200, "data": validation} # ============================== SSE 流式执行 ============================== @router.post("/definitions/{flow_id}/stream") async def execute_flow_stream(flow_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)): body = await request.json() input_text = body.get("input", body.get("message", "")) session_id = body.get("session_id") or str(uuid.uuid4()) user_ctx = request.state.user f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") definition = await _get_published_definition(f, db) await ToolNodeAgent.load_custom_tools(db) try: from modules.memory.manager import get_memory_manager mm_stream = get_memory_manager() except Exception: mm_stream = None flow_mode_stream = getattr(f, 'flow_mode', 'chatflow') or 'chatflow' async def event_generator(): engine = FlowEngine(definition) context = { "user_id": user_ctx["id"], "username": user_ctx.get("username", ""), "session_id": session_id, "trigger_data": body.get("trigger", {}), "_node_results": {}, "_stream_callback": None, } if mm_stream and flow_mode_stream == 'chatflow': try: await mm_stream.inject_memory( user_id=user_ctx["id"], flow_id=str(flow_id), session_id=session_id, context=context, ) except Exception: pass input_msg = Msg(name="user", content=input_text, role="user") start_time = time.time() token_queue: asyncio.Queue = asyncio.Queue() async def stream_callback(event_type: str, data: dict): await token_queue.put((event_type, data)) context["_stream_callback"] = stream_callback yield f"data: {json.dumps({'event': 'workflow_started', 'data': {'flow_id': str(flow_id)}}, ensure_ascii=False)}\n\n" execution_task = asyncio.create_task( asyncio.wait_for( engine.execute(input_msg, context), timeout=engine.FLOW_TIMEOUT_SECONDS, ) ) result_msg = None exec_error = None while True: done = asyncio.ensure_future(token_queue.get()) try: wait_done, _ = await asyncio.wait( [asyncio.ensure_future(execution_task), done], return_when=asyncio.FIRST_COMPLETED, ) except Exception as e: exec_error = e break if execution_task.done(): try: result_msg = execution_task.result() except asyncio.TimeoutError: exec_error = asyncio.TimeoutError("执行超时") except Exception as e: exec_error = e while not token_queue.empty(): ev_type, ev_data = token_queue.get_nowait() chunk_data = json.dumps({"event": ev_type, "data": ev_data}, ensure_ascii=False) yield f"data: {chunk_data}\n\n" break ev_type, ev_data = done.result() chunk_data = json.dumps({"event": ev_type, "data": ev_data}, ensure_ascii=False) yield f"data: {chunk_data}\n\n" elapsed_ms = int((time.time() - start_time) * 1000) if exec_error: yield f"data: {json.dumps({'event': 'error', 'data': {'message': str(exec_error)}}, ensure_ascii=False)}\n\n" execution = FlowExecution( flow_id=f.id, version=_get_published_version_number(f), trigger_type="manual", trigger_user_id=uuid.UUID(user_ctx["id"]), input_data={"input": input_text}, status="failed", latency_ms=elapsed_ms, error_message=str(exec_error)[:2000] if not isinstance(exec_error, asyncio.TimeoutError) else "执行超时", finished_at=datetime.utcnow(), ) db.add(execution) return output_text = result_msg.get_text_content() if hasattr(result_msg, 'get_text_content') else str(result_msg) yield f"data: {json.dumps({'event': 'workflow_finished', 'data': {'output': output_text, 'session_id': session_id, 'node_results': {k: str(v)[:200] for k, v in context.get('_node_results', {}).items()}, 'latency_ms': elapsed_ms}}, ensure_ascii=False)}\n\n" if mm_stream and flow_mode_stream == 'chatflow': try: asyncio.create_task(mm_stream.record_exchange( user_id=user_ctx["id"], flow_id=str(flow_id), session_id=session_id, user_msg=input_text, assistant_msg=output_text, flow_name=f.name, )) except Exception: pass execution = FlowExecution( flow_id=f.id, version=_get_published_version_number(f), trigger_type=body.get("trigger_type", "manual"), trigger_user_id=uuid.UUID(user_ctx["id"]), input_data={"input": input_text}, output_data={"output": output_text}, status="completed", latency_ms=elapsed_ms, finished_at=datetime.utcnow(), ) db.add(execution) yield "data: [DONE]\n\n" return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) # ============================== API Key 管理 ============================== def _generate_api_key() -> tuple[str, str, str]: raw = "flow-" + secrets.token_urlsafe(24) key_hash = hashlib.sha256(raw.encode()).hexdigest() return raw, key_hash, raw[:14] @router.post("/definitions/{flow_id}/api-keys", response_model=dict) async def create_api_key(flow_id: uuid.UUID, body: FlowApiKeyCreate, request: Request, db: AsyncSession = Depends(get_db)): f = await db.get(FlowDefinition, flow_id) if not f: raise HTTPException(404, "流定义不存在") user_ctx = request.state.user raw, key_hash, key_prefix = _generate_api_key() api_key = FlowApiKey( flow_id=flow_id, name=body.name, key_hash=key_hash, key_prefix=key_prefix, created_by=uuid.UUID(user_ctx["id"]), ) db.add(api_key) await db.flush() return { "code": 200, "data": { "id": str(api_key.id), "name": body.name, "key_prefix": key_prefix, "api_key": raw, "created_at": str(api_key.created_at), }, } @router.get("/definitions/{flow_id}/api-keys", response_model=list[FlowApiKeyOut]) async def list_api_keys(flow_id: uuid.UUID, db: AsyncSession = Depends(get_db)): result = await db.execute( select(FlowApiKey).where(FlowApiKey.flow_id == flow_id).order_by(FlowApiKey.created_at.desc()) ) return [FlowApiKeyOut( id=k.id, flow_id=k.flow_id, name=k.name, key_prefix=k.key_prefix, last_used_at=k.last_used_at, created_at=k.created_at, ) for k in result.scalars().all()] @router.delete("/api-keys/{key_id}") async def delete_api_key(key_id: uuid.UUID, db: AsyncSession = Depends(get_db)): k = await db.get(FlowApiKey, key_id) if not k: raise HTTPException(404, "API Key不存在") await db.delete(k) return {"code": 200, "message": "API Key已删除"} # ============================== 执行历史 ============================== @router.get("/executions") async def list_executions( db: AsyncSession = Depends(get_db), flow_id: str | None = Query(None), page: int = Query(1), page_size: int = Query(20), ): query = select(FlowExecution).order_by(FlowExecution.started_at.desc()) if flow_id: query = query.where(FlowExecution.flow_id == uuid.UUID(flow_id)) total_result = await db.execute(query) total = len(total_result.scalars().all()) result = await db.execute(query.offset((page - 1) * page_size).limit(page_size)) executions = result.scalars().all() return { "code": 200, "data": [{ "id": str(e.id), "flow_id": str(e.flow_id), "version": e.version, "trigger_type": e.trigger_type, "status": e.status, "latency_ms": e.latency_ms, "token_usage": e.token_usage, "error_message": e.error_message, "started_at": str(e.started_at), "finished_at": str(e.finished_at) if e.finished_at else None, } for e in executions], "total": total, "page": page, "page_size": page_size, } # ============================== 模板 ============================== @router.get("/market", response_model=list[FlowDefinitionOut]) async def flow_market(db: AsyncSession = Depends(get_db)): result = await db.execute( select(FlowDefinition).where(FlowDefinition.status == "published").order_by(FlowDefinition.updated_at.desc()) ) return [_build_flow_out(f) for f in result.scalars().all()] @router.get("/templates") async def get_flow_templates(db: AsyncSession = Depends(get_db)): result = await db.execute( select(FlowTemplate).order_by(FlowTemplate.sort_order.asc(), FlowTemplate.created_at.desc()) ) templates = result.scalars().all() return {"code": 200, "data": [ { "id": str(t.id), "name": t.name, "description": t.description, "category": t.category, "definition_json": t.definition_json, "icon": t.icon, "is_builtin": t.is_builtin, "usage_count": t.usage_count, } for t in templates ]} @router.post("/templates") async def create_flow_template(request: Request, db: AsyncSession = Depends(get_db)): body = await request.json() user_ctx = request.state.user ft = FlowTemplate( name=body.get("name", ""), description=body.get("description", ""), category=body.get("category", "general"), definition_json=body.get("definition_json", {}), icon=body.get("icon", ""), sort_order=body.get("sort_order", 0), created_by=uuid.UUID(user_ctx["id"]), ) db.add(ft) await db.commit() await db.refresh(ft) return {"code": 200, "data": {"id": str(ft.id), "name": ft.name}} @router.put("/templates/{template_id}") async def update_flow_template(template_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)): ft = await db.get(FlowTemplate, template_id) if not ft: raise HTTPException(404, "模板不存在") body = await request.json() for field in ("name", "description", "category", "definition_json", "icon", "sort_order"): if field in body: setattr(ft, field, body[field]) await db.commit() return {"code": 200, "data": {"id": str(ft.id)}} @router.delete("/templates/{template_id}") async def delete_flow_template(template_id: uuid.UUID, db: AsyncSession = Depends(get_db)): ft = await db.get(FlowTemplate, template_id) if not ft: raise HTTPException(404, "模板不存在") if ft.is_builtin: raise HTTPException(400, "内置模板不可删除") await db.delete(ft) await db.commit() return {"code": 200, "message": "模板已删除"} @router.post("/templates/{template_id}/use") async def use_flow_template(template_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)): ft = await db.get(FlowTemplate, template_id) if not ft: raise HTTPException(404, "模板不存在") ft.usage_count = (ft.usage_count or 0) + 1 user_ctx = request.state.user flow = FlowDefinition( name=ft.name + " (副本)", description=ft.description or "", definition_json=ft.definition_json, draft_definition_json=ft.definition_json, creator_id=uuid.UUID(user_ctx["id"]), ) db.add(flow) await db.flush() await db.commit() return _build_flow_out(flow)