"""Interactive CLI channel — REPL with prompt_toolkit + rich.""" from __future__ import annotations import asyncio from loguru import logger from prompt_toolkit import PromptSession from prompt_toolkit.patch_stdout import patch_stdout from rich.console import Console from rich.markdown import Markdown from xtrm_agent.bus import InboundMessage, MessageBus, OutboundMessage from xtrm_agent.channels.base import BaseChannel class CLIChannel(BaseChannel): """Interactive REPL channel.""" def __init__( self, bus: MessageBus, default_agent: str = "coder", ) -> None: super().__init__(bus) self.default_agent = default_agent self.console = Console() self._running = False self._outbound_queue = bus.subscribe_outbound("cli") async def start(self) -> None: """Run the interactive REPL.""" self._running = True session: PromptSession[str] = PromptSession() self.console.print("[bold]xtrm-agent[/bold] — type a message or @agent_name to target an agent") self.console.print("Type [bold]/quit[/bold] to exit\n") # Start output listener output_task = asyncio.create_task(self._output_loop()) try: while self._running: try: with patch_stdout(): user_input = await session.prompt_async("you> ") except (EOFError, KeyboardInterrupt): break text = user_input.strip() if not text: continue if text.lower() in ("/quit", "/exit"): break msg = InboundMessage( channel="cli", sender_id="user", chat_id="cli", content=text, ) await self.bus.publish_inbound(msg) # Wait for the response try: out_msg = await asyncio.wait_for(self._outbound_queue.get(), timeout=300) self._render_response(out_msg) except asyncio.TimeoutError: self.console.print("[red]Timed out waiting for response[/red]") finally: self._running = False output_task.cancel() async def _output_loop(self) -> None: """Background task to handle unsolicited outbound messages.""" # This handles messages that arrive outside the normal request/response flow # (e.g., delegation results, notifications) pass def _render_response(self, msg: OutboundMessage) -> None: """Render agent response with rich markdown.""" self.console.print() self.console.print(Markdown(msg.content)) self.console.print() async def stop(self) -> None: self._running = False async def run_single_message( bus: MessageBus, message: str, agent: str | None = None, outbound_queue: asyncio.Queue[OutboundMessage] | None = None, ) -> str: """Send a single message and wait for the response.""" if outbound_queue is None: outbound_queue = bus.subscribe_outbound("cli") msg = InboundMessage( channel="cli", sender_id="user", chat_id="cli", content=message, target_agent=agent, ) await bus.publish_inbound(msg) out = await asyncio.wait_for(outbound_queue.get(), timeout=300) return out.content