import httpx import logging import os logger = logging.getLogger(__name__) _INTERNAL_BASE = os.getenv("INTERNAL_API_BASE", "http://127.0.0.1:8000/api") _client: httpx.Client | None = None def _get_client() -> httpx.Client: global _client if _client is None: _client = httpx.Client(timeout=30) return _client def _get_token() -> str | None: from config import settings try: resp = _get_client().post( f"{_INTERNAL_BASE}/auth/login", json={"username": "admin", "password": "admin123"}, ) data = resp.json() return data.get("access_token") except Exception: return None def _headers(token: str | None = None) -> dict: t = token or _get_token() return {"Authorization": f"Bearer {t}"} if t else {} def list_tasks(status: str | None = None) -> str: try: resp = _get_client().get(f"{_INTERNAL_BASE}/tasks", headers=_headers()) tasks = resp.json() if isinstance(resp.json(), list) else resp.json().get("data", []) if status: tasks = [t for t in tasks if t.get("status") == status] if not tasks: return "当前没有任务。" lines = [] for t in tasks: lines.append( f"- [{t.get('status', '?')}] {t.get('id', '')[:8]} | {t.get('title', '无标题')} " f"| 负责人: {t.get('assignee_name', t.get('assignee_id', '无人'))} " f"| 截止: {t.get('deadline', '无')} " f"| 优先级: {t.get('priority', '?')}" ) return "\n".join(lines) except Exception as e: return f"查询任务列表失败: {e}" def create_task(title: str, description: str = "", assignee_id: str = "", priority: str = "medium", deadline: str | None = None) -> str: try: body = { "title": title, "description": description, "assignee_id": assignee_id, "priority": priority, "deadline": deadline, } resp = _get_client().post(f"{_INTERNAL_BASE}/tasks", json=body, headers=_headers()) task = resp.json() return f"任务创建成功: {task.get('title', title)} (ID: {task.get('id', '?')[:8]})" except Exception as e: return f"创建任务失败: {e}" def get_task(task_id: str) -> str: try: resp = _get_client().get(f"{_INTERNAL_BASE}/tasks/{task_id}", headers=_headers()) t = resp.json() return ( f"任务: {t.get('title', '?')}\n" f"描述: {t.get('description', '无')}\n" f"负责人: {t.get('assignee_name', t.get('assignee_id', '无人'))}\n" f"状态: {t.get('status', '?')} | 优先级: {t.get('priority', '?')} | 截止: {t.get('deadline', '无')}" ) except Exception as e: return f"查询任务失败: {e}" def update_task(task_id: str, status: str | None = None, description: str | None = None) -> str: try: body = {} if status: body["status"] = status if description: body["description"] = description resp = _get_client().put(f"{_INTERNAL_BASE}/tasks/{task_id}", json=body, headers=_headers()) return f"任务 {task_id[:8]} 已更新" except Exception as e: return f"更新任务失败: {e}" def push_task_to_wecom(task_id: str) -> str: try: resp = _get_client().post(f"{_INTERNAL_BASE}/tasks/{task_id}/push", headers=_headers()) data = resp.json() if hasattr(resp, 'json') else resp return f"任务 {task_id[:8]} 已推送至企业微信" except Exception as e: return f"推送任务失败: {e}" __all__ = ["list_tasks", "create_task", "get_task", "update_task", "push_task_to_wecom"]