You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

207 lines
6.6 KiB

"""审计日志模块路由。
提供审计日志的查询、统计和导出功能。
记录系统中所有重要操作的详细信息,支持按操作类型、资源、操作人和时间范围筛选。
"""
import csv
import io
import uuid
from datetime import datetime, timezone

from fastapi import APIRouter, Depends, HTTPException, Request, Query
from fastapi.responses import StreamingResponse
from sqlalchemy import select, func, and_
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from dependencies import get_current_user
from models import AuditLog
from schemas import AuditLogOut, AuditLogPage
# Router for the audit-log endpoints; every route registered on it is served
# under the /api/audit prefix and tagged "audit" in the OpenAPI schema.
# NOTE(review): get_current_user is imported by this module, but none of the
# routes visible in this file declare it as a dependency — confirm that
# authentication is enforced elsewhere (e.g. where the router is included).
router = APIRouter(prefix="/api/audit", tags=["audit"])
@router.get("/logs", response_model=AuditLogPage)
async def list_logs(
    request: Request,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    action: str | None = Query(None),
    resource: str | None = Query(None),
    user_id: uuid.UUID | None = Query(None),
    date_from: datetime | None = Query(None),
    date_to: datetime | None = Query(None),
    db: AsyncSession = Depends(get_db),
):
    """Return one page of audit logs, newest first, with optional filters.

    All supplied filters are combined with AND. ``action`` and ``resource``
    are ignored when omitted or empty; the remaining filters are ignored
    only when omitted.

    Args:
        request: The incoming HTTP request (not used by this handler).
        page: 1-based page number.
        page_size: Number of items per page (1-100).
        action: Optional exact match on ``AuditLog.action``.
        resource: Optional exact match on ``AuditLog.resource_type``.
        user_id: Optional exact match on ``AuditLog.user_id``.
        date_from: Optional inclusive lower bound on ``AuditLog.created_at``.
        date_to: Optional inclusive upper bound on ``AuditLog.created_at``.
        db: Async database session.

    Returns:
        AuditLogPage: The requested page of logs plus the total number of
        rows matching the filters.
    """
    filters = []
    if action:
        filters.append(AuditLog.action == action)
    if resource:
        filters.append(AuditLog.resource_type == resource)
    if user_id is not None:
        filters.append(AuditLog.user_id == user_id)
    if date_from is not None:
        filters.append(AuditLog.created_at >= date_from)
    if date_to is not None:
        filters.append(AuditLog.created_at <= date_to)

    # Total number of rows matching the filters (independent of paging).
    count_q = select(func.count(AuditLog.id))
    if filters:
        count_q = count_q.where(and_(*filters))
    total = (await db.execute(count_q)).scalar() or 0

    # Sort by id as a tie-breaker: with OFFSET/LIMIT paging, rows sharing the
    # same created_at would otherwise have no defined relative order and could
    # be repeated or skipped between pages.
    page_q = select(AuditLog).order_by(
        AuditLog.created_at.desc(),
        AuditLog.id.desc(),
    )
    if filters:
        page_q = page_q.where(and_(*filters))
    page_q = page_q.offset((page - 1) * page_size).limit(page_size)
    logs = (await db.execute(page_q)).scalars().all()

    return AuditLogPage(
        items=[AuditLogOut.model_validate(log, from_attributes=True) for log in logs],
        total=total,
        page=page,
        page_size=page_size,
    )
@router.get("/actions")
async def list_action_types(request: Request, db: AsyncSession = Depends(get_db)):
    """Return every distinct audit action together with its row count.

    Args:
        request: The incoming HTTP request (not used by this handler).
        db: Async database session.

    Returns:
        dict: ``{"code": 200, "data": [...]}`` where each entry of ``data``
        has an ``action`` and a ``count`` key.
    """
    per_action_q = select(AuditLog.action, func.count(AuditLog.id)).group_by(
        AuditLog.action
    )
    rows = (await db.execute(per_action_q)).all()

    payload = []
    for row in rows:
        payload.append({"action": row[0], "count": row[1]})

    return {"code": 200, "data": payload}
