"""API 密钥认证中间件模块。 提供基于 API 密钥的流程访问认证功能。 主要用于外部系统通过 API Key 调用已发布的 AI 流程。 """ import hashlib from datetime import datetime from fastapi import Request, HTTPException from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from models import FlowApiKey from database import get_db async def authenticate_api_key(request: Request) -> dict: """验证请求中的 API 密钥并返回关联的流程信息。 从 Authorization 请求头中提取 API Key,验证其有效性并更新最后使用时间。 API Key 必须以 "flow-" 开头,验证时对其 SHA-256 哈希值进行数据库匹配。 Args: request: 当前 HTTP 请求对象。 Returns: dict: 包含 flow_id、api_key_id 和 auth_type 的认证信息字典。 Raises: HTTPException: 当缺少认证信息、API Key 格式无效或密钥不存在时抛出 401 异常。 """ auth_header = request.headers.get("Authorization", "") if not auth_header.startswith("Bearer "): raise HTTPException(401, "缺少认证信息") raw_key = auth_header[7:] # 提取 Bearer 后的密钥部分 if not raw_key.startswith("flow-"): raise HTTPException(401, "无效的API Key格式") key_hash = hashlib.sha256(raw_key.encode()).hexdigest() # 计算 SHA-256 哈希值 db_gen = get_db() db: AsyncSession = await db_gen.__anext__() try: result = await db.execute( select(FlowApiKey).where(FlowApiKey.key_hash == key_hash) ) api_key = result.scalar_one_or_none() if not api_key: raise HTTPException(401, "API Key无效或已删除") api_key.last_used_at = datetime.utcnow() # 更新最后使用时间 await db.flush() return { "flow_id": str(api_key.flow_id), # 关联的流程 ID "api_key_id": str(api_key.id), # API Key 记录 ID "auth_type": "api_key", # 认证类型标识 } finally: try: await db_gen.__anext__() except StopAsyncIteration: pass