"""MCP client — connect to MCP servers and wrap their tools.""" from __future__ import annotations from contextlib import AsyncExitStack from typing import Any from loguru import logger from xtrm_agent.config import MCPServerConfig from xtrm_agent.tools.registry import Tool, ToolRegistry class MCPToolWrapper(Tool): """Wraps an MCP server tool as a local Tool.""" def __init__(self, session: Any, server_name: str, tool_def: Any) -> None: self._session = session self._server_name = server_name self._tool_def = tool_def self._tool_name = f"mcp_{server_name}_{tool_def.name}" self._original_name = tool_def.name @property def name(self) -> str: return self._tool_name @property def description(self) -> str: return getattr(self._tool_def, "description", "") or "" @property def parameters(self) -> dict[str, Any]: schema = getattr(self._tool_def, "inputSchema", None) if schema: return dict(schema) return {"type": "object", "properties": {}} async def execute(self, **kwargs: Any) -> str: try: result = await self._session.call_tool(self._original_name, arguments=kwargs) parts = [] for block in result.content: if hasattr(block, "text"): parts.append(block.text) return "\n".join(parts) if parts else "(empty result)" except Exception as e: return f"Error calling MCP tool '{self._original_name}': {e}" async def connect_mcp_servers( mcp_servers: dict[str, MCPServerConfig], registry: ToolRegistry, stack: AsyncExitStack, ) -> None: """Connect to configured MCP servers and register their tools.""" if not mcp_servers: return try: from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client except ImportError: logger.warning("MCP SDK not available — skipping MCP server connections") return for name, cfg in mcp_servers.items(): try: if cfg.command: params = StdioServerParameters( command=cfg.command, args=cfg.args, env={**cfg.env} if cfg.env else None, ) read, write = await stack.enter_async_context(stdio_client(params)) elif cfg.url: try: from mcp.client.streamable_http import streamable_http_client read, write, _ = await stack.enter_async_context( streamable_http_client(cfg.url) ) except ImportError: logger.warning(f"MCP HTTP client not available — skipping {name}") continue else: logger.warning(f"MCP server '{name}' has no command or URL — skipping") continue session = await stack.enter_async_context(ClientSession(read, write)) await session.initialize() tools_result = await session.list_tools() for tool_def in tools_result.tools: wrapper = MCPToolWrapper(session, name, tool_def) registry.register(wrapper) logger.info(f"Registered MCP tool: {wrapper.name}") except Exception as e: logger.error(f"Failed to connect MCP server '{name}': {e}")