You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

178 lines
6.7 KiB

"""管理者工具模块。
提供管理者专属的工具函数,包括下属员工查询、员工看板数据获取、团队效率报告生成和任务统计等功能。
通过内部 HTTP API 与后端组织监控服务通信,使用 JWT 服务令牌进行认证。
"""
import httpx
import logging
import os
import jwt
import time
from config import settings
logger = logging.getLogger(__name__) # 当前模块的日志记录器
_INTERNAL_BASE = os.getenv("INTERNAL_API_BASE", "http://127.0.0.1:8000/api") # 内部 API 基础地址
_client: httpx.Client | None = None # 全局复用的 HTTP 客户端实例
def _get_client() -> httpx.Client:
"""获取或创建全局复用的 HTTP 客户端实例。
Returns:
httpx.Client: 配置了超时时间的 HTTP 客户端。
"""
global _client
if _client is None:
_client = httpx.Client(timeout=30)
return _client
def _get_service_token() -> str | None:
"""生成用于内部服务间调用的 JWT 令牌。
Returns:
str | None: 编码后的 JWT 令牌,生成失败返回 None。
"""
try:
payload = {"sub": "system_tool", "exp": int(time.time()) + 3600, "type": "service"}
token = jwt.encode(payload, settings.JWT_SECRET, algorithm="HS256")
return token
except Exception:
return None
def _headers(token: str | None = None) -> dict:
"""构建 HTTP 请求头,包含认证令牌。
Args:
token: 可选的自定义令牌,不提供则自动生成服务令牌。
Returns:
dict: 包含 Authorization 头的字典。
"""
t = token or _get_service_token()
return {"Authorization": f"Bearer {t}"} if t else {}
# 工具函数描述 Schema,用于 AgentScope 工具注册
SCHEMAS = {
"list_subordinates": {
"name": "list_subordinates",
"description": "查询当前用户的下属员工列表",
"parameters": {"type": "object", "properties": {}}
},
"get_employee_dashboard": {
"name": "get_employee_dashboard",
"description": "查询指定员工的工作看板数据",
"parameters": {
"type": "object",
"properties": {"employee_id": {"type": "string", "description": "员工ID"}},
"required": ["employee_id"]
}
},
"generate_efficiency_report": {
"name": "generate_efficiency_report",
"description": "生成团队效率分析报告",
"parameters": {
"type": "object",
"properties": {"department_id": {"type": "string", "description": "部门ID(可选)"}}
}
},
"get_task_statistics": {
"name": "get_task_statistics",
"description": "查询任务统计数据",
"parameters": {
"type": "object",
"properties": {"employee_id": {"type": "string", "description": "员工ID(可选)"}}
}
},
}
def list_subordinates() -> str:
"""查询当前管理者名下的下属员工列表。
Returns:
str: 格式化的下属员工列表文本或错误信息。
"""
try:
resp = _get_client().get(f"{_INTERNAL_BASE}/org/subordinates", headers=_headers())
users = resp.json() if isinstance(resp.json(), list) else resp.json().get("data", [])
if not users:
return "当前没有下属员工数据。"
lines = ["下属员工列表:"]
for u in users:
lines.append(f"- {u.get('display_name', u.get('username', '?'))} | 岗位: {u.get('position', '?')} | 部门: {u.get('department_name', '?')}")
return "\n".join(lines)
except Exception as e:
return f"查询下属列表失败: {e}"
def get_employee_dashboard(employee_id: str) -> str:
"""查询指定员工的工作看板数据,包括任务完成率、响应时间等指标。
Args:
employee_id: 员工唯一标识 ID。
Returns:
str: 格式化的员工看板数据或错误信息。
"""
try:
resp = _get_client().get(f"{_INTERNAL_BASE}/monitor/employee/{employee_id}/dashboard", headers=_headers())
data = resp.json()
return f"员工 {employee_id[:8]} 工作看板:\n- 任务完成率: {data.get('completion_rate', '?')}%\n- 平均响应时间: {data.get('avg_response_time', '?')} 分钟\n- 今日任务数: {data.get('today_tasks', 0)}\n- 本周完成: {data.get('weekly_completed', 0)}"
except Exception as e:
return f"查询员工看板失败: {e}"
def generate_efficiency_report(department_id: str | None = None) -> str:
"""生成团队效率分析报告,包含各员工的任务数和完成率统计。
Args:
department_id: 可选的部门 ID,用于限定报告范围。
Returns:
str: 格式化的团队效率报告或错误信息。
"""
try:
resp = _get_client().get(f"{_INTERNAL_BASE}/monitor/employees", headers=_headers())
employees = resp.json() if isinstance(resp.json(), list) else resp.json().get("data", [])
report = ["=== 团队效率报告 ===\n"]
total_tasks = 0
active_employees = 0
for emp in employees:
task_count = emp.get("task_count", 0)
total_tasks += task_count
if emp.get("status") == "active":
active_employees += 1
report.append(f"- {emp.get('display_name', emp.get('username', '?'))}: 任务数={task_count}, 完成率={emp.get('completion_rate', 0)}%")
report.append(f"\n总结: 活跃员工 {active_employees}/{len(employees)} 人, 总任务 {total_tasks}")
return "\n".join(report)
except Exception as e:
return f"生成报告失败: {e}"
def get_task_statistics(employee_id: str | None = None) -> str:
"""查询任务统计数据,支持按员工筛选。
Args:
employee_id: 可选的员工 ID,用于筛选特定员工的任务。
Returns:
str: 格式化的任务统计信息或错误信息。
"""
try:
resp = _get_client().get(f"{_INTERNAL_BASE}/tasks", headers=_headers())
tasks = resp.json() if isinstance(resp.json(), list) else resp.json().get("data", [])
if employee_id:
tasks = [t for t in tasks if t.get("assignee_id") == employee_id]
todo = sum(1 for t in tasks if t.get("status") == "todo")
in_progress = sum(1 for t in tasks if t.get("status") == "in_progress")
done = sum(1 for t in tasks if t.get("status") == "done")
return f"任务统计:\n- 待办: {todo}\n- 进行中: {in_progress}\n- 已完成: {done}\n- 总计: {len(tasks)}"
except Exception as e:
return f"查询任务统计失败: {e}"
__all__ = ["list_subordinates", "get_employee_dashboard", "generate_efficiency_report", "get_task_statistics", "SCHEMAS"]