"""Built-in tools — file operations, shell, web fetch.""" from __future__ import annotations import asyncio import os import re from pathlib import Path from typing import Any import httpx from xtrm_agent.tools.registry import Tool # Patterns blocked in bash commands BASH_DENY_PATTERNS = [ r"rm\s+-rf\s+/", r"mkfs\.", r"\bdd\b.*of=/dev/", r":\(\)\{.*\|.*&\s*\};:", # fork bomb r"chmod\s+-R\s+777\s+/", r">\s*/dev/sd[a-z]", ] def _resolve_path(workspace: Path, requested: str) -> Path: """Resolve and sandbox a path to the workspace.""" p = (workspace / requested).resolve() ws = workspace.resolve() if not str(p).startswith(str(ws)): raise ValueError(f"Path '{requested}' escapes workspace") return p class ReadFileTool(Tool): def __init__(self, workspace: Path) -> None: self._workspace = workspace @property def name(self) -> str: return "read_file" @property def description(self) -> str: return "Read the contents of a file." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "path": {"type": "string", "description": "File path relative to workspace"}, }, "required": ["path"], } async def execute(self, path: str, **_: Any) -> str: p = _resolve_path(self._workspace, path) if not p.exists(): return f"Error: File not found: {path}" return p.read_text(errors="replace") class WriteFileTool(Tool): def __init__(self, workspace: Path) -> None: self._workspace = workspace @property def name(self) -> str: return "write_file" @property def description(self) -> str: return "Create or overwrite a file with the given content." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "path": {"type": "string", "description": "File path relative to workspace"}, "content": {"type": "string", "description": "Content to write"}, }, "required": ["path", "content"], } async def execute(self, path: str, content: str, **_: Any) -> str: p = _resolve_path(self._workspace, path) p.parent.mkdir(parents=True, exist_ok=True) p.write_text(content) return f"Wrote {len(content)} bytes to {path}" class EditFileTool(Tool): def __init__(self, workspace: Path) -> None: self._workspace = workspace @property def name(self) -> str: return "edit_file" @property def description(self) -> str: return "Replace an exact string in a file with new content." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "path": {"type": "string", "description": "File path relative to workspace"}, "old_string": {"type": "string", "description": "Exact text to find"}, "new_string": {"type": "string", "description": "Replacement text"}, }, "required": ["path", "old_string", "new_string"], } async def execute(self, path: str, old_string: str, new_string: str, **_: Any) -> str: p = _resolve_path(self._workspace, path) if not p.exists(): return f"Error: File not found: {path}" text = p.read_text() if old_string not in text: return "Error: old_string not found in file" count = text.count(old_string) if count > 1: return f"Error: old_string found {count} times — must be unique" p.write_text(text.replace(old_string, new_string, 1)) return f"Edited {path}" class ListDirTool(Tool): def __init__(self, workspace: Path) -> None: self._workspace = workspace @property def name(self) -> str: return "list_dir" @property def description(self) -> str: return "List files and directories at the given path." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "path": { "type": "string", "description": "Directory path relative to workspace (default: root)", "default": ".", }, }, } async def execute(self, path: str = ".", **_: Any) -> str: p = _resolve_path(self._workspace, path) if not p.exists(): return f"Error: Directory not found: {path}" if not p.is_dir(): return f"Error: Not a directory: {path}" entries = sorted(p.iterdir()) lines = [] for entry in entries: suffix = "/" if entry.is_dir() else "" lines.append(f"{entry.name}{suffix}") return "\n".join(lines) if lines else "(empty directory)" class BashTool(Tool): def __init__(self, workspace: Path, timeout: int = 60) -> None: self._workspace = workspace self._timeout = timeout @property def name(self) -> str: return "bash" @property def description(self) -> str: return "Execute a shell command in the workspace directory." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "command": {"type": "string", "description": "Shell command to execute"}, }, "required": ["command"], } async def execute(self, command: str, **_: Any) -> str: # Check deny patterns for pattern in BASH_DENY_PATTERNS: if re.search(pattern, command): return f"Error: Command blocked by security policy" try: proc = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=str(self._workspace), env={**os.environ}, ) stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=self._timeout) output = stdout.decode(errors="replace") # Truncate large output if len(output) > 10_000: output = output[:10_000] + "\n... (truncated)" exit_info = f"\n[exit code: {proc.returncode}]" return output + exit_info except asyncio.TimeoutError: return f"Error: Command timed out after {self._timeout}s" def _strip_html(html: str) -> str: """Strip HTML tags and collapse whitespace to get readable text.""" # Remove script and style blocks text = re.sub(r"<(script|style)[^>]*>.*?", "", html, flags=re.DOTALL | re.IGNORECASE) # Replace
,

,

,
  • etc. with newlines text = re.sub(r"<(br|p|div|li|h[1-6]|tr)[^>]*/?>", "\n", text, flags=re.IGNORECASE) # Strip remaining tags text = re.sub(r"<[^>]+>", "", text) # Decode common HTML entities text = text.replace("&", "&").replace("<", "<").replace(">", ">") text = text.replace(""", '"').replace("'", "'").replace(" ", " ") # Collapse whitespace text = re.sub(r"[ \t]+", " ", text) text = re.sub(r"\n{3,}", "\n\n", text) return text.strip() _WEB_USER_AGENT = "Mozilla/5.0 (compatible; XtrmAgent/1.0; +https://github.com)" class WebFetchTool(Tool): @property def name(self) -> str: return "web_fetch" @property def description(self) -> str: return "Fetch the content of a URL and return it as readable text." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "url": {"type": "string", "description": "URL to fetch"}, }, "required": ["url"], } async def execute(self, url: str, **_: Any) -> str: try: async with httpx.AsyncClient( timeout=30, follow_redirects=True, headers={"User-Agent": _WEB_USER_AGENT} ) as client: resp = await client.get(url) content_type = resp.headers.get("content-type", "") text = resp.text if "html" in content_type: text = _strip_html(text) if len(text) > 20_000: text = text[:20_000] + "\n... (truncated)" return text except Exception as e: return f"Error fetching URL: {e}" class WebSearchTool(Tool): @property def name(self) -> str: return "web_search" @property def description(self) -> str: return "Search the web using DuckDuckGo and return a list of results with title, URL, and snippet." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "query": {"type": "string", "description": "Search query"}, "max_results": { "type": "integer", "description": "Maximum number of results (default: 5)", "default": 5, }, }, "required": ["query"], } async def execute(self, query: str, max_results: int = 5, **_: Any) -> str: try: from duckduckgo_search import AsyncDDGS async with AsyncDDGS() as ddgs: results = await ddgs.atext(query, max_results=max_results) if not results: return "No results found." lines: list[str] = [] for r in results: lines.append(f"**{r.get('title', '')}**") lines.append(r.get("href", "")) lines.append(r.get("body", "")) lines.append("---") return "\n".join(lines) except Exception as e: return f"Error searching: {e}" def register_builtin_tools(registry: Any, workspace: Path) -> None: """Register all built-in tools into a ToolRegistry.""" registry.register(ReadFileTool(workspace)) registry.register(WriteFileTool(workspace)) registry.register(EditFileTool(workspace)) registry.register(ListDirTool(workspace)) registry.register(BashTool(workspace)) registry.register(WebFetchTool()) registry.register(WebSearchTool())