import httpx import logging import os logger = logging.getLogger(__name__) _INTERNAL_BASE = os.getenv("INTERNAL_API_BASE", "http://127.0.0.1:8000/api") _client: httpx.Client | None = None def _get_client() -> httpx.Client: global _client if _client is None: _client = httpx.Client(timeout=30) return _client def _get_token() -> str | None: try: resp = _get_client().post( f"{_INTERNAL_BASE}/auth/login", json={"username": "admin", "password": "admin123"}, ) data = resp.json() return data.get("access_token") except Exception: return None def _headers(token: str | None = None) -> dict: t = token or _get_token() return {"Authorization": f"Bearer {t}"} if t else {} def list_subordinates() -> str: try: resp = _get_client().get(f"{_INTERNAL_BASE}/org/subordinates", headers=_headers()) users = resp.json() if isinstance(resp.json(), list) else resp.json().get("data", []) if not users: return "当前没有下属员工数据。" lines = ["下属员工列表:"] for u in users: lines.append( f"- {u.get('display_name', u.get('username', '?'))} " f"| 岗位: {u.get('position', '?')} " f"| 部门: {u.get('department_name', '?')}" ) return "\n".join(lines) except Exception as e: return f"查询下属列表失败: {e}" def get_employee_dashboard(employee_id: str) -> str: try: resp = _get_client().get( f"{_INTERNAL_BASE}/monitor/employee/{employee_id}/dashboard", headers=_headers(), ) data = resp.json() return ( f"员工 {employee_id[:8]} 工作看板:\n" f"- 任务完成率: {data.get('completion_rate', '?')}%\n" f"- 平均响应时间: {data.get('avg_response_time', '?')} 分钟\n" f"- 今日任务数: {data.get('today_tasks', 0)}\n" f"- 本周完成: {data.get('weekly_completed', 0)}" ) except Exception as e: return f"查询员工看板失败: {e}" def generate_efficiency_report(department_id: str | None = None) -> str: try: resp = _get_client().get(f"{_INTERNAL_BASE}/monitor/employees", headers=_headers()) employees = resp.json() if isinstance(resp.json(), list) else resp.json().get("data", []) report = ["=== 团队效率报告 ===\n"] total_tasks = 0 active_employees = 0 for emp in employees: task_count = emp.get("task_count", 0) total_tasks += task_count if emp.get("status") == "active": active_employees += 1 report.append( f"- {emp.get('display_name', emp.get('username', '?'))}: " f"任务数={task_count}, 完成率={emp.get('completion_rate', 0)}%" ) report.append( f"\n总结: 活跃员工 {active_employees}/{len(employees)} 人, 总任务 {total_tasks} 个" ) return "\n".join(report) except Exception as e: return f"生成报告失败: {e}" def get_task_statistics(employee_id: str | None = None) -> str: try: resp = _get_client().get(f"{_INTERNAL_BASE}/tasks", headers=_headers()) tasks = resp.json() if isinstance(resp.json(), list) else resp.json().get("data", []) if employee_id: tasks = [t for t in tasks if t.get("assignee_id") == employee_id] todo = sum(1 for t in tasks if t.get("status") == "todo") in_progress = sum(1 for t in tasks if t.get("status") == "in_progress") done = sum(1 for t in tasks if t.get("status") == "done") return ( f"任务统计:\n" f"- 待办: {todo}\n" f"- 进行中: {in_progress}\n" f"- 已完成: {done}\n" f"- 总计: {len(tasks)}" ) except Exception as e: return f"查询任务统计失败: {e}" __all__ = ["list_subordinates", "get_employee_dashboard", "generate_efficiency_report", "get_task_statistics"]