You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

223 lines
7.0 KiB

"""系统管理模块路由。
提供系统健康检查、使用统计、指标收集和缓存管理等功能。
用于监控系统运行状态和资源使用情况。
"""
import asyncio
import logging
import os
import time
import uuid
from datetime import datetime, timezone

import psutil
from fastapi import APIRouter, Depends, Request
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models import User, ChatSession, ChatMessage, Task, FlowDefinition, FlowExecution, SystemMetric
from schemas import SystemHealthOut, UsageStatsOut
from dependencies import get_current_user
from middleware.cache_manager import cache_manager
from middleware.rate_limiter import rate_limiter
# Router for every endpoint in this module; all paths are mounted under /api/system.
router = APIRouter(prefix="/api/system", tags=["system"])
# Wall-clock timestamp captured when this module is imported; the health
# endpoint subtracts it from the current time to report uptime.
_start_time = time.time() # service start timestamp
@router.get("/health", response_model=SystemHealthOut)
async def health_check(request: Request, db: AsyncSession = Depends(get_db)):
    """Report the health of the service and its backing stores.

    Probes the database with a user-count query, samples the current
    process's resident memory and the CPU utilisation, and reads the cache
    manager's availability flag.

    Args:
        request: Incoming HTTP request (not used by the handler body).
        db: Async database session.

    Returns:
        SystemHealthOut: ``status`` is ``"healthy"`` only when the database
        probe succeeded and ``cache_manager.available`` is truthy; otherwise
        it is ``"degraded"``.
    """
    # A single COUNT query serves both as the connectivity probe and as the
    # source of the user count, instead of hitting the database twice.
    # NOTE(review): the value reported as ``active_users`` is the total
    # number of User rows (no activity filter is applied) — confirm intent.
    db_ok = False
    active_users = 0
    try:
        count_result = await db.execute(select(func.count(User.id)))
        active_users = count_result.scalar() or 0
        db_ok = True
    except Exception:
        # Health checks must not fail hard: report "degraded" but leave a
        # trace in the logs instead of silently discarding the error.
        logging.getLogger(__name__).warning(
            "health check: database probe failed", exc_info=True
        )

    mem = psutil.Process(os.getpid()).memory_info()
    # cpu_percent() with a positive interval sleeps for that long; run it in
    # a worker thread so the event loop is not blocked while sampling.
    cpu = await asyncio.to_thread(psutil.cpu_percent, interval=0.1)
    uptime = time.time() - _start_time

    cache_ok = cache_manager.available
    return SystemHealthOut(
        status="healthy" if db_ok and cache_ok else "degraded",
        service="enterprise-ai-platform",
        uptime_seconds=round(uptime, 1),
        db_connected=db_ok,
        redis_connected=cache_ok,
        active_users=active_users,
        memory_mb=round(mem.rss / 1024 / 1024, 1),  # bytes -> MiB
        cpu_percent=round(cpu, 1),
    )
@router.get("/stats", response_model=UsageStatsOut)
async def usage_stats(request: Request, db: AsyncSession = Depends(get_db)):
    """Return aggregate usage counters for the platform.

    Counts users, chat sessions, chat messages, tasks and flow definitions,
    plus two "today" figures measured from UTC midnight.

    Args:
        request: Incoming HTTP request (not used by the handler body).
        db: Async database session.

    Returns:
        UsageStatsOut: The collected counters. ``avg_response_time_ms`` is
        always ``0.0`` because this handler does not compute it.
    """

    async def _count(stmt) -> int:
        """Execute a scalar COUNT statement, mapping a NULL result to 0."""
        return (await db.execute(stmt)).scalar() or 0

    # Naive UTC midnight of the current day. ``datetime.utcnow()`` is
    # deprecated since Python 3.12; the tzinfo is stripped so the value is
    # the same naive datetime the queries were compared against before.
    today = datetime.now(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0, tzinfo=None
    )

    total_users = await _count(select(func.count(User.id)))
    # Users counted as active today are those owning at least one chat
    # session created since UTC midnight.
    active_today = await _count(
        select(func.count(func.distinct(User.id)))
        .join(ChatSession, ChatSession.user_id == User.id)
        .where(ChatSession.created_at >= today)
    )
    total_sessions = await _count(select(func.count(ChatSession.id)))
    total_messages = await _count(select(func.count(ChatMessage.id)))
    total_tasks = await _count(select(func.count(Task.id)))
    total_flows = await _count(select(func.count(FlowDefinition.id)))
    published = await _count(
        select(func.count(FlowDefinition.id)).where(FlowDefinition.status == "published")
    )
    # "API calls today" is the number of flow executions started since UTC
    # midnight.
    api_calls = await _count(
        select(func.count(FlowExecution.id)).where(FlowExecution.started_at >= today)
    )

    return UsageStatsOut(
        total_users=total_users,
        active_users_today=active_today,
        total_sessions=total_sessions,
        total_messages=total_messages,
        total_tasks=total_tasks,
        total_flows=total_flows,
        published_flows=published,
        api_calls_today=api_calls,
        avg_response_time_ms=0.0,
    )
