"""Tool approval engine — deny-by-default, per-agent policies.""" from __future__ import annotations import time from enum import Enum from typing import Any from loguru import logger class ApprovalPolicy(Enum): AUTO_APPROVE = "auto_approve" REQUIRE_APPROVAL = "require_approval" DENY = "deny" class ApprovalEngine: """Deny-by-default tool approval with session allowlist and audit log.""" def __init__( self, auto_approve: list[str] | None = None, require_approval: list[str] | None = None, interactive: bool = True, ) -> None: self._auto_approve = set(auto_approve or []) self._require_approval = set(require_approval or []) self._interactive = interactive self._session_allowed: set[str] = set() self._audit_log: list[dict[str, Any]] = [] def get_policy(self, tool_name: str) -> ApprovalPolicy: """Get the approval policy for a tool.""" # MCP tools inherit from the mcp_* prefix pattern base_name = tool_name.split("_", 1)[0] if tool_name.startswith("mcp_") else tool_name if tool_name in self._auto_approve or base_name in self._auto_approve: return ApprovalPolicy.AUTO_APPROVE if tool_name in self._require_approval or base_name in self._require_approval: return ApprovalPolicy.REQUIRE_APPROVAL return ApprovalPolicy.DENY async def check(self, tool_name: str, arguments: dict[str, Any]) -> bool: """Check if a tool call is approved. Returns True if approved.""" policy = self.get_policy(tool_name) if policy == ApprovalPolicy.AUTO_APPROVE: self._log_decision(tool_name, arguments, "auto_approved") return True # Session-scoped "always allow" if tool_name in self._session_allowed: self._log_decision(tool_name, arguments, "session_allowed") return True if policy == ApprovalPolicy.DENY: logger.warning(f"Tool '{tool_name}' denied by policy") self._log_decision(tool_name, arguments, "denied") return False # REQUIRE_APPROVAL if not self._interactive: logger.warning(f"Tool '{tool_name}' requires approval but running non-interactively — denied") self._log_decision(tool_name, arguments, "denied_non_interactive") return False # In interactive mode, prompt the user logger.info(f"Tool '{tool_name}' requires approval. Args: {arguments}") approved, always = await self._prompt_user(tool_name, arguments) if approved and always: self._session_allowed.add(tool_name) self._log_decision(tool_name, arguments, "user_approved" if approved else "user_denied") return approved async def _prompt_user(self, tool_name: str, arguments: dict[str, Any]) -> tuple[bool, bool]: """Prompt user for tool approval. Returns (approved, always_allow).""" print(f"\n[APPROVAL REQUIRED] Tool: {tool_name}") print(f" Arguments: {arguments}") try: answer = input(" Allow? [y/N/a(lways)]: ").strip().lower() if answer in ("a", "always"): return True, True return answer in ("y", "yes"), False except (EOFError, KeyboardInterrupt): return False, False def _log_decision(self, tool_name: str, arguments: dict[str, Any], decision: str) -> None: """Record an approval decision in the audit log.""" self._audit_log.append({ "tool": tool_name, "arguments": arguments, "decision": decision, "timestamp": time.time(), }) def get_audit_log(self) -> list[dict[str, Any]]: """Return the audit log for inspection.""" return list(self._audit_log)