You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

121 lines
4.2 KiB

import uuid
from fastapi import APIRouter, Depends, Query, Request
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from models import Role, Permission, RolePermission
from schemas import RoleCreate, RoleUpdate, RoleOut, PermissionOut
# Router for the RBAC endpoints; every route below is served under /api/rbac.
router = APIRouter(prefix="/api/rbac", tags=["rbac"])
@router.get("/roles", response_model=list[RoleOut])
async def get_roles(
    request: Request,
    role_type: str | None = Query(None, description="按类型过滤: platform(平台角色) / position(岗位)"),
    db: AsyncSession = Depends(get_db),
):
    """List roles ordered by creation time, optionally filtered by ``role_type``.

    A missing or empty ``role_type`` disables the filter.
    """
    query = select(Role)
    if role_type:
        query = query.where(Role.role_type == role_type)
    query = query.order_by(Role.created_at)

    rows = (await db.execute(query)).scalars().all()

    # Roles are serialized one after another; the helper awaits its own
    # permission query for each role.
    payload = []
    for row in rows:
        payload.append(await _role_to_out(db, row))
    return payload
@router.get("/roles/{role_id}", response_model=RoleOut)
async def get_role(role_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)):
    """Return the role identified by ``role_id``.

    Raises:
        HTTPException: 404 when no role has the given id.
    """
    from fastapi import HTTPException

    lookup = select(Role).where(Role.id == role_id)
    found = (await db.execute(lookup)).scalar_one_or_none()
    if found:
        return await _role_to_out(db, found)
    raise HTTPException(404, "角色不存在")
@router.post("/roles", response_model=RoleOut)
async def create_role(req: RoleCreate, request: Request, db: AsyncSession = Depends(get_db)):
    """Create a role and link it to the requested permissions.

    When the request carries no (or an empty) ``code``, one is derived from
    the role name with a ``custom_`` prefix.
    """
    role_code = req.code or f"custom_{req.name}"
    new_role = Role(
        name=req.name,
        code=role_code,
        description=req.description,
        role_type=req.role_type,
        data_scope=req.data_scope,
    )
    db.add(new_role)
    # Flush before building the link rows, which read new_role.id
    # (presumably populated at flush time -- confirm against the model).
    await db.flush()

    links = [
        RolePermission(role_id=new_role.id, permission_id=pid)
        for pid in req.permission_ids
    ]
    db.add_all(links)
    await db.flush()

    return await _role_to_out(db, new_role)
@router.put("/roles/{role_id}", response_model=RoleOut)
async def update_role(
    role_id: uuid.UUID, req: RoleUpdate,
    request: Request, db: AsyncSession = Depends(get_db),
):
    """Partially update a role and, optionally, replace its permission set.

    Only request fields that are not ``None`` are applied. When
    ``permission_ids`` is provided, the role's existing permission links are
    deleted and the supplied ones are created in their place; an empty list
    therefore clears all permissions.

    Raises:
        HTTPException: 404 when no role has the given id.
    """
    from fastapi import HTTPException

    result = await db.execute(select(Role).where(Role.id == role_id))
    role = result.scalar_one_or_none()
    if not role:
        raise HTTPException(404, "角色不存在")

    if req.name is not None:
        role.name = req.name
    if req.description is not None:
        role.description = req.description
    if req.role_type is not None:
        role.role_type = req.role_type
    if req.data_scope is not None:
        role.data_scope = req.data_scope

    if req.permission_ids is not None:
        existing = (await db.execute(
            select(RolePermission).where(RolePermission.role_id == role.id)
        )).scalars().all()
        for rp in existing:
            await db.delete(rp)
        # Emit the DELETEs on their own: within a single flush SQLAlchemy
        # issues INSERTs before DELETEs, so re-adding a link that is being
        # removed could collide if the link table enforces uniqueness
        # (constraint not visible here).
        await db.flush()
        for perm_id in req.permission_ids:
            db.add(RolePermission(role_id=role.id, permission_id=perm_id))

    # Flush explicitly, as create_role does, so the permission read-back
    # below sees the new links even when the session has autoflush disabled.
    await db.flush()
    return await _role_to_out(db, role)
@router.delete("/roles/{role_id}")
async def delete_role(role_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)):
    """Delete a role unless it is flagged as a system role.

    Raises:
        HTTPException: 404 when the role does not exist, 400 when the
            role's ``is_system`` flag is set.
    """
    from fastapi import HTTPException

    target = (
        await db.execute(select(Role).where(Role.id == role_id))
    ).scalar_one_or_none()

    if not target:
        raise HTTPException(404, "角色不存在")
    if target.is_system:
        raise HTTPException(400, "系统预置角色不可删除")

    await db.delete(target)
    response = {"code": 200, "message": "删除成功"}
    return response
@router.get("/permissions", response_model=list[PermissionOut])
async def get_permissions(request: Request, db: AsyncSession = Depends(get_db)):
    """Return every stored permission as a ``PermissionOut``."""
    rows = (await db.execute(select(Permission))).scalars().all()

    items = []
    for perm in rows:
        items.append(
            PermissionOut(
                id=perm.id,
                code=perm.code,
                name=perm.name,
                resource=perm.resource,
                action=perm.action,
            )
        )
    return items
async def _role_to_out(db: AsyncSession, role: Role) -> RoleOut:
    """Build the ``RoleOut`` for ``role``, including its permission codes.

    The codes are fetched with a query joining ``Permission`` to
    ``RolePermission`` filtered on the role's id.
    """
    codes_query = (
        select(Permission.code)
        .join(RolePermission)
        .where(RolePermission.role_id == role.id)
    )
    rows = await db.execute(codes_query)
    permission_codes = list(rows.scalars().all())

    # A falsy role_type is reported as "position".
    effective_type = role.role_type or "position"

    return RoleOut(
        id=role.id,
        name=role.name,
        code=role.code,
        description=role.description,
        role_type=effective_type,
        is_system=role.is_system,
        data_scope=role.data_scope,
        permissions=permission_codes,
    )