@router.post("/metrics")
async def collect_metrics(payload: dict, request: Request, db: AsyncSession = Depends(get_db)):
    """Store one metric sample submitted by a client.

    Args:
        payload: Request body. ``metric_type`` defaults to ``"custom"``,
            ``value`` to ``{}`` and ``source`` to ``"api"`` when absent.
        request: Incoming HTTP request (not used by the handler body).
        db: Async database session.

    Returns:
        dict: ``{"code": 200, "metric_id": <id as string>}``.
    """
    kind = payload.get("metric_type", "custom")
    # The submitted value and its source are wrapped together into the
    # single ``value`` field of the row.
    wrapped_value = {
        "data": payload.get("value", {}),
        "source": payload.get("source", "api"),
    }
    record = SystemMetric(metric_type=kind, value=wrapped_value)
    db.add(record)
    # Only a flush happens here; presumably the session dependency commits
    # afterwards — TODO confirm against get_db.
    await db.flush()
    return {"code": 200, "metric_id": str(record.id)}
# Hard upper bound on the number of rows list_metrics returns per request.
_MAX_METRICS_LIMIT = 1000


@router.get("/metrics")
async def list_metrics(
    request: Request,
    metric_type: str | None = None,
    limit: int = 50,
    db: AsyncSession = Depends(get_db),
):
    """List stored metric samples, newest first.

    Args:
        request: Incoming HTTP request (not used by the handler body).
        metric_type: When given (and non-empty), only samples of this type
            are returned.
        limit: Maximum number of samples to return. Values below 0 are
            treated as 0 and values above ``_MAX_METRICS_LIMIT`` are capped,
            so a client cannot send a negative SQL LIMIT or request an
            unbounded result set.
        db: Async database session.

    Returns:
        dict: ``{"code": 200, "data": [...]}`` where each entry carries
        ``id``, ``metric_type``, ``value`` and ``collected_at`` (ISO-8601
        string, or ``None`` when the row has no timestamp).
    """
    row_cap = min(max(limit, 0), _MAX_METRICS_LIMIT)

    stmt = select(SystemMetric).order_by(SystemMetric.collected_at.desc())
    if metric_type:
        stmt = stmt.where(SystemMetric.metric_type == metric_type)
    stmt = stmt.limit(row_cap)

    rows = (await db.execute(stmt)).scalars().all()
    return {
        "code": 200,
        "data": [
            {
                "id": str(row.id),
                "metric_type": row.metric_type,
                "value": row.value,
                "collected_at": row.collected_at.isoformat() if row.collected_at else None,
            }
            for row in rows
        ],
    }
@router.get("/cache/stats")
async def cache_stats(request: Request):
    """Report whether the cache backend is currently available.

    Args:
        request: Incoming HTTP request (not used by the handler body).

    Returns:
        dict: ``{"code": 200, "data": {"redis_available": <flag>}}`` where
        the flag is the value of ``cache_manager.available``.
    """
    availability = {"redis_available": cache_manager.available}
    return {"code": 200, "data": availability}
@router.get("/ratelimit/stats")
async def ratelimit_stats(request: Request):
    """Report the rate-limit figures for the ``"global"`` key.

    Args:
        request: Incoming HTTP request (not used by the handler body).

    Returns:
        dict: ``{"code": 200, "data": {...}}`` with ``limit_per_minute`` and
        ``window_seconds`` (both fixed at 60 in this handler) and
        ``remaining`` as returned by ``rate_limiter.remaining("global")``.
    """
    # NOTE(review): 60/60 are literals here, not read from the limiter —
    # confirm they match the limiter's actual configuration.
    quota_left = await rate_limiter.remaining("global")
    stats = {
        "limit_per_minute": 60,
        "window_seconds": 60,
        "remaining": quota_left,
    }
    return {"code": 200, "data": stats}
@router.post("/cache/clear")
async def clear_cache(request: Request, pattern: str = "*"):
    """Delete cache entries whose keys match ``pattern``.

    Args:
        request: Incoming HTTP request (not used by the handler body).
        pattern: Key pattern handed to ``cache_manager.delete_pattern``;
            the default ``"*"`` is passed through unchanged.

    Returns:
        dict: A fixed success payload with ``code`` 200 and a confirmation
        message.
    """
    # NOTE(review): no authentication dependency is declared on this route
    # in this file — confirm access is restricted elsewhere.
    await cache_manager.delete_pattern(pattern)
    outcome = {"code": 200, "message": "缓存已清除"}
    return outcome