async def _top_counts(db: AsyncSession, column, label: str, limit: int = 10):
    """Return the ``limit`` most frequent values of ``column`` with their counts.

    Each entry of the returned list is ``{label: value, "count": n}``, ordered
    by descending count.
    """
    rows = await db.execute(
        select(column, func.count(AuditLog.id))
        .group_by(column)
        .order_by(func.count(AuditLog.id).desc())
        .limit(limit)
    )
    return [{label: r[0], "count": r[1]} for r in rows.all()]


@router.get("/stats")
async def audit_stats(request: Request, db: AsyncSession = Depends(get_db)):
    """Return a summary of the audit log: totals and TOP-10 rankings.

    Args:
        request: The incoming HTTP request (not used by this handler).
        db: Async database session.

    Returns:
        dict: ``{"code": 200, "data": {...}}`` where ``data`` holds the total
        row count (``total``), the number of rows created since 00:00 UTC
        today (``today``), and the ten most frequent actions
        (``top_actions``) and resource types (``top_resources``).
    """
    total = (await db.execute(select(func.count(AuditLog.id)))).scalar() or 0

    # datetime.utcnow() is deprecated; build the same naive UTC value from an
    # aware "now" so the comparison against created_at behaves as before.
    # NOTE(review): this keeps the original assumption that created_at holds
    # naive UTC timestamps — confirm against the model definition.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    today_start = now_utc.replace(hour=0, minute=0, second=0, microsecond=0)
    today_result = await db.execute(
        select(func.count(AuditLog.id)).where(AuditLog.created_at >= today_start)
    )
    today = today_result.scalar() or 0

    top_actions = await _top_counts(db, AuditLog.action, "action")
    top_resources_list = await _top_counts(db, AuditLog.resource_type, "resource")

    return {
        "code": 200,
        "data": {
            "total": total,
            "today": today,
            "top_actions": top_actions,
            "top_resources": top_resources_list,
        },
    }
@router.get("/export")
async def export_logs(
    request: Request,
    date_from: datetime | None = Query(None),
    date_to: datetime | None = Query(None),
    db: AsyncSession = Depends(get_db),
):
    """Export audit logs as a CSV download, newest first.

    At most 10000 rows are exported. The ``detail`` column is truncated to
    its first 500 characters.

    Args:
        request: The incoming HTTP request (not used by this handler).
        date_from: Optional inclusive lower bound on ``AuditLog.created_at``.
        date_to: Optional inclusive upper bound on ``AuditLog.created_at``.
        db: Async database session.

    Returns:
        StreamingResponse: A ``text/csv`` attachment whose filename carries
        the current UTC timestamp.
    """
    filters = []
    if date_from is not None:
        filters.append(AuditLog.created_at >= date_from)
    if date_to is not None:
        filters.append(AuditLog.created_at <= date_to)

    q = select(AuditLog).order_by(AuditLog.created_at.desc())
    if filters:
        q = q.where(and_(*filters))
    q = q.limit(10000)  # hard cap on the number of exported rows
    logs = (await db.execute(q)).scalars().all()

    buffer = io.StringIO()
    writer = csv.writer(buffer)
    # Header row (runtime text, intentionally left in the original language).
    writer.writerow(["ID", "操作时间", "操作人ID", "操作", "资源", "资源ID", "详情", "IP地址"])
    # NOTE(review): values are written verbatim. If this file is opened in
    # spreadsheet software, cells starting with =, +, - or @ may be evaluated
    # as formulas — consider sanitising if any field is user-controlled.
    for log in logs:
        writer.writerow([
            str(log.id),
            log.created_at.isoformat() if log.created_at else "",
            str(log.user_id) if log.user_id else "",
            log.action,
            log.resource_type or "",
            log.resource_id or "",
            str(log.detail)[:500] if log.detail else "",
            log.ip_address or "",
        ])

    # getvalue() returns the whole buffer regardless of the stream position,
    # so no seek(0) is needed before reading it.
    csv_text = buffer.getvalue()

    # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
    stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
    return StreamingResponse(
        iter([csv_text]),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename=audit_logs_{stamp}.csv"},
    )