Files
xtrm-agent/xtrm_agent/tools/builtin.py
Kaloyan Danchev e24e3026b6 Add Discord attachment reading and web search capabilities
- Discord channel now downloads and extracts text from attachments (text files, PDFs)
- Added WebSearchTool using DuckDuckGo for researcher and coder agents
- Improved WebFetchTool with User-Agent header and HTML-to-text stripping
- Added pypdf and duckduckgo-search dependencies

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 18:07:19 +02:00

329 lines
10 KiB
Python

"""Built-in tools — file operations, shell, web fetch."""
from __future__ import annotations
import asyncio
import os
import re
from pathlib import Path
from typing import Any
import httpx
from xtrm_agent.tools.registry import Tool
# Regex patterns that make BashTool.execute refuse a command.
# They are applied with re.search, so a match anywhere in the command string
# is enough to block it.
BASH_DENY_PATTERNS = [
r"rm\s+-rf\s+/",  # "rm -rf" aimed at a path that starts with "/"
r"mkfs\.",  # any "mkfs.<something>" invocation
r"\bdd\b.*of=/dev/",  # dd writing its output to a /dev/ path
r":\(\)\{.*\|.*&\s*\};:", # fork bomb
r"chmod\s+-R\s+777\s+/",  # recursive chmod 777 on a path that starts with "/"
r">\s*/dev/sd[a-z]",  # shell redirection into a /dev/sdX device
]
def _resolve_path(workspace: Path, requested: str) -> Path:
    """Resolve ``requested`` against ``workspace`` and keep it sandboxed.

    Args:
        workspace: Root directory that all tool paths must stay inside.
        requested: Path supplied by the caller, normally relative to the
            workspace.

    Returns:
        The fully resolved path (symlinks and ``..`` segments collapsed).

    Raises:
        ValueError: If the resolved path is not the workspace itself or
            located underneath it.
    """
    p = (workspace / requested).resolve()
    ws = workspace.resolve()
    # Compare path components rather than string prefixes: a plain
    # ``startswith`` check would accept a sibling such as "/work/ws_evil"
    # when the workspace is "/work/ws".
    try:
        p.relative_to(ws)
    except ValueError:
        raise ValueError(f"Path '{requested}' escapes workspace") from None
    return p
class ReadFileTool(Tool):
    """Tool that returns the text of a file located inside the workspace."""

    def __init__(self, workspace: Path) -> None:
        # Root directory that requested paths are resolved against.
        self._workspace = workspace

    @property
    def name(self) -> str:
        """Name string of this tool."""
        return "read_file"

    @property
    def description(self) -> str:
        """One-line description of what the tool does."""
        return "Read the contents of a file."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-Schema-style description of the accepted arguments."""
        return {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "File path relative to workspace"},
            },
            "required": ["path"],
        }

    async def execute(self, path: str, **_: Any) -> str:
        """Read ``path`` and return its text.

        Args:
            path: File path relative to the workspace.

        Returns:
            The file content (undecodable bytes are replaced), or an
            ``"Error: ..."`` string when the path does not exist or is not a
            regular file.

        Raises:
            ValueError: Propagated from ``_resolve_path`` when the path
                escapes the workspace.
        """
        p = _resolve_path(self._workspace, path)
        if not p.exists():
            return f"Error: File not found: {path}"
        # A directory (or other non-file) would make read_text raise; report
        # it as an error string like the other tools do.
        if not p.is_file():
            return f"Error: Not a file: {path}"
        return p.read_text(errors="replace")
class WriteFileTool(Tool):
    """Tool that creates or overwrites a file inside the workspace."""

    def __init__(self, workspace: Path) -> None:
        # Root directory that requested paths are resolved against.
        self._workspace = workspace

    @property
    def name(self) -> str:
        """Name string of this tool."""
        return "write_file"

    @property
    def description(self) -> str:
        """One-line description of what the tool does."""
        return "Create or overwrite a file with the given content."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-Schema-style description of the accepted arguments."""
        return {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "File path relative to workspace"},
                "content": {"type": "string", "description": "Content to write"},
            },
            "required": ["path", "content"],
        }

    async def execute(self, path: str, content: str, **_: Any) -> str:
        """Write ``content`` to ``path``, creating parent directories.

        Args:
            path: File path relative to the workspace.
            content: Text to store in the file.

        Returns:
            A confirmation message containing the size of the written file
            in bytes.

        Raises:
            ValueError: Propagated from ``_resolve_path`` when the path
                escapes the workspace.
        """
        p = _resolve_path(self._workspace, path)
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(content)
        # len(content) is a character count; the message promises bytes, so
        # report the size the file actually has on disk.
        return f"Wrote {p.stat().st_size} bytes to {path}"
