from agentscope.agent import AgentBase
from agentscope.message import Msg


def _stamp_rbac_metadata(msg: Msg, user_context: dict) -> None:
    """Write the RBAC attributes from *user_context* into ``msg.metadata``.

    The message is modified in place. An absent or empty ``metadata`` is
    replaced with a fresh dict before the keys are written.

    Raises:
        KeyError: If ``user_context`` has no ``"user_id"`` entry.
    """
    msg.metadata = msg.metadata or {}
    # "user_id" is mandatory; the remaining fields fall back to the most
    # restrictive-looking defaults when the context does not supply them.
    msg.metadata["_user_id"] = user_context["user_id"]
    msg.metadata["_role"] = user_context.get("role", "employee")
    msg.metadata["_department_id"] = user_context.get("department_id", "")
    msg.metadata["_data_scope"] = user_context.get("data_scope", "self_only")


def create_rbac_pre_reply_hook(user_context: dict):
    """Build a ``pre_reply`` hook that tags incoming messages with RBAC data.

    Args:
        user_context: Mapping describing the current user. ``"user_id"`` is
            required; ``"role"``, ``"department_id"`` and ``"data_scope"``
            are optional and default to ``"employee"``, ``""`` and
            ``"self_only"`` respectively. The mapping is read each time the
            hook runs, not copied when the hook is created.

    Returns:
        An async callable ``(self, kwargs) -> kwargs`` suitable for
        registration as a ``pre_reply`` hook.
    """

    async def rbac_pre_reply_hook(self: AgentBase, kwargs: dict) -> dict:
        """Stamp RBAC metadata on ``kwargs["msg"]`` and return *kwargs*."""
        msg = kwargs.get("msg")

        if isinstance(msg, Msg):
            targets = [msg]
        elif isinstance(msg, (list, tuple)):
            # A reply may be invoked with several messages at once; every
            # one of them must carry the caller's identity, otherwise the
            # RBAC tags would be silently missing for list input.
            targets = [item for item in msg if isinstance(item, Msg)]
        else:
            # None or an unrecognised payload: nothing to tag.
            targets = []

        for target in targets:
            _stamp_rbac_metadata(target, user_context)

        return kwargs

    return rbac_pre_reply_hook


def register_rbac_hooks_for_user(agent: AgentBase, user_context: dict) -> None:
    """Register a per-user RBAC ``pre_reply`` hook on *agent*.

    The hook is registered under the name ``rbac_<user_id>``.

    Args:
        agent: Agent instance that receives the hook.
        user_context: User description passed to
            :func:`create_rbac_pre_reply_hook`; must contain ``"user_id"``.

    Raises:
        KeyError: If ``user_context`` has no ``"user_id"`` entry.
    """
    hook = create_rbac_pre_reply_hook(user_context)
    hook_name = f"rbac_{user_context['user_id']}"
    agent.register_instance_hook("pre_reply", hook_name, hook)