Files
xtrm-agent/xtrm_agent/engine.py
Kaloyan Danchev b3608b35fa Connect to Discord, switch to NVIDIA NIM providers
- Enable Discord channel, respond to all messages from allowed users
- Switch all agents to NVIDIA NIM (Kimi K2.5, DeepSeek V3.1)
- Auto-approve all tools for non-interactive Docker deployment
- Fix tool call arguments serialization (dict → JSON string)
- Fix Dockerfile to copy source before uv sync

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 15:34:15 +02:00

110 lines
3.9 KiB
Python

"""Single agent engine — one LLM loop per agent."""
from __future__ import annotations
from typing import Any
from loguru import logger
from xtrm_agent.config import AgentFileConfig
import json
from xtrm_agent.llm.provider import LLMProvider, LLMResponse
from xtrm_agent.tools.approval import ApprovalEngine
from xtrm_agent.tools.registry import ToolRegistry
class Engine:
"""Runs one agent's LLM loop: messages → LLM → tool calls → loop → response."""
def __init__(
self,
agent_config: AgentFileConfig,
provider: LLMProvider,
tools: ToolRegistry,
approval: ApprovalEngine,
) -> None:
self.config = agent_config
self.provider = provider
self.tools = tools
self.approval = approval
async def run(self, user_message: str) -> str:
"""Process a single user message through the agent loop."""
messages = self._build_initial_messages(user_message)
return await self._agent_loop(messages)
async def run_delegation(self, task: str) -> str:
"""Process a delegation task (no system prompt changes)."""
messages = self._build_initial_messages(task)
return await self._agent_loop(messages)
def _build_initial_messages(self, user_message: str) -> list[dict[str, Any]]:
messages: list[dict[str, Any]] = []
if self.config.instructions:
messages.append({"role": "system", "content": self.config.instructions})
messages.append({"role": "user", "content": user_message})
return messages
async def _agent_loop(self, messages: list[dict[str, Any]]) -> str:
"""Core agent iteration loop."""
for iteration in range(self.config.max_iterations):
model = self.config.model or self.provider.get_default_model()
tool_defs = self.tools.get_definitions() if self.tools.names() else None
response = await self.provider.complete(
messages=messages,
tools=tool_defs,
model=model,
max_tokens=8192,
temperature=self.config.temperature,
)
if not response.has_tool_calls:
return response.content or "(no response)"
# Add assistant message with tool calls
messages.append(self._assistant_message(response))
# Execute each tool call
for tc in response.tool_calls:
result = await self._execute_tool(tc.name, tc.arguments)
messages.append(
{
"role": "tool",
"tool_call_id": tc.id,
"name": tc.name,
"content": result,
}
)
logger.debug(
f"[{self.config.name}] Iteration {iteration + 1}: "
f"{len(response.tool_calls)} tool call(s)"
)
return "(max iterations reached)"
async def _execute_tool(self, name: str, arguments: dict[str, Any]) -> str:
"""Execute a tool with approval check."""
approved = await self.approval.check(name, arguments)
if not approved:
return f"Tool '{name}' was denied by approval policy."
return await self.tools.execute(name, arguments)
def _assistant_message(self, response: LLMResponse) -> dict[str, Any]:
"""Build assistant message dict from LLMResponse."""
msg: dict[str, Any] = {"role": "assistant"}
if response.content:
msg["content"] = response.content
if response.tool_calls:
msg["tool_calls"] = [
{
"id": tc.id,
"type": "function",
"function": {"name": tc.name, "arguments": json.dumps(tc.arguments)},
}
for tc in response.tool_calls
]
return msg