"""RBAC 权限中间件模块。 提供全局 HTTP 请求的 RBAC(基于角色的访问控制)权限校验中间件。 每个请求都会经过此中间件,解析 JWT 令牌并查询用户的角色和权限信息, 将用户上下文存储到 request.state.user 中供后续路由使用。 """ import jwt from fastapi import Request, HTTPException from fastapi.responses import JSONResponse from config import settings from database import AsyncSessionLocal from models import User, UserRole, Role, RolePermission, Permission from sqlalchemy import select async def rbac_middleware(request: Request, call_next): """RBAC 权限校验中间件。 对每个 HTTP 请求进行权限校验: 1. 跳过公开路径(登录、健康检查等) 2. 解析 JWT 令牌获取用户身份 3. 从数据库查询用户的角色、权限和数据权限范围 4. 将用户上下文存储到 request.state.user 中 Args: request: 当前 HTTP 请求对象。 call_next: 下一个中间件或路由处理函数。 Returns: Response: 如果权限校验通过则返回后续处理结果,否则返回 401 错误响应。 """ # 公开路径列表,无需认证即可访问 public_paths = ["/api/auth/login", "/api/auth/wecom", "/health", "/docs", "/openapi.json", "/wecom/callback"] if any(request.url.path.startswith(p) for p in public_paths): return await call_next(request) # 从 Authorization 头中提取 JWT 令牌 token = request.headers.get("Authorization", "").replace("Bearer ", "") if not token: return JSONResponse({"code": 401, "message": "未提供认证令牌"}, 401) # 解析 JWT 令牌获取用户 ID try: payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM]) user_id = payload.get("sub") except jwt.PyJWTError: return JSONResponse({"code": 401, "message": "令牌无效或已过期"}, 401) # 从数据库查询用户信息和权限 async with AsyncSessionLocal() as db: result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user or user.status != "active": return JSONResponse({"code": 401, "message": "用户不存在或已禁用"}, 401) # 查询用户关联的所有角色 ur_result = await db.execute( select(Role).join(UserRole).where(UserRole.user_id == user.id) ) roles = ur_result.scalars().all() # 按类型分离:平台角色(管理后台)vs 岗位(企业AI) platform_roles = [] position_roles = [] is_root = False # 收集所有权限编码和数据权限范围 permissions = [] data_scopes = [] for role in roles: if role.role_type == "platform": platform_roles.append({"code": role.code, "name": role.name, "data_scope": role.data_scope}) else: position_roles.append({"code": role.code, "name": role.name, "data_scope": role.data_scope}) if role.code == "root": is_root = True data_scopes.append(role.data_scope) rp_result = await db.execute( select(Permission).join(RolePermission).where(RolePermission.role_id == role.id) ) perms = rp_result.scalars().all() permissions.extend([p.code for p in perms]) unique_perms = list(set(permissions)) if is_root and "*:*" not in unique_perms: unique_perms.insert(0, "*:*") # 将用户上下文存储到 request.state 中 request.state.user = { "id": str(user.id), "username": user.username, "display_name": user.display_name, "department_id": str(user.department_id) if user.department_id else None, "platform_roles": platform_roles, "positions": position_roles, "permissions": unique_perms, "is_root": is_root, "data_scope": "all" if is_root or "all" in data_scopes else ( "subordinate_only" if "subordinate_only" in data_scopes else "self_only" ), } return await call_next(request)