class EditFileTool(Tool):
    """Tool that replaces one unique occurrence of a string in a file."""

    def __init__(self, workspace: Path) -> None:
        # Root directory that requested paths are resolved against.
        self._workspace = workspace

    @property
    def name(self) -> str:
        """Name string of this tool."""
        return "edit_file"

    @property
    def description(self) -> str:
        """One-line description of what the tool does."""
        return "Replace an exact string in a file with new content."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-Schema-style description of the accepted arguments."""
        return {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "File path relative to workspace"},
                "old_string": {"type": "string", "description": "Exact text to find"},
                "new_string": {"type": "string", "description": "Replacement text"},
            },
            "required": ["path", "old_string", "new_string"],
        }

    async def execute(self, path: str, old_string: str, new_string: str, **_: Any) -> str:
        """Replace ``old_string`` with ``new_string`` in the file at ``path``.

        The edit is only performed when ``old_string`` occurs exactly once.

        Args:
            path: File path relative to the workspace.
            old_string: Exact text that must appear once in the file.
            new_string: Text written in its place.

        Returns:
            ``"Edited <path>"`` on success, otherwise an ``"Error: ..."``
            string describing why nothing was changed.

        Raises:
            ValueError: Propagated from ``_resolve_path`` when the path
                escapes the workspace.
        """
        p = _resolve_path(self._workspace, path)
        if not p.exists():
            return f"Error: File not found: {path}"
        # A directory (or other non-file) would make read_text raise; report
        # it as an error string instead.
        if not p.is_file():
            return f"Error: Not a file: {path}"
        text = p.read_text()
        # One scan answers both "missing" and "ambiguous".
        count = text.count(old_string)
        if count == 0:
            return "Error: old_string not found in file"
        if count > 1:
            return f"Error: old_string found {count} times — must be unique"
        p.write_text(text.replace(old_string, new_string, 1))
        return f"Edited {path}"
class ListDirTool(Tool):
    """Tool that lists the entries of a directory inside the workspace."""

    def __init__(self, workspace: Path) -> None:
        # Root directory that requested paths are resolved against.
        self._workspace = workspace

    @property
    def name(self) -> str:
        """Name string of this tool."""
        return "list_dir"

    @property
    def description(self) -> str:
        """One-line description of what the tool does."""
        return "List files and directories at the given path."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-Schema-style description of the accepted arguments."""
        path_schema = {
            "type": "string",
            "description": "Directory path relative to workspace (default: root)",
            "default": ".",
        }
        return {"type": "object", "properties": {"path": path_schema}}

    async def execute(self, path: str = ".", **_: Any) -> str:
        """Return the sorted entry names of ``path``, one per line.

        Directories carry a trailing ``/``. Missing paths and non-directories
        yield an ``"Error: ..."`` string; an empty directory yields
        ``"(empty directory)"``.
        """
        target = _resolve_path(self._workspace, path)
        if not target.exists():
            return f"Error: Directory not found: {path}"
        if not target.is_dir():
            return f"Error: Not a directory: {path}"
        listing = [
            f"{child.name}/" if child.is_dir() else f"{child.name}"
            for child in sorted(target.iterdir())
        ]
        if not listing:
            return "(empty directory)"
        return "\n".join(listing)
