"""Delegate tool — allows agents to invoke each other.""" from __future__ import annotations import asyncio import uuid from typing import Any from loguru import logger from xtrm_agent.bus import AgentMessage, MessageBus from xtrm_agent.tools.registry import Tool class DelegateTool(Tool): """Built-in tool for inter-agent delegation.""" def __init__( self, bus: MessageBus, from_agent: str, available_agents: list[str], timeout: int = 120, ) -> None: self._bus = bus self._from_agent = from_agent self._available_agents = available_agents self._timeout = timeout self._pending: dict[str, asyncio.Future[str]] = {} @property def name(self) -> str: return "delegate" @property def description(self) -> str: agents = ", ".join(self._available_agents) return f"Delegate a task to another agent. Available agents: {agents}" @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "agent_name": { "type": "string", "description": "Name of the agent to delegate to", }, "task": { "type": "string", "description": "Description of the task to delegate", }, }, "required": ["agent_name", "task"], } async def execute(self, agent_name: str, task: str, **_: Any) -> str: if agent_name not in self._available_agents: return f"Error: Unknown agent '{agent_name}'. Available: {', '.join(self._available_agents)}" if agent_name == self._from_agent: return "Error: Cannot delegate to self" request_id = uuid.uuid4().hex[:12] future: asyncio.Future[str] = asyncio.get_event_loop().create_future() self._pending[request_id] = future msg = AgentMessage( from_agent=self._from_agent, to_agent=agent_name, task=task, request_id=request_id, ) await self._bus.publish_agent_message(msg) logger.info(f"[{self._from_agent}] Delegated to {agent_name}: {task[:80]}") try: result = await asyncio.wait_for(future, timeout=self._timeout) return result except asyncio.TimeoutError: self._pending.pop(request_id, None) return f"Error: Delegation to '{agent_name}' timed out after {self._timeout}s" def resolve(self, request_id: str, response: str) -> None: """Resolve a pending delegation with the response.""" future = self._pending.pop(request_id, None) if future and not future.done(): future.set_result(response)