"""Factory that builds and caches per-user ReAct agents."""

import logging
from typing import ClassVar

from agentscope.agent import AgentBase
from agentscope.agent._react_agent import ReActAgent
from agentscope.model import OpenAIChatModel
from agentscope.formatter import OpenAIChatFormatter
from agentscope.tool import Toolkit
from agentscope.message import Msg

from config import settings

from .memory.user_memory import UserIsolatedMemory
from .hooks.rbac_hook import register_rbac_hooks_for_user

logger = logging.getLogger(__name__)


class AgentFactory:
    """Create ReAct agents per (agent type, user) and cache them for reuse.

    The chat model and the formatter are created on first use and then shared
    by every agent this factory builds. Supported agent types are
    ``"employee"``, ``"manager"``, ``"task"`` and ``"document"``; any other
    value is treated as ``"employee"``.
    """

    # Value passed as ``max_iters`` to every ReActAgent built here.
    _MAX_ITERS: ClassVar[int] = 8

    # Lazily created singletons shared by all agents.
    _model: ClassVar[OpenAIChatModel | None] = None
    _formatter: ClassVar[OpenAIChatFormatter | None] = None

    # Keyed by "<resolved agent type>_<user_id>". Nothing in this class
    # evicts entries.
    _agent_cache: ClassVar[dict[str, AgentBase]] = {}

    @classmethod
    def _get_model(cls) -> OpenAIChatModel:
        """Return the shared chat model, creating it on the first call.

        The model is configured from ``settings.LLM_MODEL``,
        ``settings.LLM_API_KEY`` and ``settings.LLM_API_BASE``.
        """
        if cls._model is None:
            # NOTE(review): ``config_name`` / ``api_base`` look like legacy
            # AgentScope keywords - confirm the installed ``OpenAIChatModel``
            # accepts them (newer releases may expect the base URL elsewhere).
            cls._model = OpenAIChatModel(
                config_name="enterprise_model",
                model_name=settings.LLM_MODEL,
                api_key=settings.LLM_API_KEY,
                api_base=settings.LLM_API_BASE,
            )
        return cls._model

    @classmethod
    def _get_formatter(cls) -> OpenAIChatFormatter:
        """Return the shared formatter, creating it on the first call."""
        if cls._formatter is None:
            cls._formatter = OpenAIChatFormatter()
        return cls._formatter

    @classmethod
    async def create_agent(
        cls,
        agent_type: str,
        user_id: str,
        user_name: str,
        department_id: str | None = None,
    ) -> AgentBase:
        """Return the agent of ``agent_type`` for ``user_id``, building it once.

        Args:
            agent_type: One of ``"employee"``, ``"manager"``, ``"task"`` or
                ``"document"``. Any other value falls back to ``"employee"``
                (a warning is logged).
            user_id: Identifier used for the cache key and the agent memory.
            user_name: Display name embedded in the agent name and, for the
                employee and manager agents, in the system prompt.
            department_id: Only used by the employee agent's RBAC context.

        Returns:
            The cached agent if one exists for the resolved type and user,
            otherwise a newly built one. A cached agent is returned as-is, so
            ``user_name`` / ``department_id`` of later calls do not affect it.
        """
        # Builders that share the (user_id, user_name, model, formatter)
        # signature; the employee builder additionally takes department_id.
        builders = {
            "manager": cls._create_manager_agent,
            "task": cls._create_task_agent,
            "document": cls._create_document_agent,
        }

        if agent_type == "employee" or agent_type in builders:
            resolved_type = agent_type
        else:
            logger.warning(
                "Unknown agent_type %r; falling back to 'employee'",
                agent_type,
            )
            resolved_type = "employee"

        # Key on the resolved type so that unknown types share the employee
        # entry instead of each creating (and caching) a duplicate agent.
        cache_key = f"{resolved_type}_{user_id}"
        cached = cls._agent_cache.get(cache_key)
        if cached is not None:
            return cached

        model = cls._get_model()
        formatter = cls._get_formatter()

        if resolved_type == "employee":
            agent = await cls._create_employee_agent(
                user_id, user_name, department_id, model, formatter
            )
        else:
            agent = await builders[resolved_type](
                user_id, user_name, model, formatter
            )

        cls._agent_cache[cache_key] = agent
        return agent

    @classmethod
    def _build_react_agent(
        cls,
        *,
        name: str,
        sys_prompt: str,
        user_id: str,
        toolkit: Toolkit,
        model: OpenAIChatModel,
        formatter: OpenAIChatFormatter,
    ) -> ReActAgent:
        """Construct a ReActAgent with the settings common to all agent types.

        Every agent gets a ``UserIsolatedMemory`` created for ``user_id`` and
        the factory-wide ``max_iters`` value.
        """
        return ReActAgent(
            name=name,
            sys_prompt=sys_prompt,
            model=model,
            formatter=formatter,
            toolkit=toolkit,
            memory=UserIsolatedMemory(user_id=user_id),
            max_iters=cls._MAX_ITERS,
        )

    @classmethod
    async def _create_employee_agent(
        cls,
        user_id: str,
        user_name: str,
        department_id: str | None,
        model: OpenAIChatModel,
        formatter: OpenAIChatFormatter,
    ) -> ReActAgent:
        """Build the employee assistant with notification and document tools.

        RBAC hooks are registered with role ``"employee"`` and data scope
        ``"self_only"``; a missing ``department_id`` is passed as ``""``.
        """
        # Imported here rather than at module level, as in the original code.
        from .tools.wecom_tools import send_notification
        from .tools.document_tools import parse_document, format_correction

        toolkit = Toolkit()
        for tool in (send_notification, parse_document, format_correction):
            toolkit.register_tool_function(tool)

        # Prompt text is runtime data and is kept verbatim.
        sys_prompt = (
            f"你是 {user_name} 的专属AI工作助手。 "
            "你可以: "
            "1. 回答工作中的问题,提供专业建议 "
            "2. 帮助处理文档,修正格式 "
            "3. 查询知识库获取信息 "
            "4. 发送通知给相关人员 "
            "重要约束: "
            "- 只能访问该员工权限范围内的数据和工具 "
            "- 涉及敏感操作需要二次确认 "
            "- 始终保持专业和友好的态度"
        )
        agent = cls._build_react_agent(
            name=f"EmployeeAI_{user_name}",
            sys_prompt=sys_prompt,
            user_id=user_id,
            toolkit=toolkit,
            model=model,
            formatter=formatter,
        )

        register_rbac_hooks_for_user(agent, {
            "user_id": user_id,
            "user_name": user_name,
            "role": "employee",
            "department_id": department_id or "",
            "data_scope": "self_only",
        })
        return agent

    @classmethod
    async def _create_manager_agent(
        cls,
        user_id: str,
        user_name: str,
        model: OpenAIChatModel,
        formatter: OpenAIChatFormatter,
    ) -> ReActAgent:
        """Build the manager analysis assistant.

        RBAC hooks are registered with role ``"dept_manager"`` and data scope
        ``"subordinate_only"``.
        """
        # NOTE(review): the toolkit is left empty although the prompt
        # describes analysis/reporting abilities - confirm whether tools are
        # expected to be registered elsewhere.
        toolkit = Toolkit()

        # Prompt text is runtime data and is kept verbatim.
        sys_prompt = (
            f"你是 {user_name} 的管理分析助手。 "
            "你可以: "
            "1. 分析下属员工的工作数据 "
            "2. 生成工作效率报告 "
            "3. 提供管理决策建议 "
            "重要约束: "
            "- 只能查看你的直接和间接下属的数据 "
            "- 不能查看非下属或跨部门员工的数据"
        )
        agent = cls._build_react_agent(
            name=f"ManagerAI_{user_name}",
            sys_prompt=sys_prompt,
            user_id=user_id,
            toolkit=toolkit,
            model=model,
            formatter=formatter,
        )

        register_rbac_hooks_for_user(agent, {
            "user_id": user_id,
            "user_name": user_name,
            "role": "dept_manager",
            "data_scope": "subordinate_only",
        })
        return agent

    @classmethod
    async def _create_task_agent(
        cls,
        user_id: str,
        user_name: str,
        model: OpenAIChatModel,
        formatter: OpenAIChatFormatter,
    ) -> ReActAgent:
        """Build the task management assistant (no tools, no RBAC hooks)."""
        # NOTE(review): unlike the employee and manager agents, no RBAC hooks
        # are registered here and the toolkit is empty even though the prompt
        # mentions creating tasks and pushing notifications - confirm this is
        # intentional.
        toolkit = Toolkit()

        # Prompt text is runtime data and is kept verbatim, including the
        # line break that follows "4. " in the original literal.
        sys_prompt = (
            "你是任务管理助手。帮助用户创建、跟踪和管理工作任务。 "
            "你可以: "
            "1. 创建新任务并分配给指定人员 "
            "2. 查询任务状态和进度 "
            "3. 更新任务信息 "
            "4. \n"
            "推送任务通知到企业微信"
        )
        return cls._build_react_agent(
            name=f"TaskAI_{user_name}",
            sys_prompt=sys_prompt,
            user_id=user_id,
            toolkit=toolkit,
            model=model,
            formatter=formatter,
        )

    @classmethod
    async def _create_document_agent(
        cls,
        user_id: str,
        user_name: str,
        model: OpenAIChatModel,
        formatter: OpenAIChatFormatter,
    ) -> ReActAgent:
        """Build the document assistant with the two document tools."""
        # Imported here rather than at module level, as in the original code.
        from .tools.document_tools import parse_document, format_correction

        toolkit = Toolkit()
        for tool in (parse_document, format_correction):
            toolkit.register_tool_function(tool)

        # NOTE(review): no RBAC hooks are registered for this agent type,
        # unlike the employee and manager agents - confirm this is intentional.

        # Prompt text is runtime data and is kept verbatim.
        sys_prompt = (
            "你是文档处理专家。帮助用户处理各类文档。 "
            "你可以: "
            "1. 解析PDF/Word/Excel/PPT等格式 "
            "2. 修正文档格式 "
            "3. 提取文档关键信息 "
            "4. 格式转换"
        )
        return cls._build_react_agent(
            name=f"DocAI_{user_name}",
            sys_prompt=sys_prompt,
            user_id=user_id,
            toolkit=toolkit,
            model=model,
            formatter=formatter,
        )