class BashTool(Tool):
    """Tool that runs a shell command with the workspace as working directory."""

    def __init__(self, workspace: Path, timeout: int = 60) -> None:
        # Directory used as the command's working directory.
        self._workspace = workspace
        # Maximum number of seconds a command may run before it is aborted.
        self._timeout = timeout

    @property
    def name(self) -> str:
        """Name string of this tool."""
        return "bash"

    @property
    def description(self) -> str:
        """One-line description of what the tool does."""
        return "Execute a shell command in the workspace directory."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-Schema-style description of the accepted arguments."""
        return {
            "type": "object",
            "properties": {
                "command": {"type": "string", "description": "Shell command to execute"},
            },
            "required": ["command"],
        }

    @staticmethod
    async def _kill_and_reap(proc: Any) -> None:
        """Kill ``proc`` and wait briefly for it to exit."""
        try:
            proc.kill()
        except ProcessLookupError:
            # The process finished on its own between the timeout and here.
            return
        # Bounded wait so a process that refuses to go away cannot hang the
        # tool itself.
        try:
            await asyncio.wait_for(proc.wait(), timeout=5)
        except asyncio.TimeoutError:
            pass

    async def execute(self, command: str, **_: Any) -> str:
        """Run ``command`` through the shell and return its combined output.

        Args:
            command: Shell command line to execute.

        Returns:
            stdout and stderr interleaved (cut to 10,000 characters) followed
            by ``[exit code: N]``, or an ``"Error: ..."`` string when the
            command matches a deny pattern or exceeds the timeout.
        """
        # Refuse commands that match any deny pattern.
        if any(re.search(pattern, command) for pattern in BASH_DENY_PATTERNS):
            return "Error: Command blocked by security policy"

        proc = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.STDOUT,
            cwd=str(self._workspace),
            env={**os.environ},
        )
        try:
            stdout, _stderr = await asyncio.wait_for(proc.communicate(), timeout=self._timeout)
        except asyncio.TimeoutError:
            # wait_for only cancels communicate(); without an explicit kill
            # the shell would keep running after the timeout is reported.
            await self._kill_and_reap(proc)
            return f"Error: Command timed out after {self._timeout}s"

        output = stdout.decode(errors="replace")
        # Truncate large output
        if len(output) > 10_000:
            output = output[:10_000] + "\n... (truncated)"
        exit_info = f"\n[exit code: {proc.returncode}]"
        return output + exit_info
def _strip_html(html: str) -> str:
    """Strip HTML tags and collapse whitespace to get readable text.

    Args:
        html: Raw HTML markup.

    Returns:
        Plain text with script/style blocks removed, block-level tags turned
        into newlines, character references decoded and whitespace collapsed.
    """
    # Imported locally under its own name because the ``html`` parameter
    # hides the module of the same name inside this function.
    from html import unescape

    # Remove script and style blocks
    text = re.sub(r"<(script|style)[^>]*>.*?</\1>", "", html, flags=re.DOTALL | re.IGNORECASE)
    # Replace <br>, <p>, <div>, <li> etc. with newlines
    text = re.sub(r"<(br|p|div|li|h[1-6]|tr)[^>]*/?>", "\n", text, flags=re.IGNORECASE)
    # Strip remaining tags
    text = re.sub(r"<[^>]+>", "", text)
    # Decode every character reference in one pass. Chained str.replace calls
    # decode "&amp;lt;" twice (yielding "<") and miss numeric/other named
    # entities. Non-breaking spaces become ordinary spaces.
    text = unescape(text).replace("\xa0", " ")
    # Collapse whitespace
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
# User-Agent header value that WebFetchTool sends with its HTTP requests.
_WEB_USER_AGENT = "Mozilla/5.0 (compatible; XtrmAgent/1.0; +https://github.com)"
class WebFetchTool(Tool):
    """Tool that downloads a URL and returns its body as text."""

    @property
    def name(self) -> str:
        """Name string of this tool."""
        return "web_fetch"

    @property
    def description(self) -> str:
        """One-line description of what the tool does."""
        return "Fetch the content of a URL and return it as readable text."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-Schema-style description of the accepted arguments."""
        url_schema = {"type": "string", "description": "URL to fetch"}
        return {
            "type": "object",
            "properties": {"url": url_schema},
            "required": ["url"],
        }

    async def execute(self, url: str, **_: Any) -> str:
        """GET ``url`` and return the response body as text.

        HTML responses are reduced to plain text via ``_strip_html``. Bodies
        longer than 20,000 characters are cut and marked as truncated. Any
        exception is reported as an ``"Error fetching URL: ..."`` string.
        """
        try:
            async with httpx.AsyncClient(
                timeout=30,
                follow_redirects=True,
                headers={"User-Agent": _WEB_USER_AGENT},
            ) as client:
                response = await client.get(url)
                is_html = "html" in response.headers.get("content-type", "")
                body = response.text
                if is_html:
                    body = _strip_html(body)
                if len(body) > 20_000:
                    return body[:20_000] + "\n... (truncated)"
                return body
        except Exception as exc:
            return f"Error fetching URL: {exc}"
class WebSearchTool(Tool):
    """Tool that runs a DuckDuckGo text search and formats the hits."""

    @property
    def name(self) -> str:
        """Name string of this tool."""
        return "web_search"

    @property
    def description(self) -> str:
        """One-line description of what the tool does."""
        return "Search the web using DuckDuckGo and return a list of results with title, URL, and snippet."

    @property
    def parameters(self) -> dict[str, Any]:
        """JSON-Schema-style description of the accepted arguments."""
        query_schema = {"type": "string", "description": "Search query"}
        max_results_schema = {
            "type": "integer",
            "description": "Maximum number of results (default: 5)",
            "default": 5,
        }
        return {
            "type": "object",
            "properties": {"query": query_schema, "max_results": max_results_schema},
            "required": ["query"],
        }

    async def execute(self, query: str, max_results: int = 5, **_: Any) -> str:
        """Search for ``query`` and return up to ``max_results`` formatted hits.

        Each hit contributes four lines: bold title, URL, snippet and a
        ``---`` separator. Returns ``"No results found."`` for an empty
        result and ``"Error searching: ..."`` when anything raises.
        """
        try:
            # Imported lazily, so a missing package surfaces as an error
            # string from this tool rather than at module import.
            # NOTE(review): confirm the pinned duckduckgo_search release
            # still exports AsyncDDGS.
            from duckduckgo_search import AsyncDDGS

            async with AsyncDDGS() as ddgs:
                hits = await ddgs.atext(query, max_results=max_results)
                if not hits:
                    return "No results found."
                parts: list[str] = []
                for hit in hits:
                    parts.extend(
                        (
                            f"**{hit.get('title', '')}**",
                            hit.get("href", ""),
                            hit.get("body", ""),
                            "---",
                        )
                    )
                return "\n".join(parts)
        except Exception as exc:
            return f"Error searching: {exc}"
def register_builtin_tools(registry: Any, workspace: Path) -> None:
    """Register every built-in tool with ``registry``.

    Tools that touch the filesystem or shell are constructed with
    ``workspace``; the web tools take no arguments.
    """
    workspace_tools = (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool, BashTool)
    for tool_cls in workspace_tools:
        registry.register(tool_cls(workspace))

    for tool_cls in (WebFetchTool, WebSearchTool):
        registry.register(tool_cls())