You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
98 lines
4.0 KiB
98 lines
4.0 KiB
"""RBAC 权限中间件模块。
|
|
|
|
提供全局 HTTP 请求的 RBAC(基于角色的访问控制)权限校验中间件。
|
|
每个请求都会经过此中间件,解析 JWT 令牌并查询用户的角色和权限信息,
|
|
将用户上下文存储到 request.state.user 中供后续路由使用。
|
|
"""
|
|
import jwt
|
|
from fastapi import Request, HTTPException
|
|
from fastapi.responses import JSONResponse
|
|
from config import settings
|
|
from database import AsyncSessionLocal
|
|
from models import User, UserRole, Role, RolePermission, Permission
|
|
from sqlalchemy import select
|
|
|
|
|
|
async def rbac_middleware(request: Request, call_next):
|
|
"""RBAC 权限校验中间件。
|
|
|
|
对每个 HTTP 请求进行权限校验:
|
|
1. 跳过公开路径(登录、健康检查等)
|
|
2. 解析 JWT 令牌获取用户身份
|
|
3. 从数据库查询用户的角色、权限和数据权限范围
|
|
4. 将用户上下文存储到 request.state.user 中
|
|
|
|
Args:
|
|
request: 当前 HTTP 请求对象。
|
|
call_next: 下一个中间件或路由处理函数。
|
|
|
|
Returns:
|
|
Response: 如果权限校验通过则返回后续处理结果,否则返回 401 错误响应。
|
|
"""
|
|
# 公开路径列表,无需认证即可访问
|
|
public_paths = ["/api/auth/login", "/api/auth/wecom", "/health", "/docs", "/openapi.json", "/wecom/callback"]
|
|
if any(request.url.path.startswith(p) for p in public_paths):
|
|
return await call_next(request)
|
|
|
|
# 从 Authorization 头中提取 JWT 令牌
|
|
token = request.headers.get("Authorization", "").replace("Bearer ", "")
|
|
if not token:
|
|
return JSONResponse({"code": 401, "message": "未提供认证令牌"}, 401)
|
|
|
|
# 解析 JWT 令牌获取用户 ID
|
|
try:
|
|
payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
|
|
user_id = payload.get("sub")
|
|
except jwt.PyJWTError:
|
|
return JSONResponse({"code": 401, "message": "令牌无效或已过期"}, 401)
|
|
|
|
# 从数据库查询用户信息和权限
|
|
async with AsyncSessionLocal() as db:
|
|
result = await db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalar_one_or_none()
|
|
if not user or user.status != "active":
|
|
return JSONResponse({"code": 401, "message": "用户不存在或已禁用"}, 401)
|
|
|
|
# 查询用户关联的所有角色
|
|
ur_result = await db.execute(
|
|
select(Role).join(UserRole).where(UserRole.user_id == user.id)
|
|
)
|
|
roles = ur_result.scalars().all()
|
|
|
|
role_codes = [r.code for r in roles] # 角色编码列表
|
|
is_root = "root" in role_codes # 是否为超级管理员
|
|
|
|
# 收集所有权限编码和数据权限范围
|
|
permissions = []
|
|
data_scopes = []
|
|
for role in roles:
|
|
data_scopes.append(role.data_scope)
|
|
rp_result = await db.execute(
|
|
select(Permission).join(RolePermission).where(RolePermission.role_id == role.id)
|
|
)
|
|
perms = rp_result.scalars().all()
|
|
permissions.extend([p.code for p in perms])
|
|
|
|
unique_perms = list(set(permissions)) # 去重后的权限列表
|
|
|
|
# 超级管理员自动拥有所有权限
|
|
if is_root and "*:*" not in unique_perms:
|
|
unique_perms.insert(0, "*:*")
|
|
|
|
# 将用户上下文存储到 request.state 中
|
|
request.state.user = {
|
|
"id": str(user.id),
|
|
"username": user.username,
|
|
"display_name": user.display_name,
|
|
"department_id": str(user.department_id) if user.department_id else None,
|
|
"roles": [{"code": r.code, "name": r.name, "data_scope": r.data_scope} for r in roles],
|
|
"permissions": unique_perms,
|
|
"is_root": is_root,
|
|
"data_scope": "all" if is_root or "all" in data_scopes else (
|
|
"department" if "department" in data_scopes else
|
|
"subordinate_only" if "subordinate_only" in data_scopes else
|
|
"self_only"
|
|
),
|
|
}
|
|
|
|
return await call_next(request)
|
|
|