Files
xtrm-agent/xtrm_agent/cache.py
Kaloyan Danchev 872ed24f0c Add performance features: caching, cost tracking, retry, compaction, classification, scrubbing
Inspired by zeroclaw's lightweight patterns for slow hardware:
- Response cache (SQLite + SHA-256 keyed) to skip redundant LLM calls
- History compaction — LLM-summarizes old messages when history exceeds 50 messages
- Query classifier routes simple/research queries to cheaper models
- Credential scrubbing removes secrets from tool output before sending to LLM
- Cost tracker with daily/monthly budget enforcement (SQLite)
- Resilient provider with retry + exponential backoff + fallback provider
- Approval engine gains session "always allow" and audit log

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-19 09:20:52 +02:00

101 lines
3.4 KiB
Python

"""Response cache — avoid redundant LLM calls for identical prompts."""
from __future__ import annotations
import hashlib
import json
import time
from pathlib import Path
import aiosqlite
from loguru import logger
class ResponseCache:
"""SQLite-backed LLM response cache with TTL expiry."""
def __init__(self, db_path: str | Path = "data/cache.db", ttl: int = 3600) -> None:
self._db_path = str(db_path)
self._ttl = ttl
self._db: aiosqlite.Connection | None = None
async def setup(self) -> None:
"""Create the cache table if it doesn't exist."""
Path(self._db_path).parent.mkdir(parents=True, exist_ok=True)
self._db = await aiosqlite.connect(self._db_path)
await self._db.execute("PRAGMA journal_mode=WAL")
await self._db.execute(
"""
CREATE TABLE IF NOT EXISTS response_cache (
key TEXT PRIMARY KEY,
response TEXT NOT NULL,
model TEXT NOT NULL,
created_at REAL NOT NULL,
hits INTEGER DEFAULT 0
)
"""
)
await self._db.commit()
async def close(self) -> None:
if self._db:
await self._db.close()
self._db = None
@staticmethod
def _make_key(model: str, messages: list[dict]) -> str:
"""SHA-256 hash of model + messages for cache key."""
raw = json.dumps({"model": model, "messages": messages}, sort_keys=True)
return hashlib.sha256(raw.encode()).hexdigest()
async def get(self, model: str, messages: list[dict]) -> str | None:
"""Look up a cached response. Returns None on miss or expired."""
if not self._db:
return None
key = self._make_key(model, messages)
now = time.time()
async with self._db.execute(
"SELECT response, created_at FROM response_cache WHERE key = ?",
(key,),
) as cursor:
row = await cursor.fetchone()
if not row:
return None
response, created_at = row
if now - created_at > self._ttl:
await self._db.execute("DELETE FROM response_cache WHERE key = ?", (key,))
await self._db.commit()
return None
# Bump hit count
await self._db.execute(
"UPDATE response_cache SET hits = hits + 1 WHERE key = ?", (key,)
)
await self._db.commit()
logger.debug(f"Cache hit for {model} (key={key[:12]}...)")
return response
async def put(self, model: str, messages: list[dict], response: str) -> None:
"""Store a response in the cache."""
if not self._db:
return
key = self._make_key(model, messages)
await self._db.execute(
"""
INSERT OR REPLACE INTO response_cache (key, response, model, created_at, hits)
VALUES (?, ?, ?, ?, 0)
""",
(key, response, model, time.time()),
)
await self._db.commit()
async def clear_expired(self) -> int:
"""Remove expired entries. Returns count of deleted rows."""
if not self._db:
return 0
cutoff = time.time() - self._ttl
cursor = await self._db.execute(
"DELETE FROM response_cache WHERE created_at < ?", (cutoff,)
)
await self._db.commit()
return cursor.rowcount