You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

98 lines
4.0 KiB

"""RBAC 权限中间件模块。
提供全局 HTTP 请求的 RBAC(基于角色的访问控制)权限校验中间件。
每个请求都会经过此中间件,解析 JWT 令牌并查询用户的角色和权限信息,
将用户上下文存储到 request.state.user 中供后续路由使用。
"""
import jwt
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
from config import settings
from database import AsyncSessionLocal
from models import User, UserRole, Role, RolePermission, Permission
from sqlalchemy import select
async def rbac_middleware(request: Request, call_next):
"""RBAC 权限校验中间件。
对每个 HTTP 请求进行权限校验:
1. 跳过公开路径(登录、健康检查等)
2. 解析 JWT 令牌获取用户身份
3. 从数据库查询用户的角色、权限和数据权限范围
4. 将用户上下文存储到 request.state.user 中
Args:
request: 当前 HTTP 请求对象。
call_next: 下一个中间件或路由处理函数。
Returns:
Response: 如果权限校验通过则返回后续处理结果,否则返回 401 错误响应。
"""
# 公开路径列表,无需认证即可访问
public_paths = ["/api/auth/login", "/api/auth/wecom", "/health", "/docs", "/openapi.json", "/wecom/callback"]
if any(request.url.path.startswith(p) for p in public_paths):
return await call_next(request)
# 从 Authorization 头中提取 JWT 令牌
token = request.headers.get("Authorization", "").replace("Bearer ", "")
if not token:
return JSONResponse({"code": 401, "message": "未提供认证令牌"}, 401)
# 解析 JWT 令牌获取用户 ID
try:
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
user_id = payload.get("sub")
except jwt.PyJWTError:
return JSONResponse({"code": 401, "message": "令牌无效或已过期"}, 401)
# 从数据库查询用户信息和权限
async with AsyncSessionLocal() as db:
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user or user.status != "active":
return JSONResponse({"code": 401, "message": "用户不存在或已禁用"}, 401)
# 查询用户关联的所有角色
ur_result = await db.execute(
select(Role).join(UserRole).where(UserRole.user_id == user.id)
)
roles = ur_result.scalars().all()
role_codes = [r.code for r in roles] # 角色编码列表
is_root = "root" in role_codes # 是否为超级管理员
# 收集所有权限编码和数据权限范围
permissions = []
data_scopes = []
for role in roles:
data_scopes.append(role.data_scope)
rp_result = await db.execute(
select(Permission).join(RolePermission).where(RolePermission.role_id == role.id)
)
perms = rp_result.scalars().all()
permissions.extend([p.code for p in perms])
unique_perms = list(set(permissions)) # 去重后的权限列表
# 超级管理员自动拥有所有权限
if is_root and "*:*" not in unique_perms:
unique_perms.insert(0, "*:*")
# 将用户上下文存储到 request.state 中
request.state.user = {
"id": str(user.id),
"username": user.username,
"display_name": user.display_name,
"department_id": str(user.department_id) if user.department_id else None,
"roles": [{"code": r.code, "name": r.name, "data_scope": r.data_scope} for r in roles],
"permissions": unique_perms,
"is_root": is_root,
"data_scope": "all" if is_root or "all" in data_scopes else (
"department" if "department" in data_scopes else
"subordinate_only" if "subordinate_only" in data_scopes else
"self_only"
),
}
return await call_next(request)