import httpx import json class CustomToolExecutor: def __init__(self, tool_def: dict): self.endpoint_url = tool_def.get("endpoint_url", "") self.method = tool_def.get("method", "GET") self.path = tool_def.get("path", "") self.headers = dict(tool_def.get("headers_json", {})) self.auth_type = tool_def.get("auth_type", "none") self.auth_config = dict(tool_def.get("auth_config", {})) self.timeout = int(tool_def.get("timeout", 30)) async def execute(self, params: dict) -> str: url = f"{self.endpoint_url.rstrip('/')}/{self.path.lstrip('/')}" headers = dict(self.headers) req_params = dict(params) if self.auth_type == "api_key": key = self.auth_config.get("key", "") loc = self.auth_config.get("location", "header") name = self.auth_config.get("name", "X-API-Key") if loc == "header": headers[name] = key else: req_params[name] = key elif self.auth_type == "bearer": headers["Authorization"] = f"Bearer {self.auth_config.get('token', '')}" timeout = httpx.Timeout(self.timeout) async with httpx.AsyncClient(timeout=timeout) as client: if self.method == "GET": resp = await client.get(url, params=req_params, headers=headers) else: resp = await client.request( self.method, url, json=req_params, headers=headers ) try: data = resp.json() return json.dumps(data, ensure_ascii=False, indent=2)[:4000] except Exception: return resp.text[:4000]