You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

265 lines
9.7 KiB

"""监控模块路由。
提供员工监控功能,包括员工列表查询、个人数据看板、AI 辅助的员工交互分析等。
支持基于数据权限范围(all/subordinate_only/self_only)的访问控制。
"""
import uuid
import json
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, Request
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from models import User, ChatSession, ChatMessage
from modules.org.router import _get_subordinate_ids, _user_to_out
from modules.model_provider.router import resolve_model_config
from schemas import EmployeeAnalysis, UserOut
router = APIRouter(prefix="/api/monitor", tags=["monitor"]) # 监控模块路由前缀
@router.get("/employees", response_model=list[UserOut])
async def get_monitor_employees(request: Request, db: AsyncSession = Depends(get_db)):
"""获取可监控的员工列表。
根据当前用户的数据权限范围返回不同的员工列表:
- all:返回所有活跃员工
- subordinate_only:返回当前用户及其所有下级员工
- self_only:仅返回当前用户
Args:
request: HTTP 请求对象,包含当前用户上下文。
db: 异步数据库会话。
Returns:
list[UserOut]: 员工信息列表。
"""
user_ctx = request.state.user
cur_id = uuid.UUID(user_ctx["id"])
if user_ctx["data_scope"] == "all":
# 数据权限为全部:返回所有活跃员工
result = await db.execute(select(User).where(User.status == "active"))
return [await _user_to_out(db, u) for u in result.scalars().all()]
elif user_ctx["data_scope"] == "subordinate_only":
# 数据权限为下级:返回当前用户及其所有下级
sub_ids = await _get_subordinate_ids(db, cur_id)
sub_ids.add(cur_id)
result = await db.execute(select(User).where(User.id.in_(sub_ids)))
return [await _user_to_out(db, u) for u in result.scalars().all()]
else:
# 数据权限为仅自己:仅返回当前用户
result = await db.execute(select(User).where(User.id == cur_id))
return [await _user_to_out(db, u) for u in result.scalars().all()]
@router.get("/employee/{emp_id}/dashboard")
async def get_employee_dashboard(
emp_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)
):
"""获取指定员工的个人数据看板。
包括员工基本信息、消息统计、会话统计、活跃天数、消息分类和最近交互记录。
需要进行数据权限校验。
Args:
emp_id: 员工唯一标识 ID。
request: HTTP 请求对象,包含当前用户上下文。
db: 异步数据库会话。
Returns:
dict: 包含员工信息和统计数据的看板数据。
Raises:
HTTPException: 无权查看或员工不存在时抛出异常。
"""
user_ctx = request.state.user
cur_id = uuid.UUID(user_ctx["id"])
# 数据权限校验
if user_ctx["data_scope"] != "all":
if user_ctx["data_scope"] == "self_only" and str(emp_id) != user_ctx["id"]:
raise HTTPException(403, "无权查看此员工数据")
elif user_ctx["data_scope"] == "subordinate_only":
sub_ids = await _get_subordinate_ids(db, cur_id)
sub_ids.add(cur_id)
if emp_id not in sub_ids:
raise HTTPException(403, "无权查看此员工数据")
# 查询员工信息
emp_result = await db.execute(select(User).where(User.id == emp_id))
emp = emp_result.scalar_one_or_none()
if not emp:
raise HTTPException(404, "员工不存在")
# 统计总消息数
total_msgs_result = await db.execute(
select(func.count(ChatMessage.id)).where(ChatMessage.user_id == emp_id)
)
total_messages = total_msgs_result.scalar() or 0
# 统计总会话数
session_result = await db.execute(
select(func.count(ChatSession.id)).where(ChatSession.user_id == emp_id)
)
total_sessions = session_result.scalar() or 0
# 查询最近 50 条消息
recent_msgs_result = await db.execute(
select(ChatMessage)
.where(ChatMessage.user_id == emp_id)
.order_by(ChatMessage.created_at.desc())
.limit(50)
)
recent = recent_msgs_result.scalars().all()
# 统计话题分布和活跃天数
topics = {}
active_days = set()
for msg in recent:
if msg.created_at:
active_days.add(msg.created_at.strftime("%Y-%m-%d")) # 记录活跃日期
role = msg.role
topics[role] = topics.get(role, 0) + 1 # 按角色统计消息数
return {
"code": 200,
"data": {
"employee": {
"id": str(emp.id),
"name": emp.display_name,
"department": str(emp.department_id) if emp.department_id else "",
"position": emp.position or "",
},
"stats": {
"total_messages": total_messages, # 总消息数
"total_sessions": total_sessions, # 总会话数
"active_days": len(active_days), # 活跃天数
"message_breakdown": topics, # 消息角色分布
"recent_interactions": [
{"role": m.role, "content": m.content[:200], "created_at": str(m.created_at)}
for m in recent[:10] # 最近 10 条交互记录
],
},
},
}
@router.get("/employee/{emp_id}/analysis", response_model=EmployeeAnalysis)
async def get_employee_analysis(
emp_id: uuid.UUID, request: Request, db: AsyncSession = Depends(get_db)
):
"""获取指定员工的 AI 分析报告。
基于员工与 AI 的交互记录,使用大语言模型生成员工分析报告,
包括任务完成率、活跃度、主要话题、效率趋势、优势和改进建议等。
Args:
emp_id: 员工唯一标识 ID。
request: HTTP 请求对象,包含当前用户上下文。
db: 异步数据库会话。
Returns:
EmployeeAnalysis: 员工分析报告。
Raises:
HTTPException: 无权查看或员工不存在时抛出异常。
"""
user_ctx = request.state.user
cur_id = uuid.UUID(user_ctx["id"])
# 数据权限校验
if user_ctx["data_scope"] != "all":
if user_ctx["data_scope"] == "self_only" and str(emp_id) != user_ctx["id"]:
raise HTTPException(403, "无权查看此员工数据")
elif user_ctx["data_scope"] == "subordinate_only":
sub_ids = await _get_subordinate_ids(db, cur_id)
sub_ids.add(cur_id)
if emp_id not in sub_ids:
raise HTTPException(403, "无权查看此员工数据")
# 查询员工信息
emp_result = await db.execute(select(User).where(User.id == emp_id))
emp = emp_result.scalar_one_or_none()
if not emp:
raise HTTPException(404, "员工不存在")
from config import settings
from agentscope.model import OpenAIChatModel
from agentscope.formatter import OpenAIChatFormatter
from agentscope.message import Msg
# 查询最近 100 条消息作为分析数据
msgs_result = await db.execute(
select(ChatMessage)
.where(ChatMessage.user_id == emp_id)
.order_by(ChatMessage.created_at.desc())
.limit(100)
)
messages = msgs_result.scalars().all()
# 构建交互记录文本
interaction_log = "\n".join([
f"[{m.role}] {m.content[:300]}" for m in messages
])
# 初始化 LLM 模型(从模型管理表读取配置)
llm_cfg = await resolve_model_config(db)
model = OpenAIChatModel(
config_name="analysis_model",
model_name=llm_cfg["model_name"],
api_key=llm_cfg["api_key"],
api_base=llm_cfg["base_url"],
)
formatter = OpenAIChatFormatter()
# 构建分析提示词
prompt = formatter.format([
Msg("system", f"""你是一个企业管理者分析助手。请根据员工与AI的交互记录,生成一个JSON格式的分析报告。
要求:
1. 分析员工的task_completion_rate (0-1的浮点数)
2. 统计active_days和total_interactions
3. 提取main_topics (最多5个关键词)
4. 评估efficiency_trend ("提升" / "稳定" / "下降")
5. 给出efficiency_detail (一句话说明)
6. 列出strengths (2-3个优点)
7. 给出growth_suggestions (2-3条建议)
8. 总结personality_traits (一句话)
输出严格JSON格式,不要包含markdown代码块标记。""", "system"),
Msg("user", f"员工姓名: {emp.display_name}\n交互记录:\n{interaction_log}", "user"),
])
try:
res = await model(prompt) # 调用 LLM 生成分析
res_text = ""
if isinstance(res, list):
res_text = res[0].get_text_content() if hasattr(res[0], 'get_text_content') else str(res[0])
elif hasattr(res, 'get_text_content'):
res_text = res.get_text_content()
else:
res_text = str(res)
analysis_data = json.loads(res_text) # 解析 JSON 响应
except Exception:
# LLM 调用失败时使用默认分析数据
analysis_data = {
"task_completion_rate": 0.7,
"active_days": 0,
"total_interactions": len(messages),
"main_topics": [],
"efficiency_trend": "稳定",
"efficiency_detail": "暂无足够数据",
"strengths": [],
"growth_suggestions": [],
"personality_traits": "暂未收集足够人格特征数据",
}
return EmployeeAnalysis(
employee_name=emp.display_name,
department=str(emp.department_id) if emp.department_id else "",
period=f"最近数据",
**analysis_data
)