"""Built-in tools — file operations, shell, web fetch.""" from __future__ import annotations import asyncio import os import re from pathlib import Path from typing import Any import httpx from xtrm_agent.tools.registry import Tool # Patterns blocked in bash commands BASH_DENY_PATTERNS = [ r"rm\s+-rf\s+/", r"mkfs\.", r"\bdd\b.*of=/dev/", r":\(\)\{.*\|.*&\s*\};:", # fork bomb r"chmod\s+-R\s+777\s+/", r">\s*/dev/sd[a-z]", ] def _resolve_path(workspace: Path, requested: str) -> Path: """Resolve and sandbox a path to the workspace.""" p = (workspace / requested).resolve() ws = workspace.resolve() if not str(p).startswith(str(ws)): raise ValueError(f"Path '{requested}' escapes workspace") return p class ReadFileTool(Tool): def __init__(self, workspace: Path) -> None: self._workspace = workspace @property def name(self) -> str: return "read_file" @property def description(self) -> str: return "Read the contents of a file." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "path": {"type": "string", "description": "File path relative to workspace"}, }, "required": ["path"], } async def execute(self, path: str, **_: Any) -> str: p = _resolve_path(self._workspace, path) if not p.exists(): return f"Error: File not found: {path}" return p.read_text(errors="replace") class WriteFileTool(Tool): def __init__(self, workspace: Path) -> None: self._workspace = workspace @property def name(self) -> str: return "write_file" @property def description(self) -> str: return "Create or overwrite a file with the given content." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "path": {"type": "string", "description": "File path relative to workspace"}, "content": {"type": "string", "description": "Content to write"}, }, "required": ["path", "content"], } async def execute(self, path: str, content: str, **_: Any) -> str: p = _resolve_path(self._workspace, path) p.parent.mkdir(parents=True, exist_ok=True) p.write_text(content) return f"Wrote {len(content)} bytes to {path}" class EditFileTool(Tool): def __init__(self, workspace: Path) -> None: self._workspace = workspace @property def name(self) -> str: return "edit_file" @property def description(self) -> str: return "Replace an exact string in a file with new content." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "path": {"type": "string", "description": "File path relative to workspace"}, "old_string": {"type": "string", "description": "Exact text to find"}, "new_string": {"type": "string", "description": "Replacement text"}, }, "required": ["path", "old_string", "new_string"], } async def execute(self, path: str, old_string: str, new_string: str, **_: Any) -> str: p = _resolve_path(self._workspace, path) if not p.exists(): return f"Error: File not found: {path}" text = p.read_text() if old_string not in text: return "Error: old_string not found in file" count = text.count(old_string) if count > 1: return f"Error: old_string found {count} times — must be unique" p.write_text(text.replace(old_string, new_string, 1)) return f"Edited {path}" class ListDirTool(Tool): def __init__(self, workspace: Path) -> None: self._workspace = workspace @property def name(self) -> str: return "list_dir" @property def description(self) -> str: return "List files and directories at the given path." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "path": { "type": "string", "description": "Directory path relative to workspace (default: root)", "default": ".", }, }, } async def execute(self, path: str = ".", **_: Any) -> str: p = _resolve_path(self._workspace, path) if not p.exists(): return f"Error: Directory not found: {path}" if not p.is_dir(): return f"Error: Not a directory: {path}" entries = sorted(p.iterdir()) lines = [] for entry in entries: suffix = "/" if entry.is_dir() else "" lines.append(f"{entry.name}{suffix}") return "\n".join(lines) if lines else "(empty directory)" class BashTool(Tool): def __init__(self, workspace: Path, timeout: int = 60) -> None: self._workspace = workspace self._timeout = timeout @property def name(self) -> str: return "bash" @property def description(self) -> str: return "Execute a shell command in the workspace directory." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "command": {"type": "string", "description": "Shell command to execute"}, }, "required": ["command"], } async def execute(self, command: str, **_: Any) -> str: # Check deny patterns for pattern in BASH_DENY_PATTERNS: if re.search(pattern, command): return f"Error: Command blocked by security policy" try: proc = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=str(self._workspace), env={**os.environ}, ) stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=self._timeout) output = stdout.decode(errors="replace") # Truncate large output if len(output) > 10_000: output = output[:10_000] + "\n... (truncated)" exit_info = f"\n[exit code: {proc.returncode}]" return output + exit_info except asyncio.TimeoutError: return f"Error: Command timed out after {self._timeout}s" class WebFetchTool(Tool): @property def name(self) -> str: return "web_fetch" @property def description(self) -> str: return "Fetch the content of a URL." @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "url": {"type": "string", "description": "URL to fetch"}, }, "required": ["url"], } async def execute(self, url: str, **_: Any) -> str: try: async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client: resp = await client.get(url) text = resp.text if len(text) > 20_000: text = text[:20_000] + "\n... (truncated)" return text except Exception as e: return f"Error fetching URL: {e}" def register_builtin_tools(registry: Any, workspace: Path) -> None: """Register all built-in tools into a ToolRegistry.""" registry.register(ReadFileTool(workspace)) registry.register(WriteFileTool(workspace)) registry.register(EditFileTool(workspace)) registry.register(ListDirTool(workspace)) registry.register(BashTool(workspace)) registry.register(WebFetchTool())