# TZZR Orchestrator v6 - Código Completo Sistema de orquestación multi-agente robusto y flexible. ## Índice 1. [Estructura del Proyecto](#estructura-del-proyecto) 2. [main.py](#mainpy) 3. [config.yaml](#configyaml) 4. [orchestrator/\_\_init\_\_.py](#orchestrator__init__py) 5. [orchestrator/config.py](#orchestratorconfigpy) 6. [orchestrator/core/\_\_init\_\_.py](#orchestratorcore__init__py) 7. [orchestrator/core/retry.py](#orchestratorcoreretrypy) 8. [orchestrator/core/circuit_breaker.py](#orchestratorcorecircuit_breakerpy) 9. [orchestrator/core/rate_limiter.py](#orchestratorcorerate_limiterpy) 10. [orchestrator/providers/\_\_init\_\_.py](#orchestratorproviders__init__py) 11. [orchestrator/providers/base.py](#orchestratorprovidersbasepy) 12. [orchestrator/providers/claude_provider.py](#orchestratorprovidersclaude_providerpy) 13. [orchestrator/providers/litellm_provider.py](#orchestratorproviderslitellm_providerpy) 14. [orchestrator/tools/\_\_init\_\_.py](#orchestratortools__init__py) 15. [orchestrator/tools/definitions.py](#orchestratortoolsdefinitionspy) 16. [orchestrator/tools/executor.py](#orchestratortoolsexecutorpy) 17. [orchestrator/agents/\_\_init\_\_.py](#orchestratoragents__init__py) 18. [orchestrator/agents/base.py](#orchestratoragentsbasepy) 19. [orchestrator/tracking/\_\_init\_\_.py](#orchestratortracking__init__py) 20. 
[Ejemplos de Configuración](#ejemplos-de-configuración)

---

## Estructura del Proyecto

```
tzzr_v6/
├── main.py                  # Punto de entrada
├── config.yaml              # Configuración principal
├── .env                     # Variables de entorno (crear desde .env.example)
├── .env.example             # Ejemplo de variables
├── .gitignore
├── README.md
├── orchestrator/
│   ├── __init__.py
│   ├── config.py            # Gestión de configuración
│   ├── core/                # Componentes de resiliencia
│   │   ├── __init__.py
│   │   ├── circuit_breaker.py
│   │   ├── rate_limiter.py
│   │   └── retry.py
│   ├── providers/           # Proveedores de LLM
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude_provider.py
│   │   └── litellm_provider.py
│   ├── tools/               # Herramientas
│   │   ├── __init__.py
│   │   ├── definitions.py
│   │   └── executor.py
│   ├── agents/              # Agentes
│   │   ├── __init__.py
│   │   └── base.py
│   └── tracking/            # Cost tracking
│       └── __init__.py
├── logs/                    # Logs por agente
├── outputs/                 # Archivos generados
└── examples/                # Ejemplos de configuración
    ├── simple.yaml
    ├── dev_team.yaml
    ├── devops.yaml
    └── local_ollama.yaml
```

---

## main.py

```python
#!/usr/bin/env python3
"""
LLM Orchestrator - Multi-agent orchestration system.
Usage: python main.py # Interactive mode python main.py --status # Show status python main.py --agents # List agents python main.py --agent X --prompt "Y" # Execute prompt python main.py --health # Health check """ import asyncio import argparse import json import sys from datetime import datetime from pathlib import Path # Add orchestrator to path sys.path.insert(0, str(Path(__file__).parent)) from orchestrator.config import get_config, reload_config from orchestrator.agents import Agent, AgentResult from orchestrator.tracking import CostTracker class Orchestrator: """Main orchestrator class.""" def __init__(self, config_path: str = None): print("🚀 Starting Orchestrator...") self.config = get_config(config_path) self.agents: dict[str, Agent] = {} self.cost_tracker = CostTracker() # Load agents for name, agent_config in self.config.agents.items(): try: self.agents[name] = Agent(config_obj=agent_config) except Exception as e: print(f"⚠️ Error creating agent {name}: {e}") if self.agents: print(f"✅ Loaded {len(self.agents)} agent(s): {', '.join(self.agents.keys())}") else: print("⚠️ No agents loaded. 
Edit config.yaml") async def run_agent(self, agent_name: str, prompt: str) -> AgentResult: """Run a single agent.""" if agent_name not in self.agents: return AgentResult( success=False, output="", agent_name=agent_name, error=f"Agent '{agent_name}' not found", ) result = await self.agents[agent_name].run(prompt) # Track costs if result.usage: self.cost_tracker.record( model=self.agents[agent_name].model, input_tokens=result.usage.get("input_tokens", 0), output_tokens=result.usage.get("output_tokens", 0), ) return result async def run_all(self, prompt: str) -> dict[str, AgentResult]: """Run all agents in parallel.""" tasks = { name: agent.run(prompt) for name, agent in self.agents.items() } results_list = await asyncio.gather(*tasks.values(), return_exceptions=True) results = {} for name, result in zip(tasks.keys(), results_list): if isinstance(result, Exception): results[name] = AgentResult( success=False, output="", agent_name=name, error=str(result), ) else: results[name] = result return results async def health_check(self) -> dict[str, bool]: """Check health of all agents.""" results = {} for name, agent in self.agents.items(): try: results[name] = await agent.health_check() except Exception: results[name] = False return results def get_status(self) -> dict: """Get orchestrator status.""" return { "config_path": str(self.config.config_path) if self.config.config_path else None, "settings": { "timeout": self.config.settings.timeout, "rate_limit": self.config.settings.rate_limit_per_minute, "max_retries": self.config.settings.max_retries, "sandbox_paths": self.config.settings.sandbox_paths, "circuit_breaker": self.config.settings.enable_circuit_breaker, }, "agents": { name: { "provider": agent.provider_name, "model": agent.model, "tools": agent.tools, } for name, agent in self.agents.items() }, "servers": list(self.config.servers.keys()), "costs": self.cost_tracker.summary.to_dict(), } async def interactive(self): """Run interactive mode.""" print("\n" + "=" * 60) 
print("LLM Orchestrator - Interactive Mode") print("=" * 60) print("\nCommands:") print(" /status - Show status") print(" /agents - List agents") print(" /agent - Switch active agent") print(" /logs - Show agent logs") print(" /costs - Show cost tracking") print(" /health - Health check") print(" /reload - Reload configuration") print(" /all - Run on all agents (parallel)") print(" /quit - Exit") print("-" * 60) if not self.agents: print("\n⚠️ No agents available. Edit config.yaml") return current_agent = list(self.agents.keys())[0] while True: try: prompt = input(f"\n[{current_agent}] > ").strip() if not prompt: continue if prompt == "/quit": print("Goodbye!") break elif prompt == "/status": print(json.dumps(self.get_status(), indent=2)) elif prompt == "/agents": for name, agent in self.agents.items(): marker = "→" if name == current_agent else " " print(f" {marker} {name}: {agent.provider_name}/{agent.model}") if agent.tools: print(f" tools: {', '.join(agent.tools)}") elif prompt.startswith("/agent "): name = prompt[7:].strip() if name in self.agents: current_agent = name print(f"Active agent: {current_agent}") else: print(f"Agent '{name}' not found. 
Available: {', '.join(self.agents.keys())}") elif prompt.startswith("/logs "): name = prompt[6:].strip() if name in self.agents: log = self.agents[name].read_log(last_n=5) print(log if log else "(empty)") else: print(f"Agent '{name}' not found") elif prompt == "/costs": summary = self.cost_tracker.summary print(f"\n📊 Cost Summary:") print(f" Requests: {summary.total_requests}") print(f" Tokens: {summary.total_input_tokens:,} in / {summary.total_output_tokens:,} out") print(f" Cost: ${summary.total_cost_usd:.4f}") if summary.by_model: print("\n By model:") for model, data in summary.by_model.items(): print(f" {model}: {data['requests']} req, ${data['cost_usd']:.4f}") elif prompt == "/health": print("Checking health...") results = await self.health_check() for name, healthy in results.items(): status = "✅" if healthy else "❌" print(f" {status} {name}") elif prompt == "/reload": self.config = reload_config() self.agents = {} for name, agent_config in self.config.agents.items(): try: self.agents[name] = Agent(config_obj=agent_config) except Exception as e: print(f"⚠️ Error: {name}: {e}") print(f"✅ Reloaded: {len(self.agents)} agent(s)") if self.agents: current_agent = list(self.agents.keys())[0] elif prompt == "/all": user_prompt = input("Prompt: ").strip() if user_prompt: print("Running on all agents...") results = await self.run_all(user_prompt) for name, result in results.items(): status = "✅" if result.success else "❌" output = result.output[:300] + "..." 
if len(result.output) > 300 else result.output print(f"\n{status} {name}:\n{output or result.error}") elif prompt.startswith("/"): print(f"Unknown command: {prompt}") else: print("Running...") result = await self.run_agent(current_agent, prompt) if result.success: print(f"\n{result.output}") if result.tool_results: print(f"\n[Tools used: {len(result.tool_results)}]") else: print(f"\n❌ {result.error}") except KeyboardInterrupt: print("\n\nUse /quit to exit.") except Exception as e: print(f"\n❌ Error: {e}") async def main(): parser = argparse.ArgumentParser(description="LLM Orchestrator") parser.add_argument("--config", type=str, help="Path to config.yaml") parser.add_argument("--status", action="store_true", help="Show status") parser.add_argument("--agents", action="store_true", help="List agents") parser.add_argument("--agent", type=str, help="Agent to use") parser.add_argument("--prompt", type=str, help="Prompt to execute") parser.add_argument("--health", action="store_true", help="Health check") args = parser.parse_args() orchestrator = Orchestrator(args.config) if args.status: print(json.dumps(orchestrator.get_status(), indent=2)) elif args.agents: for name, agent in orchestrator.agents.items(): print(f" {name}: {agent.provider_name}/{agent.model}") elif args.health: results = await orchestrator.health_check() for name, healthy in results.items(): status = "✅" if healthy else "❌" print(f" {status} {name}") elif args.agent and args.prompt: result = await orchestrator.run_agent(args.agent, args.prompt) if result.success: print(result.output) else: print(f"Error: {result.error}", file=sys.stderr) sys.exit(1) else: await orchestrator.interactive() if __name__ == "__main__": asyncio.run(main()) ``` --- ## config.yaml ```yaml # config.yaml - LLM Orchestrator Configuration # # Edit this file to define your agents and servers. 
# ============================================================================ # SETTINGS # ============================================================================ settings: # Default provider/model if not specified per agent default_provider: claude default_model: sonnet # Timeout in seconds timeout: 300 # Working directory (relative to this file) working_dir: . # Max tool iterations per turn max_tool_iterations: 10 # Security sandbox_paths: true # Restrict file access to working_dir ssh_strict_host_checking: true # Verify SSH hosts # allowed_commands: [] # Whitelist commands (empty = all allowed) # Rate limiting rate_limit_per_minute: 60 # Retry max_retries: 3 retry_delay: 1.0 # Circuit breaker (protects against cascading failures) enable_circuit_breaker: true circuit_breaker_threshold: 5 # Failures before opening circuit_breaker_timeout: 30.0 # Seconds before retry # ============================================================================ # SERVERS (optional) # ============================================================================ # Define servers for SSH access servers: # Example: # production: # host: 192.168.1.100 # user: deploy # key: ~/.ssh/id_rsa # port: 22 # description: "Production server" # ============================================================================ # AGENTS # ============================================================================ # Define your AI agents agents: # Default assistant assistant: role: | You are a helpful assistant that can execute commands, read and write files, and help with various tasks. provider: claude model: sonnet tools: - bash - read - write - list_dir - glob # Coder agent (uncomment to enable) # coder: # role: | # You are an expert programmer. # You write clean, well-documented, and testable code. # You follow best practices and include error handling. 
# provider: claude # model: sonnet # tools: # - read # - write # - bash # - grep # - glob # ============================================================================ # TASKS (optional) # ============================================================================ # Define automated task sequences tasks: # Example: # deploy: # description: "Deploy application to production" # steps: # - agent: coder # prompt: "Run tests" # - agent: deployer # prompt: "Deploy to production" ``` --- ## orchestrator/\_\_init\_\_.py ```python """ Orchestrator module. """ from .config import Config, get_config, reload_config, AgentConfig, ServerConfig, Settings from .agents import Agent, AgentResult __all__ = [ "Config", "get_config", "reload_config", "AgentConfig", "ServerConfig", "Settings", "Agent", "AgentResult", ] ``` --- ## orchestrator/config.py ```python """ Configuration management. Loads configuration from: 1. Environment variables 2. .env file 3. config.yaml NEVER hardcode credentials. """ import os from pathlib import Path from dataclasses import dataclass, field from typing import Optional, Dict, Any, List def load_env(): """Load variables from .env file if it exists.""" env_paths = [ Path.cwd() / ".env", Path(__file__).parent.parent / ".env", ] for env_file in env_paths: if env_file.exists(): with open(env_file) as f: for line in f: line = line.strip() if line and not line.startswith("#") and "=" in line: key, _, value = line.partition("=") key = key.strip() value = value.strip().strip('"').strip("'") if key and value: os.environ.setdefault(key, value) break # Load .env on import load_env() def get_env(key: str, default: str = "") -> str: """Get environment variable.""" return os.environ.get(key, default) def get_env_bool(key: str, default: bool = False) -> bool: """Get environment variable as boolean.""" val = os.environ.get(key, "").lower() if val in ("true", "yes", "1"): return True if val in ("false", "no", "0"): return False return default def get_env_list(key: str, 
default: Optional[List] = None) -> List: """Get environment variable as list.""" val = os.environ.get(key, "") if val: return [x.strip() for x in val.split(",") if x.strip()] return default or [] @dataclass class ServerConfig: """Server configuration for SSH.""" name: str host: str user: str = "root" key: str = "" port: int = 22 description: str = "" def __post_init__(self): if not self.key: self.key = get_env("SSH_KEY_PATH", "~/.ssh/id_rsa") @dataclass class AgentConfig: """Agent configuration.""" name: str role: str provider: str = "claude" model: str = "sonnet" tools: List[str] = field(default_factory=list) servers: List[str] = field(default_factory=list) max_turns: int = 10 @dataclass class Settings: """General settings.""" default_provider: str = "claude" default_model: str = "sonnet" timeout: float = 300.0 working_dir: str = "." max_tool_iterations: int = 10 # Security ssh_strict_host_checking: bool = True sandbox_paths: bool = True allowed_commands: List[str] = field(default_factory=list) # Rate limiting rate_limit_per_minute: int = 60 # Retry max_retries: int = 3 retry_delay: float = 1.0 # Circuit breaker enable_circuit_breaker: bool = True circuit_breaker_threshold: int = 5 circuit_breaker_timeout: float = 30.0 class Config: """Main configuration manager.""" def __init__(self, config_path: Optional[str] = None): self.config_path = self._find_config(config_path) self.base_dir = self.config_path.parent if self.config_path else Path.cwd() self._raw = self._load_yaml() if self.config_path else {} # Parse configuration self.settings = self._parse_settings() self.servers = self._parse_servers() self.agents = self._parse_agents() self.tasks = self._raw.get("tasks", {}) # Directories self.logs_dir = self.base_dir / "logs" self.outputs_dir = self.base_dir / "outputs" # Create directories self.logs_dir.mkdir(exist_ok=True) self.outputs_dir.mkdir(exist_ok=True) def _find_config(self, config_path: Optional[str]) -> Optional[Path]: """Find configuration file.""" if 
config_path: path = Path(config_path) if path.exists(): return path raise FileNotFoundError(f"Config not found: {config_path}") search_paths = [ Path.cwd() / "config.yaml", Path.cwd() / "config.yml", Path(__file__).parent.parent / "config.yaml", ] for path in search_paths: if path.exists(): return path return None def _load_yaml(self) -> dict: """Load YAML file.""" if not self.config_path: return {} try: import yaml with open(self.config_path) as f: return yaml.safe_load(f) or {} except ImportError: print("WARNING: PyYAML not installed. Run: pip install pyyaml") return {} def _parse_settings(self) -> Settings: """Parse settings section.""" raw = self._raw.get("settings", {}) return Settings( default_provider=raw.get("default_provider", "claude"), default_model=raw.get("default_model", "sonnet"), timeout=float(raw.get("timeout", 300)), working_dir=raw.get("working_dir", "."), max_tool_iterations=int(raw.get("max_tool_iterations", 10)), ssh_strict_host_checking=raw.get( "ssh_strict_host_checking", get_env_bool("SSH_KNOWN_HOSTS_CHECK", True) ), sandbox_paths=raw.get("sandbox_paths", True), allowed_commands=raw.get("allowed_commands", []), rate_limit_per_minute=int(raw.get("rate_limit_per_minute", 60)), max_retries=int(raw.get("max_retries", 3)), retry_delay=float(raw.get("retry_delay", 1.0)), enable_circuit_breaker=raw.get("enable_circuit_breaker", True), circuit_breaker_threshold=int(raw.get("circuit_breaker_threshold", 5)), circuit_breaker_timeout=float(raw.get("circuit_breaker_timeout", 30.0)), ) def _parse_servers(self) -> Dict[str, ServerConfig]: """Parse servers section.""" servers = {} for name, data in self._raw.get("servers", {}).items(): if data: servers[name] = ServerConfig( name=name, host=data.get("host", ""), user=data.get("user", "root"), key=data.get("key", get_env("SSH_KEY_PATH", "~/.ssh/id_rsa")), port=int(data.get("port", 22)), description=data.get("description", ""), ) return servers def _parse_agents(self) -> Dict[str, AgentConfig]: """Parse 
agents section.""" agents = {} for name, data in self._raw.get("agents", {}).items(): if data: agents[name] = AgentConfig( name=name, role=data.get("role", ""), provider=data.get("provider", self.settings.default_provider), model=data.get("model", self.settings.default_model), tools=data.get("tools", []), servers=data.get("servers", []), max_turns=int(data.get("max_turns", self.settings.max_tool_iterations)), ) return agents def get_agent(self, name: str) -> Optional[AgentConfig]: return self.agents.get(name) def get_server(self, name: str) -> Optional[ServerConfig]: return self.servers.get(name) def list_agents(self) -> List[str]: return list(self.agents.keys()) def list_servers(self) -> List[str]: return list(self.servers.keys()) # Global instance _config: Optional[Config] = None def get_config(config_path: Optional[str] = None) -> Config: """Get global configuration.""" global _config if _config is None: _config = Config(config_path) return _config def reload_config(config_path: Optional[str] = None) -> Config: """Reload configuration.""" global _config _config = Config(config_path) return _config ``` --- ## orchestrator/core/\_\_init\_\_.py ```python """ Core components for resilience and reliability. """ from .retry import retry_with_backoff, RetryConfig from .circuit_breaker import CircuitBreaker, CircuitState from .rate_limiter import RateLimiter __all__ = [ "retry_with_backoff", "RetryConfig", "CircuitBreaker", "CircuitState", "RateLimiter", ] ``` --- ## orchestrator/core/retry.py ```python """ Retry logic with exponential backoff. 
Features: - Configurable retry attempts - Exponential backoff with jitter - Selective exception handling - Async support """ import asyncio import random from dataclasses import dataclass, field from typing import Callable, TypeVar, Optional, Type, Tuple, Any from functools import wraps T = TypeVar("T") @dataclass class RetryConfig: """Configuration for retry behavior.""" max_retries: int = 3 base_delay: float = 1.0 max_delay: float = 60.0 exponential_base: float = 2.0 jitter: bool = True # Exceptions that should trigger retry retryable_exceptions: Tuple[Type[Exception], ...] = field( default_factory=lambda: ( ConnectionError, TimeoutError, asyncio.TimeoutError, ) ) # Exceptions that should NOT retry (fail immediately) fatal_exceptions: Tuple[Type[Exception], ...] = field( default_factory=lambda: ( KeyboardInterrupt, SystemExit, PermissionError, ) ) def calculate_delay(self, attempt: int) -> float: """Calculate delay for given attempt number.""" delay = min( self.base_delay * (self.exponential_base ** attempt), self.max_delay ) if self.jitter: # Add random jitter (±25%) delay = delay * (0.75 + random.random() * 0.5) return delay def should_retry(self, exception: Exception) -> bool: """Determine if exception should trigger retry.""" if isinstance(exception, self.fatal_exceptions): return False if self.retryable_exceptions: return isinstance(exception, self.retryable_exceptions) # By default, retry on any non-fatal exception return True async def retry_with_backoff( func: Callable[..., T], *args, config: Optional[RetryConfig] = None, on_retry: Optional[Callable[[int, Exception], None]] = None, **kwargs ) -> Tuple[T, int]: """ Execute async function with retry and exponential backoff. 
Args: func: Async function to execute *args: Arguments for func config: Retry configuration on_retry: Optional callback on each retry (attempt, exception) **kwargs: Keyword arguments for func Returns: Tuple of (result, number_of_retries) Raises: Last exception if all retries exhausted """ config = config or RetryConfig() last_exception: Optional[Exception] = None for attempt in range(config.max_retries + 1): try: if asyncio.iscoroutinefunction(func): result = await func(*args, **kwargs) else: result = func(*args, **kwargs) return result, attempt except Exception as e: last_exception = e # Check if we should retry if not config.should_retry(e): raise # Check if we have retries left if attempt >= config.max_retries: raise # Calculate delay and wait delay = config.calculate_delay(attempt) # Call retry callback if provided if on_retry: on_retry(attempt + 1, e) await asyncio.sleep(delay) # Should never reach here, but just in case if last_exception: raise last_exception raise RuntimeError("Retry logic error") def with_retry(config: Optional[RetryConfig] = None): """ Decorator for retry with backoff. Usage: @with_retry(RetryConfig(max_retries=5)) async def my_function(): ... """ def decorator(func: Callable[..., T]) -> Callable[..., T]: @wraps(func) async def wrapper(*args, **kwargs) -> T: result, _ = await retry_with_backoff(func, *args, config=config, **kwargs) return result return wrapper return decorator ``` --- ## orchestrator/core/circuit_breaker.py ```python """ Circuit Breaker pattern implementation. 
Protects against cascading failures by: - Tracking failure rates - Opening circuit after threshold - Auto-recovery with half-open state """ import asyncio import time from dataclasses import dataclass from enum import Enum from typing import Callable, TypeVar, Optional, Any T = TypeVar("T") class CircuitState(Enum): """Circuit breaker states.""" CLOSED = "closed" # Normal operation OPEN = "open" # Failing, reject calls HALF_OPEN = "half_open" # Testing recovery @dataclass class CircuitStats: """Statistics for circuit breaker.""" total_calls: int = 0 successful_calls: int = 0 failed_calls: int = 0 rejected_calls: int = 0 last_failure_time: Optional[float] = None last_success_time: Optional[float] = None @property def failure_rate(self) -> float: if self.total_calls == 0: return 0.0 return self.failed_calls / self.total_calls class CircuitBreakerError(Exception): """Raised when circuit is open.""" pass class CircuitBreaker: """ Circuit breaker for fault tolerance. States: - CLOSED: Normal operation, tracking failures - OPEN: Circuit tripped, rejecting calls immediately - HALF_OPEN: Testing if service recovered Usage: breaker = CircuitBreaker(failure_threshold=5) async with breaker: result = await risky_operation() """ def __init__( self, failure_threshold: int = 5, success_threshold: int = 2, timeout: float = 30.0, half_open_max_calls: int = 3, ): """ Args: failure_threshold: Failures before opening circuit success_threshold: Successes in half-open to close timeout: Seconds before trying half-open half_open_max_calls: Max concurrent calls in half-open """ self.failure_threshold = failure_threshold self.success_threshold = success_threshold self.timeout = timeout self.half_open_max_calls = half_open_max_calls self._state = CircuitState.CLOSED self._failure_count = 0 self._success_count = 0 self._last_failure_time: Optional[float] = None self._half_open_calls = 0 self._lock = asyncio.Lock() self.stats = CircuitStats() @property def state(self) -> CircuitState: 
"""Current circuit state.""" return self._state @property def is_closed(self) -> bool: return self._state == CircuitState.CLOSED @property def is_open(self) -> bool: return self._state == CircuitState.OPEN def _should_attempt_reset(self) -> bool: """Check if enough time passed to try half-open.""" if self._last_failure_time is None: return True return time.time() - self._last_failure_time >= self.timeout async def _check_state(self) -> bool: """ Check and potentially transition state. Returns True if call should proceed. """ async with self._lock: if self._state == CircuitState.CLOSED: return True if self._state == CircuitState.OPEN: if self._should_attempt_reset(): self._state = CircuitState.HALF_OPEN self._success_count = 0 self._half_open_calls = 0 else: self.stats.rejected_calls += 1 return False if self._state == CircuitState.HALF_OPEN: if self._half_open_calls >= self.half_open_max_calls: self.stats.rejected_calls += 1 return False self._half_open_calls += 1 return True async def _record_success(self): """Record successful call.""" async with self._lock: self.stats.total_calls += 1 self.stats.successful_calls += 1 self.stats.last_success_time = time.time() if self._state == CircuitState.HALF_OPEN: self._success_count += 1 if self._success_count >= self.success_threshold: self._state = CircuitState.CLOSED self._failure_count = 0 elif self._state == CircuitState.CLOSED: # Reset failure count on success self._failure_count = 0 async def _record_failure(self, exception: Exception): """Record failed call.""" async with self._lock: self.stats.total_calls += 1 self.stats.failed_calls += 1 self.stats.last_failure_time = time.time() self._last_failure_time = time.time() if self._state == CircuitState.HALF_OPEN: # Any failure in half-open reopens circuit self._state = CircuitState.OPEN elif self._state == CircuitState.CLOSED: self._failure_count += 1 if self._failure_count >= self.failure_threshold: self._state = CircuitState.OPEN async def call( self, func: 
Callable[..., T], *args, **kwargs ) -> T: """ Execute function through circuit breaker. Raises: CircuitBreakerError: If circuit is open """ if not await self._check_state(): raise CircuitBreakerError( f"Circuit breaker is {self._state.value}. " f"Retry after {self.timeout}s" ) try: if asyncio.iscoroutinefunction(func): result = await func(*args, **kwargs) else: result = func(*args, **kwargs) await self._record_success() return result except Exception as e: await self._record_failure(e) raise async def __aenter__(self): """Async context manager entry.""" if not await self._check_state(): raise CircuitBreakerError( f"Circuit breaker is {self._state.value}" ) return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" if exc_type is None: await self._record_success() else: await self._record_failure(exc_val) return False # Don't suppress exceptions def reset(self): """Manually reset circuit breaker.""" self._state = CircuitState.CLOSED self._failure_count = 0 self._success_count = 0 self._last_failure_time = None self.stats = CircuitStats() def __repr__(self) -> str: return ( f"CircuitBreaker(state={self._state.value}, " f"failures={self._failure_count}/{self.failure_threshold})" ) ``` --- ## orchestrator/core/rate_limiter.py ```python """ Rate limiter with sliding window. Features: - Sliding window algorithm - Async-safe with lock - Configurable burst allowance """ import asyncio import time from collections import deque from dataclasses import dataclass from typing import Optional @dataclass class RateLimitStats: """Statistics for rate limiter.""" total_requests: int = 0 allowed_requests: int = 0 throttled_requests: int = 0 total_wait_time: float = 0.0 class RateLimiter: """ Sliding window rate limiter. Limits calls to max_calls per period seconds. 
Usage: limiter = RateLimiter(max_calls=60, period=60.0) await limiter.acquire() # Blocks if rate exceeded do_something() """ def __init__( self, max_calls: int = 60, period: float = 60.0, burst_allowance: int = 0, ): """ Args: max_calls: Maximum calls allowed in period period: Time window in seconds burst_allowance: Extra calls allowed for bursts """ self.max_calls = max_calls self.period = period self.burst_allowance = burst_allowance self._calls: deque = deque() self._lock = asyncio.Lock() self.stats = RateLimitStats() @property def effective_limit(self) -> int: """Effective rate limit including burst.""" return self.max_calls + self.burst_allowance def _cleanup_old_calls(self, now: float): """Remove calls outside the window.""" cutoff = now - self.period while self._calls and self._calls[0] < cutoff: self._calls.popleft() @property def current_usage(self) -> int: """Current number of calls in window.""" self._cleanup_old_calls(time.time()) return len(self._calls) @property def available_calls(self) -> int: """Number of calls available right now.""" return max(0, self.effective_limit - self.current_usage) async def acquire(self, timeout: Optional[float] = None) -> bool: """ Acquire permission to make a call. Blocks until rate limit allows the call. 
Args: timeout: Maximum time to wait (None = unlimited) Returns: True if acquired, False if timeout """ start_time = time.time() async with self._lock: self.stats.total_requests += 1 while True: now = time.time() self._cleanup_old_calls(now) # Check if we can proceed if len(self._calls) < self.effective_limit: self._calls.append(now) self.stats.allowed_requests += 1 return True # Calculate wait time oldest_call = self._calls[0] wait_time = oldest_call + self.period - now if wait_time <= 0: continue # Check timeout if timeout is not None: elapsed = now - start_time if elapsed + wait_time > timeout: self.stats.throttled_requests += 1 return False # Wait and retry self.stats.total_wait_time += wait_time # Release lock while waiting self._lock.release() try: await asyncio.sleep(wait_time) finally: await self._lock.acquire() async def try_acquire(self) -> bool: """ Try to acquire without waiting. Returns: True if acquired, False if would need to wait """ async with self._lock: self.stats.total_requests += 1 now = time.time() self._cleanup_old_calls(now) if len(self._calls) < self.effective_limit: self._calls.append(now) self.stats.allowed_requests += 1 return True self.stats.throttled_requests += 1 return False def reset(self): """Reset rate limiter state.""" self._calls.clear() self.stats = RateLimitStats() def __repr__(self) -> str: return ( f"RateLimiter({self.current_usage}/{self.effective_limit} " f"per {self.period}s)" ) class MultiRateLimiter: """ Multiple rate limiters combined. 
Useful for APIs with multiple rate limits: - 60 requests per minute - 1000 requests per hour Usage: limiter = MultiRateLimiter([ RateLimiter(60, 60), # per minute RateLimiter(1000, 3600), # per hour ]) await limiter.acquire() """ def __init__(self, limiters: list[RateLimiter]): self.limiters = limiters async def acquire(self, timeout: Optional[float] = None) -> bool: """Acquire from all limiters.""" for limiter in self.limiters: if not await limiter.acquire(timeout): return False return True async def try_acquire(self) -> bool: """Try to acquire from all limiters without waiting.""" for limiter in self.limiters: if not await limiter.try_acquire(): return False return True ``` --- ## orchestrator/providers/\_\_init\_\_.py ```python """ LLM Providers. """ from .base import BaseProvider, ProviderResponse from .claude_provider import ClaudeProvider from .litellm_provider import LiteLLMProvider __all__ = [ "BaseProvider", "ProviderResponse", "ClaudeProvider", "LiteLLMProvider", ] ``` --- ## orchestrator/providers/base.py ```python """ Base provider with resilience features. 
All providers inherit from this and get: - Rate limiting - Circuit breaker - Retry with backoff - Consistent response format """ from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime from typing import Optional, Any, List from ..core import ( CircuitBreaker, RateLimiter, RetryConfig, retry_with_backoff, ) @dataclass class ToolCall: """A tool call from the LLM.""" tool: str params: dict id: Optional[str] = None @dataclass class ToolResult: """Result of executing a tool.""" tool: str success: bool output: str error: Optional[str] = None execution_time: float = 0.0 @dataclass class UsageInfo: """Token usage information.""" input_tokens: int = 0 output_tokens: int = 0 cache_read_tokens: int = 0 cache_creation_tokens: int = 0 @property def total_tokens(self) -> int: return self.input_tokens + self.output_tokens @dataclass class ProviderResponse: """Standardized response from any provider.""" success: bool text: str provider: str model: str timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) error: Optional[str] = None usage: Optional[UsageInfo] = None tool_calls: List[ToolCall] = field(default_factory=list) tool_results: List[ToolResult] = field(default_factory=list) retries: int = 0 raw_response: Optional[Any] = None session_id: Optional[str] = None @property def ok(self) -> bool: return self.success def to_dict(self) -> dict: """Convert to dictionary.""" return { "success": self.success, "text": self.text, "provider": self.provider, "model": self.model, "timestamp": self.timestamp, "error": self.error, "usage": { "input_tokens": self.usage.input_tokens, "output_tokens": self.usage.output_tokens, "total_tokens": self.usage.total_tokens, } if self.usage else None, "retries": self.retries, "session_id": self.session_id, } class BaseProvider(ABC): """ Abstract base class for all LLM providers. 
Provides: - Rate limiting - Circuit breaker for fault tolerance - Retry with exponential backoff - Consistent response format Subclasses must implement: - name (property) - available_models (property) - _execute (method) """ # Model aliases - override in subclasses MODEL_ALIASES: dict[str, str] = {} def __init__( self, model: str, timeout: float = 300.0, max_retries: int = 3, retry_delay: float = 1.0, rate_limit_per_minute: int = 60, enable_circuit_breaker: bool = True, circuit_breaker_threshold: int = 5, circuit_breaker_timeout: float = 30.0, **kwargs ): self.model = model self.timeout = timeout self.config = kwargs # Retry config self.retry_config = RetryConfig( max_retries=max_retries, base_delay=retry_delay, max_delay=60.0, ) # Rate limiter self.rate_limiter = RateLimiter( max_calls=rate_limit_per_minute, period=60.0, ) # Circuit breaker self.enable_circuit_breaker = enable_circuit_breaker self.circuit_breaker = CircuitBreaker( failure_threshold=circuit_breaker_threshold, timeout=circuit_breaker_timeout, ) if enable_circuit_breaker else None @property @abstractmethod def name(self) -> str: """Provider name (e.g., 'claude', 'litellm').""" pass @property @abstractmethod def available_models(self) -> list[str]: """List of available model names/aliases.""" pass @property def supports_native_tools(self) -> bool: """Whether provider supports native function calling.""" return False @property def supports_streaming(self) -> bool: """Whether provider supports streaming responses.""" return False def resolve_model(self, model: str) -> str: """Resolve model alias to full model name.""" return self.MODEL_ALIASES.get(model, model) @abstractmethod async def _execute( self, prompt: str, system_prompt: Optional[str] = None, tools: Optional[list[str]] = None, **kwargs ) -> ProviderResponse: """ Internal execution method - implement in subclasses. This is the raw execution without retry/circuit breaker. 
""" pass async def run( self, prompt: str, system_prompt: Optional[str] = None, **kwargs ) -> ProviderResponse: """ Execute a prompt without tools. Includes rate limiting, retry, and circuit breaker. """ return await self.run_with_tools( prompt=prompt, tools=[], system_prompt=system_prompt, **kwargs ) async def run_with_tools( self, prompt: str, tools: list[str], system_prompt: Optional[str] = None, **kwargs ) -> ProviderResponse: """ Execute a prompt with tools. Includes rate limiting, retry, and circuit breaker. """ # Rate limiting await self.rate_limiter.acquire() # Define the execution function async def do_execute() -> ProviderResponse: return await self._execute( prompt=prompt, system_prompt=system_prompt, tools=tools, **kwargs ) try: # With circuit breaker if self.circuit_breaker: async with self.circuit_breaker: response, retries = await retry_with_backoff( do_execute, config=self.retry_config, ) else: response, retries = await retry_with_backoff( do_execute, config=self.retry_config, ) response.retries = retries return response except Exception as e: return ProviderResponse( success=False, text="", provider=self.name, model=self.model, error=str(e), ) async def health_check(self) -> bool: """Check if provider is healthy.""" try: response = await self.run("Respond with only: OK") return response.success and "OK" in response.text.upper() except Exception: return False def __repr__(self) -> str: return f"{self.__class__.__name__}(model={self.model})" ``` --- ## orchestrator/providers/claude_provider.py ```python """ Claude provider using Claude Code CLI. 
This provider executes Claude via the CLI, handling: - Proper JSON parsing - Session management - Tool execution - Error handling """ import asyncio import json import shutil import os from typing import Optional, List, Any from pathlib import Path from .base import BaseProvider, ProviderResponse, UsageInfo, ToolCall class ClaudeProviderError(Exception): """Base exception for Claude provider.""" pass class CLINotFoundError(ClaudeProviderError): """Claude CLI not found.""" pass class ExecutionError(ClaudeProviderError): """Error during execution.""" pass class ClaudeProvider(BaseProvider): """ Provider that uses Claude Code CLI. Requires: - Claude Code CLI installed (`claude` command available) - Valid authentication (Pro/Max subscription or API key) Features: - Native tool support (Bash, Read, Write, etc.) - Session management - JSON output parsing - Autocompact for long conversations """ MODEL_ALIASES = { # Friendly names "haiku": "claude-3-5-haiku-latest", "sonnet": "claude-sonnet-4-20250514", "opus": "claude-opus-4-20250514", # Semantic names "fast": "claude-3-5-haiku-latest", "default": "claude-sonnet-4-20250514", "smart": "claude-sonnet-4-20250514", "powerful": "claude-opus-4-20250514", } # Tools supported by Claude CLI NATIVE_TOOLS = [ "Bash", "Read", "Write", "Edit", "Glob", "Grep", "WebFetch", "TodoRead", "TodoWrite", ] # Mapping from our tool names to Claude CLI tool names TOOL_NAME_MAP = { "bash": "Bash", "read": "Read", "write": "Write", "edit": "Edit", "glob": "Glob", "grep": "Grep", "web_fetch": "WebFetch", "webfetch": "WebFetch", } def __init__( self, model: str = "sonnet", timeout: float = 300.0, cli_path: str = "claude", working_directory: Optional[str] = None, max_turns: int = 10, autocompact: bool = True, **kwargs ): """ Args: model: Model to use (sonnet, opus, haiku, or full name) timeout: Execution timeout in seconds cli_path: Path to claude CLI working_directory: Working directory for execution max_turns: Maximum tool use turns autocompact: 
Enable automatic context compaction """ super().__init__(model=model, timeout=timeout, **kwargs) self.cli_path = cli_path self.working_directory = working_directory or os.getcwd() self.max_turns = max_turns self.autocompact = autocompact # Verify CLI exists if not shutil.which(self.cli_path): raise CLINotFoundError( f"Claude CLI not found at '{self.cli_path}'. " "Install with: npm install -g @anthropic-ai/claude-code" ) @property def name(self) -> str: return "claude" @property def available_models(self) -> list[str]: return list(self.MODEL_ALIASES.keys()) @property def supports_native_tools(self) -> bool: return True def _map_tools(self, tools: List[str]) -> List[str]: """Map our tool names to Claude CLI tool names.""" mapped = [] for tool in tools: tool_lower = tool.lower() if tool_lower in self.TOOL_NAME_MAP: mapped.append(self.TOOL_NAME_MAP[tool_lower]) elif tool in self.NATIVE_TOOLS: mapped.append(tool) # Skip non-native tools (http_request, ssh, etc.) return mapped def _build_command( self, prompt: str, system_prompt: Optional[str] = None, tools: Optional[List[str]] = None, session_id: Optional[str] = None, continue_session: bool = False, ) -> List[str]: """Build the CLI command.""" cmd = [self.cli_path] # Prompt cmd.extend(["-p", prompt]) # Output format cmd.extend(["--output-format", "json"]) # Model resolved_model = self.resolve_model(self.model) cmd.extend(["--model", resolved_model]) # System prompt if system_prompt: cmd.extend(["--system-prompt", system_prompt]) # Tools if tools: native_tools = self._map_tools(tools) if native_tools: cmd.extend(["--allowedTools", ",".join(native_tools)]) # Max turns cmd.extend(["--max-turns", str(self.max_turns)]) # Session management if session_id: if continue_session: cmd.extend(["--continue", session_id]) else: cmd.extend(["--session-id", session_id]) return cmd def _parse_response(self, stdout: str, stderr: str, return_code: int) -> dict: """Parse the CLI response.""" # Try to parse JSON try: # Handle potential 
multiple JSON objects (streaming artifacts) lines = stdout.strip().split('\n') # Find the last valid JSON object for line in reversed(lines): line = line.strip() if line.startswith('{'): try: return json.loads(line) except json.JSONDecodeError: continue # Try parsing the whole output return json.loads(stdout) except json.JSONDecodeError: # If JSON parsing fails, return raw output return { "result": stdout, "is_error": return_code != 0, "error": stderr if stderr else None, } def _extract_usage(self, response: dict) -> Optional[UsageInfo]: """Extract usage information from response.""" usage_data = response.get("usage") if not usage_data: return None return UsageInfo( input_tokens=usage_data.get("input_tokens", 0), output_tokens=usage_data.get("output_tokens", 0), cache_read_tokens=usage_data.get("cache_read_input_tokens", 0), cache_creation_tokens=usage_data.get("cache_creation_input_tokens", 0), ) async def _execute( self, prompt: str, system_prompt: Optional[str] = None, tools: Optional[List[str]] = None, session_id: Optional[str] = None, continue_session: bool = False, **kwargs ) -> ProviderResponse: """Execute the Claude CLI.""" cmd = self._build_command( prompt=prompt, system_prompt=system_prompt, tools=tools or [], session_id=session_id, continue_session=continue_session, ) try: process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=self.working_directory, ) stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=self.timeout ) stdout_str = stdout.decode("utf-8", errors="replace") stderr_str = stderr.decode("utf-8", errors="replace") # Parse response response_data = self._parse_response( stdout_str, stderr_str, process.returncode ) # Check for errors is_error = response_data.get("is_error", False) or process.returncode != 0 # Extract text text = response_data.get("result", "") if not text and "content" in response_data: # Handle different response formats content = 
response_data["content"] if isinstance(content, list): text = "\n".join( c.get("text", "") for c in content if c.get("type") == "text" ) elif isinstance(content, str): text = content # Extract error error = None if is_error: error = response_data.get("error") or stderr_str or "Unknown error" return ProviderResponse( success=not is_error, text=text, provider=self.name, model=self.model, error=error, usage=self._extract_usage(response_data), session_id=response_data.get("session_id"), raw_response=response_data, ) except asyncio.TimeoutError: raise ExecutionError(f"Timeout after {self.timeout}s") except FileNotFoundError: raise CLINotFoundError(f"Claude CLI not found: {self.cli_path}") except Exception as e: raise ExecutionError(f"Execution failed: {e}") async def run_with_session( self, prompt: str, session_id: str, continue_session: bool = True, **kwargs ) -> ProviderResponse: """Run with explicit session management.""" return await self.run_with_tools( prompt=prompt, tools=kwargs.pop("tools", []), system_prompt=kwargs.pop("system_prompt", None), session_id=session_id, continue_session=continue_session, **kwargs ) async def health_check(self) -> bool: """Verify Claude CLI is working.""" try: # Simple test command cmd = [self.cli_path, "-p", "Say OK", "--output-format", "json", "--max-turns", "1"] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, _ = await asyncio.wait_for( process.communicate(), timeout=30.0 ) if process.returncode == 0: return True return False except Exception: return False ``` --- ## orchestrator/providers/litellm_provider.py ```python """ Universal LLM provider using LiteLLM. Supports 100+ models through a unified interface: - OpenAI (GPT-4, o1) - Google (Gemini) - Anthropic (via API) - Mistral - Ollama (local) - Groq - Together - And many more... 
""" import asyncio import json import re from typing import Optional, List, Any from .base import BaseProvider, ProviderResponse, UsageInfo, ToolCall, ToolResult class LiteLLMProvider(BaseProvider): """ Universal provider using LiteLLM. Requires: - pip install litellm - Appropriate API keys in environment Features: - 100+ model support - Unified API - Tool execution loop - Cost tracking """ MODEL_ALIASES = { # OpenAI "gpt4": "gpt-4", "gpt4o": "gpt-4o", "gpt4-turbo": "gpt-4-turbo", "gpt4o-mini": "gpt-4o-mini", "o1": "o1-preview", "o1-mini": "o1-mini", "o3-mini": "o3-mini", # Google Gemini "gemini": "gemini/gemini-1.5-pro", "gemini-pro": "gemini/gemini-1.5-pro", "gemini-flash": "gemini/gemini-1.5-flash", "gemini-2": "gemini/gemini-2.0-flash", # Anthropic (via API, not CLI) "claude-api": "claude-3-5-sonnet-20241022", "claude-opus-api": "claude-3-opus-20240229", # Mistral "mistral": "mistral/mistral-large-latest", "mistral-small": "mistral/mistral-small-latest", "mixtral": "mistral/open-mixtral-8x22b", "codestral": "mistral/codestral-latest", # Ollama (local) "llama3": "ollama/llama3", "llama3.1": "ollama/llama3.1", "llama3.2": "ollama/llama3.2", "codellama": "ollama/codellama", "mixtral-local": "ollama/mixtral", "qwen": "ollama/qwen2.5", "deepseek": "ollama/deepseek-r1", # Groq (fast inference) "groq-llama": "groq/llama-3.3-70b-versatile", "groq-mixtral": "groq/mixtral-8x7b-32768", # Together "together-llama": "together_ai/meta-llama/Llama-3-70b-chat-hf", # DeepSeek "deepseek-chat": "deepseek/deepseek-chat", "deepseek-coder": "deepseek/deepseek-coder", } # Models that support native function calling FUNCTION_CALLING_MODELS = { "gpt-4", "gpt-4o", "gpt-4-turbo", "gpt-4o-mini", "claude-3", "gemini", } def __init__( self, model: str = "gpt-4o", timeout: float = 300.0, api_key: Optional[str] = None, api_base: Optional[str] = None, max_tool_iterations: int = 10, temperature: float = 0.7, **kwargs ): """ Args: model: Model name or alias timeout: Request timeout api_key: 
Override API key api_base: Override API base URL max_tool_iterations: Max tool use loops temperature: Sampling temperature """ super().__init__(model=model, timeout=timeout, **kwargs) self.api_key = api_key self.api_base = api_base self.max_tool_iterations = max_tool_iterations self.temperature = temperature self._litellm = None def _get_litellm(self): """Lazy load litellm.""" if self._litellm is None: try: import litellm litellm.set_verbose = False self._litellm = litellm except ImportError: raise ImportError( "LiteLLM not installed. Run: pip install litellm" ) return self._litellm @property def name(self) -> str: return "litellm" @property def available_models(self) -> list[str]: return list(self.MODEL_ALIASES.keys()) @property def supports_native_tools(self) -> bool: """Check if current model supports function calling.""" resolved = self.resolve_model(self.model) return any(fc in resolved for fc in self.FUNCTION_CALLING_MODELS) def _build_tools_prompt(self, tools: List[str]) -> str: """Build a prompt section describing available tools.""" if not tools: return "" prompt = """ ## Available Tools You can use tools by responding with a JSON block like this: ```tool { "tool": "tool_name", "params": { "param1": "value1" } } ``` Available tools: """ tool_descriptions = { "bash": "Execute a bash command. Params: command (required), working_dir (optional)", "read": "Read a file. Params: path (required), encoding (optional)", "write": "Write to a file. Params: path (required), content (required), append (optional bool)", "glob": "Find files by pattern. Params: pattern (required), base_dir (optional)", "grep": "Search text in files. Params: pattern (required), path (required), recursive (optional bool)", "http_request": "Make HTTP request. Params: url (required), method (optional), headers (optional), body (optional)", "ssh": "Execute remote command. Params: host (required), command (required), user (optional), key_path (optional)", "list_dir": "List directory contents. 
Params: path (required), recursive (optional bool)", } for tool in tools: if tool in tool_descriptions: prompt += f"- **{tool}**: {tool_descriptions[tool]}\n" prompt += "\nAfter using a tool, you'll receive the result and can continue.\n" return prompt def _parse_tool_calls(self, text: str) -> List[ToolCall]: """Parse tool calls from response text.""" calls = [] # Pattern: ```tool { ... } ``` pattern1 = r'```tool\s*\n?(.*?)\n?```' for match in re.findall(pattern1, text, re.DOTALL): try: data = json.loads(match.strip()) if "tool" in data: calls.append(ToolCall( tool=data["tool"], params=data.get("params", {}), )) except json.JSONDecodeError: continue # Pattern: ```json { "tool": ... } ``` pattern2 = r'```json\s*\n?(.*?)\n?```' for match in re.findall(pattern2, text, re.DOTALL): try: data = json.loads(match.strip()) if "tool" in data: calls.append(ToolCall( tool=data["tool"], params=data.get("params", {}), )) except json.JSONDecodeError: continue return calls async def _call_llm( self, messages: List[dict], max_tokens: Optional[int] = None, ) -> dict: """Make a single LLM call.""" litellm = self._get_litellm() resolved_model = self.resolve_model(self.model) kwargs = { "model": resolved_model, "messages": messages, "temperature": self.temperature, "timeout": self.timeout, } if max_tokens: kwargs["max_tokens"] = max_tokens if self.api_key: kwargs["api_key"] = self.api_key if self.api_base: kwargs["api_base"] = self.api_base return await litellm.acompletion(**kwargs) async def _execute( self, prompt: str, system_prompt: Optional[str] = None, tools: Optional[List[str]] = None, tool_executor: Optional[Any] = None, max_tokens: Optional[int] = None, **kwargs ) -> ProviderResponse: """Execute with optional tool loop.""" tools = tools or [] resolved_model = self.resolve_model(self.model) # Build system prompt with tools full_system = system_prompt or "" if tools: tools_prompt = self._build_tools_prompt(tools) full_system = f"{full_system}\n\n{tools_prompt}" if full_system 
else tools_prompt # Build messages messages = [] if full_system: messages.append({"role": "system", "content": full_system}) messages.append({"role": "user", "content": prompt}) all_tool_results: List[ToolResult] = [] total_usage = UsageInfo() text = "" # Tool execution loop for iteration in range(self.max_tool_iterations): try: response = await self._call_llm(messages, max_tokens) except Exception as e: return ProviderResponse( success=False, text="", provider=self.name, model=resolved_model, error=f"LiteLLM error: {e}", ) # Extract text text = response.choices[0].message.content or "" # Update usage if hasattr(response, 'usage') and response.usage: total_usage.input_tokens += getattr(response.usage, 'prompt_tokens', 0) total_usage.output_tokens += getattr(response.usage, 'completion_tokens', 0) # Check for tool calls if tools and tool_executor: tool_calls = self._parse_tool_calls(text) if tool_calls: # Execute tools results_text_parts = [] for call in tool_calls: result = await tool_executor.execute(call.tool, call.params) all_tool_results.append(ToolResult( tool=call.tool, success=result.success, output=result.output, error=result.error, execution_time=result.execution_time, )) status = "✅" if result.success else "❌" output = result.output if result.success else result.error results_text_parts.append( f"**{call.tool}** {status}\n{output}" ) # Add to conversation and continue results_text = "\n\n".join(results_text_parts) messages.append({"role": "assistant", "content": text}) messages.append({"role": "user", "content": f"Tool results:\n\n{results_text}\n\nContinue."}) continue # No tool calls, we're done break return ProviderResponse( success=True, text=text, provider=self.name, model=resolved_model, usage=total_usage if total_usage.total_tokens > 0 else None, tool_results=[ ToolResult( tool=r.tool, success=r.success, output=r.output, error=r.error, ) for r in all_tool_results ], ) ``` --- ## orchestrator/tools/\_\_init\_\_.py ```python """ Tool definitions and 
executor. """ from .definitions import TOOL_DEFINITIONS, get_tool_schema, get_tools_prompt from .executor import ToolExecutor, ToolResult, SecurityValidator __all__ = [ "TOOL_DEFINITIONS", "get_tool_schema", "get_tools_prompt", "ToolExecutor", "ToolResult", "SecurityValidator", ] ``` --- ## orchestrator/tools/definitions.py ```python """ Tool definitions in standard format. Compatible with OpenAI function calling and other providers. """ from typing import Any, List, Dict TOOL_DEFINITIONS: Dict[str, dict] = { "bash": { "name": "bash", "description": "Execute a bash command on the system. Use for running scripts, system commands, git, etc.", "parameters": { "type": "object", "properties": { "command": { "type": "string", "description": "The bash command to execute" }, "working_dir": { "type": "string", "description": "Optional working directory" } }, "required": ["command"] } }, "read": { "name": "read", "description": "Read the contents of a file. Returns the text content.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the file to read" }, "encoding": { "type": "string", "description": "File encoding (default: utf-8)" } }, "required": ["path"] } }, "write": { "name": "write", "description": "Write content to a file. Creates the file if it doesn't exist, overwrites if it does.", "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the file to write" }, "content": { "type": "string", "description": "Content to write to the file" }, "append": { "type": "boolean", "description": "If true, append instead of overwrite" } }, "required": ["path", "content"] } }, "glob": { "name": "glob", "description": "Find files using glob patterns. 
E.g., '**/*.py' finds all Python files.", "parameters": { "type": "object", "properties": { "pattern": { "type": "string", "description": "Glob pattern to search for files" }, "base_dir": { "type": "string", "description": "Base directory for the search" } }, "required": ["pattern"] } }, "grep": { "name": "grep", "description": "Search for text in files using regular expressions.", "parameters": { "type": "object", "properties": { "pattern": { "type": "string", "description": "Regex pattern to search for" }, "path": { "type": "string", "description": "File or directory to search in" }, "recursive": { "type": "boolean", "description": "Search recursively in subdirectories" } }, "required": ["pattern", "path"] } }, "http_request": { "name": "http_request", "description": "Make an HTTP request. Useful for REST APIs.", "parameters": { "type": "object", "properties": { "url": { "type": "string", "description": "URL to request" }, "method": { "type": "string", "enum": ["GET", "POST", "PUT", "DELETE", "PATCH"], "description": "HTTP method" }, "headers": { "type": "object", "description": "Request headers" }, "body": { "type": "string", "description": "Request body (for POST, PUT, PATCH)" }, "timeout": { "type": "number", "description": "Request timeout in seconds" } }, "required": ["url"] } }, "ssh": { "name": "ssh", "description": "Execute a command on a remote server via SSH.", "parameters": { "type": "object", "properties": { "host": { "type": "string", "description": "Server host or IP" }, "command": { "type": "string", "description": "Command to execute on the server" }, "user": { "type": "string", "description": "SSH user (default: root)" }, "key_path": { "type": "string", "description": "Path to SSH key" }, "port": { "type": "integer", "description": "SSH port (default: 22)" } }, "required": ["host", "command"] } }, "list_dir": { "name": "list_dir", "description": "List the contents of a directory.", "parameters": { "type": "object", "properties": { "path": { 
"type": "string", "description": "Path to the directory" }, "recursive": { "type": "boolean", "description": "List recursively" }, "include_hidden": { "type": "boolean", "description": "Include hidden files" } }, "required": ["path"] } }, } def get_tool_schema(tool_names: List[str], format: str = "openai") -> List[dict]: """ Get tool schemas in the specified format. Args: tool_names: List of tool names format: Output format (openai, anthropic, gemini) Returns: List of tool definitions """ tools = [] for name in tool_names: if name not in TOOL_DEFINITIONS: continue tool_def = TOOL_DEFINITIONS[name] if format == "openai": tools.append({ "type": "function", "function": { "name": tool_def["name"], "description": tool_def["description"], "parameters": tool_def["parameters"] } }) elif format == "anthropic": tools.append({ "name": tool_def["name"], "description": tool_def["description"], "input_schema": tool_def["parameters"] }) elif format == "gemini": tools.append({ "function_declarations": [{ "name": tool_def["name"], "description": tool_def["description"], "parameters": tool_def["parameters"] }] }) else: tools.append(tool_def) return tools def get_tools_prompt(tool_names: List[str]) -> str: """ Generate a prompt describing available tools. For models that don't support native function calling. """ if not tool_names: return "" prompt = """## Available Tools You can use the following tools. 
To use them, respond with a JSON block like this: ```tool { "tool": "tool_name", "params": { "param1": "value1" } } ``` Tools: """ for name in tool_names: if name not in TOOL_DEFINITIONS: continue tool = TOOL_DEFINITIONS[name] props = tool["parameters"]["properties"] required = tool["parameters"].get("required", []) prompt += f"### {tool['name']}\n" prompt += f"{tool['description']}\n" prompt += "Parameters:\n" for prop_name, prop_def in props.items(): req_marker = "(required)" if prop_name in required else "(optional)" prompt += f" - {prop_name} {req_marker}: {prop_def.get('description', '')}\n" prompt += "\n" prompt += """ After using a tool, you'll receive the result and can continue. You can use multiple tools in sequence for complex tasks. """ return prompt ``` --- ## orchestrator/tools/executor.py ```python """ Secure tool executor. Features: - Path sandboxing - Command validation - Rate limiting - Retry with backoff - Security logging """ import asyncio import json import re import shlex import time import urllib.request import urllib.error from pathlib import Path from dataclasses import dataclass, field from typing import Optional, Callable, Dict, Any, List from datetime import datetime from ..core import RateLimiter, retry_with_backoff, RetryConfig @dataclass class ToolResult: """Result of tool execution.""" tool: str success: bool output: str error: Optional[str] = None execution_time: float = 0.0 timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) retries: int = 0 class SecurityValidator: """Security validator for tool operations.""" # Commands that should never be executed DANGEROUS_COMMANDS = { "rm -rf /", "rm -rf /*", ":(){ :|:& };:", "dd if=", "mkfs", "fdisk", "> /dev/sd", "chmod -R 777 /", "chown -R", "curl | sh", "wget | sh", "curl | bash", "wget | bash", } # Dangerous patterns DANGEROUS_PATTERNS = [ r"rm\s+-rf\s+/(?!\w)", # rm -rf / (but not /home) r">\s*/dev/sd[a-z]", # Write to devices r"mkfs\.", # Format disks 
r"dd\s+if=.+of=/dev", # dd to devices r"\|\s*(?:ba)?sh", # Piping to shell r";\s*rm\s+-rf", # Command injection with rm ] def __init__( self, working_dir: Path, sandbox: bool = True, allowed_commands: Optional[List[str]] = None ): self.working_dir = working_dir.resolve() self.sandbox = sandbox self.allowed_commands = allowed_commands def validate_path(self, path: str) -> tuple[bool, str]: """ Validate that a path is within the sandbox. Returns: (is_valid, resolved_path_or_error) """ if not self.sandbox: return True, str(Path(path).expanduser()) try: resolved = Path(path).expanduser() if not resolved.is_absolute(): resolved = self.working_dir / resolved resolved = resolved.resolve() # Check if within working_dir try: resolved.relative_to(self.working_dir) return True, str(resolved) except ValueError: return False, f"Path outside sandbox: {path}" except Exception as e: return False, f"Invalid path: {e}" def validate_command(self, command: str) -> tuple[bool, str]: """ Validate that a command is safe. Returns: (is_safe, error_if_unsafe) """ # Check exact dangerous commands for dangerous in self.DANGEROUS_COMMANDS: if dangerous in command: return False, f"Dangerous command: {dangerous}" # Check dangerous patterns for pattern in self.DANGEROUS_PATTERNS: if re.search(pattern, command, re.IGNORECASE): return False, f"Dangerous pattern: {pattern}" # Check whitelist if provided if self.allowed_commands: cmd_name = command.split()[0] if command.split() else "" if cmd_name not in self.allowed_commands: return False, f"Command not allowed: {cmd_name}" return True, "" def sanitize_for_shell(self, value: str) -> str: """Escape a value for safe shell use.""" return shlex.quote(value) class ToolExecutor: """ Executes tools securely. 
Features: - Path sandboxing - Command validation - Rate limiting - Automatic retry """ def __init__( self, working_dir: Optional[str] = None, timeout: float = 60.0, sandbox_paths: bool = True, allowed_commands: Optional[List[str]] = None, ssh_key_path: str = "~/.ssh/id_rsa", ssh_strict_host_checking: bool = True, rate_limit_per_minute: int = 60, max_retries: int = 3, retry_delay: float = 1.0, ): self.working_dir = Path(working_dir).resolve() if working_dir else Path.cwd() self.timeout = timeout self.ssh_key_path = Path(ssh_key_path).expanduser() self.ssh_strict_host_checking = ssh_strict_host_checking self.max_retries = max_retries # Security self.validator = SecurityValidator( self.working_dir, sandbox=sandbox_paths, allowed_commands=allowed_commands, ) # Rate limiting self.rate_limiter = RateLimiter(rate_limit_per_minute) # Retry config self.retry_config = RetryConfig( max_retries=max_retries, base_delay=retry_delay, ) # Tool registry self._tools: Dict[str, Callable] = { "bash": self._exec_bash, "read": self._exec_read, "write": self._exec_write, "glob": self._exec_glob, "grep": self._exec_grep, "http_request": self._exec_http, "ssh": self._exec_ssh, "list_dir": self._exec_list_dir, } @property def available_tools(self) -> List[str]: """List of available tool names.""" return list(self._tools.keys()) async def execute(self, tool: str, params: dict) -> ToolResult: """ Execute a tool with rate limiting and retry. 
""" start_time = time.time() if tool not in self._tools: return ToolResult( tool=tool, success=False, output="", error=f"Unknown tool: {tool}", ) # Rate limiting await self.rate_limiter.acquire() # Execute with retry async def do_execute(): return await self._tools[tool](params) try: result, retries = await retry_with_backoff( do_execute, config=self.retry_config, ) result.retries = retries result.execution_time = time.time() - start_time return result except Exception as e: return ToolResult( tool=tool, success=False, output="", error=str(e), execution_time=time.time() - start_time, ) def parse_tool_calls(self, text: str) -> List[dict]: """ Parse tool calls from text. Supports formats: - ```tool { ... } ``` - ```json { "tool": ... } ``` """ calls = [] # Format: ```tool ... ``` pattern1 = r'```tool\s*\n?(.*?)\n?```' for match in re.findall(pattern1, text, re.DOTALL): try: data = json.loads(match.strip()) if "tool" in data: calls.append(data) except json.JSONDecodeError: continue # Format: ```json ... 
``` with tool pattern2 = r'```json\s*\n?(.*?)\n?```' for match in re.findall(pattern2, text, re.DOTALL): try: data = json.loads(match.strip()) if "tool" in data: calls.append(data) except json.JSONDecodeError: continue return calls async def execute_from_text(self, text: str) -> List[ToolResult]: """Execute all tools found in text.""" calls = self.parse_tool_calls(text) results = [] for call in calls: tool = call.get("tool") params = call.get("params", {}) result = await self.execute(tool, params) results.append(result) return results # ==================== TOOL IMPLEMENTATIONS ==================== async def _exec_bash(self, params: dict) -> ToolResult: """Execute a bash command securely.""" command = params.get("command", "") working_dir = params.get("working_dir") if not command: return ToolResult(tool="bash", success=False, output="", error="Empty command") # Validate command is_safe, error = self.validator.validate_command(command) if not is_safe: return ToolResult(tool="bash", success=False, output="", error=error) # Determine working directory cwd = self.working_dir if working_dir: is_valid, result = self.validator.validate_path(working_dir) if is_valid: cwd = Path(result) try: process = await asyncio.create_subprocess_exec( "/bin/bash", "-c", command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(cwd), ) stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=self.timeout, ) return ToolResult( tool="bash", success=process.returncode == 0, output=stdout.decode("utf-8", errors="replace"), error=stderr.decode("utf-8", errors="replace") if process.returncode != 0 else None, ) except asyncio.TimeoutError: return ToolResult( tool="bash", success=False, output="", error=f"Timeout after {self.timeout}s", ) async def _exec_read(self, params: dict) -> ToolResult: """Read a file within the sandbox.""" path = params.get("path", "") encoding = params.get("encoding", "utf-8") if not path: return ToolResult(tool="read", 
success=False, output="", error="Empty path") is_valid, result = self.validator.validate_path(path) if not is_valid: return ToolResult(tool="read", success=False, output="", error=result) try: content = Path(result).read_text(encoding=encoding) return ToolResult(tool="read", success=True, output=content) except FileNotFoundError: return ToolResult(tool="read", success=False, output="", error=f"File not found: {path}") except Exception as e: return ToolResult(tool="read", success=False, output="", error=str(e)) async def _exec_write(self, params: dict) -> ToolResult: """Write a file within the sandbox.""" path = params.get("path", "") content = params.get("content", "") append = params.get("append", False) if not path: return ToolResult(tool="write", success=False, output="", error="Empty path") is_valid, result = self.validator.validate_path(path) if not is_valid: return ToolResult(tool="write", success=False, output="", error=result) try: file_path = Path(result) file_path.parent.mkdir(parents=True, exist_ok=True) mode = "a" if append else "w" with open(file_path, mode) as f: f.write(content) return ToolResult( tool="write", success=True, output=f"Wrote {len(content)} bytes to {file_path.name}", ) except Exception as e: return ToolResult(tool="write", success=False, output="", error=str(e)) async def _exec_glob(self, params: dict) -> ToolResult: """Find files by glob pattern.""" pattern = params.get("pattern", "") base_dir = params.get("base_dir") if not pattern: return ToolResult(tool="glob", success=False, output="", error="Empty pattern") search_dir = self.working_dir if base_dir: is_valid, result = self.validator.validate_path(base_dir) if is_valid: search_dir = Path(result) try: files = list(search_dir.glob(pattern)) safe_files = [] for f in files[:100]: # Limit results try: f.relative_to(self.working_dir) safe_files.append(str(f.relative_to(self.working_dir))) except ValueError: continue output = "\n".join(sorted(safe_files)) return ToolResult( tool="glob", 
success=True, output=f"Found {len(safe_files)} files:\n{output}", ) except Exception as e: return ToolResult(tool="glob", success=False, output="", error=str(e)) async def _exec_grep(self, params: dict) -> ToolResult: """Search text in files.""" pattern = params.get("pattern", "") path = params.get("path", "") recursive = params.get("recursive", False) if not pattern or not path: return ToolResult(tool="grep", success=False, output="", error="Pattern or path empty") is_valid, validated_path = self.validator.validate_path(path) if not is_valid: return ToolResult(tool="grep", success=False, output="", error=validated_path) try: cmd = ["grep", "-n", "--color=never"] if recursive: cmd.append("-r") cmd.extend([pattern, validated_path]) process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=str(self.working_dir), ) stdout, _ = await asyncio.wait_for( process.communicate(), timeout=self.timeout, ) return ToolResult(tool="grep", success=True, output=stdout.decode()) except Exception as e: return ToolResult(tool="grep", success=False, output="", error=str(e)) async def _exec_http(self, params: dict) -> ToolResult: """Make an HTTP request.""" url = params.get("url", "") method = params.get("method", "GET").upper() headers = params.get("headers", {}) body = params.get("body", "") timeout = params.get("timeout", self.timeout) if not url: return ToolResult(tool="http_request", success=False, output="", error="Empty URL") try: data = body.encode() if body else None req = urllib.request.Request(url, data=data, method=method) for key, value in headers.items(): req.add_header(key, value) with urllib.request.urlopen(req, timeout=timeout) as response: content = response.read().decode() return ToolResult(tool="http_request", success=True, output=content) except urllib.error.HTTPError as e: return ToolResult( tool="http_request", success=False, output=e.read().decode() if e.fp else "", error=f"HTTP {e.code}: 
{e.reason}", ) except Exception as e: return ToolResult(tool="http_request", success=False, output="", error=str(e)) async def _exec_ssh(self, params: dict) -> ToolResult: """Execute command via SSH.""" host = params.get("host", "") command = params.get("command", "") user = params.get("user", "root") key_path = params.get("key_path", str(self.ssh_key_path)) port = params.get("port", 22) if not host or not command: return ToolResult(tool="ssh", success=False, output="", error="Host or command empty") # Validate remote command too is_safe, error = self.validator.validate_command(command) if not is_safe: return ToolResult(tool="ssh", success=False, output="", error=f"Unsafe remote command: {error}") try: ssh_cmd = [ "ssh", "-o", f"StrictHostKeyChecking={'yes' if self.ssh_strict_host_checking else 'no'}", "-o", "BatchMode=yes", "-o", "ConnectTimeout=10", "-i", key_path, "-p", str(port), f"{user}@{host}", command, ] process = await asyncio.create_subprocess_exec( *ssh_cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=self.timeout, ) return ToolResult( tool="ssh", success=process.returncode == 0, output=stdout.decode(), error=stderr.decode() if process.returncode != 0 else None, ) except asyncio.TimeoutError: return ToolResult( tool="ssh", success=False, output="", error=f"SSH timeout after {self.timeout}s", ) except Exception as e: return ToolResult(tool="ssh", success=False, output="", error=str(e)) async def _exec_list_dir(self, params: dict) -> ToolResult: """List directory contents.""" path = params.get("path", ".") recursive = params.get("recursive", False) include_hidden = params.get("include_hidden", False) is_valid, validated_path = self.validator.validate_path(path) if not is_valid: return ToolResult(tool="list_dir", success=False, output="", error=validated_path) try: dir_path = Path(validated_path) if not dir_path.exists(): return ToolResult( tool="list_dir", 
success=False, output="", error=f"Directory not found: {path}", ) entries = [] pattern = "**/*" if recursive else "*" for entry in dir_path.glob(pattern): if not include_hidden and entry.name.startswith("."): continue try: entry.relative_to(self.working_dir) except ValueError: continue entry_type = "d" if entry.is_dir() else "f" rel_path = entry.relative_to(dir_path) entries.append(f"[{entry_type}] {rel_path}") return ToolResult( tool="list_dir", success=True, output="\n".join(sorted(entries)[:200]), ) except Exception as e: return ToolResult(tool="list_dir", success=False, output="", error=str(e)) ``` --- ## orchestrator/agents/\_\_init\_\_.py ```python """ Agent module. """ from .base import Agent, AgentResult __all__ = ["Agent", "AgentResult"] ``` --- ## orchestrator/agents/base.py ```python """ Agent implementation. An Agent combines: - A provider (Claude, LiteLLM, etc.) - Tools - A role/system prompt - Logging """ import asyncio from datetime import datetime from pathlib import Path from typing import Optional, List, Dict, Any from dataclasses import dataclass, field from ..config import get_config, AgentConfig from ..providers import ClaudeProvider, LiteLLMProvider, ProviderResponse from ..tools import ToolExecutor @dataclass class AgentResult: """Result of an agent execution.""" success: bool output: str agent_name: str timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) error: Optional[str] = None tool_results: List[Dict] = field(default_factory=list) usage: Optional[Dict] = None retries: int = 0 class Agent: """ Configurable AI agent. Combines a provider, tools, and a role to create a specialized AI assistant. """ def __init__( self, name: str = "", role: str = "", provider: str = "claude", model: str = "sonnet", tools: Optional[List[str]] = None, servers: Optional[List[str]] = None, max_turns: int = 10, config_obj: Optional[AgentConfig] = None, ): """ Initialize agent from parameters or config object. 
""" if config_obj: self.name = config_obj.name self.role = config_obj.role self.provider_name = config_obj.provider self.model = config_obj.model self.tools = config_obj.tools or [] self.servers = config_obj.servers or [] self.max_turns = config_obj.max_turns else: self.name = name self.role = role self.provider_name = provider self.model = model self.tools = tools or [] self.servers = servers or [] self.max_turns = max_turns self.config = get_config() # Paths self.log_file = self.config.logs_dir / f"{self.name}.md" self.output_dir = self.config.outputs_dir / self.name # Create provider self.provider = self._create_provider() # Create tool executor for non-native tools self.tool_executor = self._create_tool_executor() # Build system prompt self.system_prompt = self._build_system_prompt() # Initialize self.output_dir.mkdir(parents=True, exist_ok=True) if not self.log_file.exists(): self._init_log() def _create_provider(self): """Create the appropriate provider.""" settings = self.config.settings common_kwargs = { "model": self.model, "timeout": settings.timeout, "max_retries": settings.max_retries, "rate_limit_per_minute": settings.rate_limit_per_minute, "enable_circuit_breaker": settings.enable_circuit_breaker, "circuit_breaker_threshold": settings.circuit_breaker_threshold, "circuit_breaker_timeout": settings.circuit_breaker_timeout, } if self.provider_name == "claude": return ClaudeProvider( max_turns=self.max_turns, working_directory=str(self.config.base_dir), **common_kwargs, ) else: return LiteLLMProvider( max_tool_iterations=self.max_turns, **common_kwargs, ) def _create_tool_executor(self) -> ToolExecutor: """Create tool executor for non-native tools.""" settings = self.config.settings return ToolExecutor( working_dir=str(self.config.base_dir), timeout=settings.timeout, sandbox_paths=settings.sandbox_paths, allowed_commands=settings.allowed_commands or None, ssh_strict_host_checking=settings.ssh_strict_host_checking, 
rate_limit_per_minute=settings.rate_limit_per_minute, max_retries=settings.max_retries, ) def _build_system_prompt(self) -> str: """Build the system prompt including role and server info.""" prompt = f"# {self.name}\n\n{self.role}" if self.servers: prompt += "\n\n## Available Servers\n" for server_name in self.servers: server = self.config.get_server(server_name) if server: prompt += f"- **{server_name}**: {server.user}@{server.host}" if server.description: prompt += f" ({server.description})" prompt += "\n" return prompt def _init_log(self): """Initialize the log file.""" header = f"""# {self.name} - **Created:** {datetime.now().isoformat()} - **Provider:** {self.provider_name} - **Model:** {self.model} - **Tools:** {', '.join(self.tools) if self.tools else 'none'} --- """ self.log_file.write_text(header) def log(self, entry_type: str, content: str): """Add entry to log file.""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") entry = f"\n## {timestamp} | {entry_type}\n\n{content}\n" with open(self.log_file, "a") as f: f.write(entry) def _get_non_native_tools(self) -> List[str]: """Get tools that need manual execution (not native to provider).""" if self.provider_name == "claude": # Claude CLI handles: bash, read, write, edit, glob, grep, webfetch native = {"bash", "read", "write", "edit", "glob", "grep", "webfetch", "web_fetch"} return [t for t in self.tools if t.lower() not in native] else: # LiteLLM doesn't have native tools return self.tools async def run(self, prompt: str, log_action: bool = True) -> AgentResult: """ Execute a prompt through this agent. """ if log_action: self.log("PROMPT", prompt[:500] + "..." 
if len(prompt) > 500 else prompt) try: # Determine which tools need manual handling non_native_tools = self._get_non_native_tools() # For Claude provider, pass all tools (it handles native ones) # For LiteLLM, we need to handle tools ourselves if self.provider_name == "claude": response = await self.provider.run_with_tools( prompt=prompt, tools=self.tools, system_prompt=self.system_prompt, ) else: # LiteLLM with tool execution response = await self.provider.run_with_tools( prompt=prompt, tools=self.tools, system_prompt=self.system_prompt, tool_executor=self.tool_executor, ) # Build result result = AgentResult( success=response.success, output=response.text, agent_name=self.name, error=response.error, tool_results=[ { "tool": tr.tool, "success": tr.success, "output": tr.output[:200] if tr.output else "", "error": tr.error, } for tr in response.tool_results ], usage=response.usage.to_dict() if response.usage else None, retries=response.retries, ) if log_action: status = "RESULT" if response.success else "ERROR" content = response.text[:500] if response.success else (response.error or "Unknown error") self.log(status, content) return result except Exception as e: if log_action: self.log("ERROR", str(e)) return AgentResult( success=False, output="", agent_name=self.name, error=str(e), ) async def run_with_context( self, prompt: str, context: str, log_action: bool = True, ) -> AgentResult: """ Execute a prompt with additional context. 
""" full_prompt = f"{context}\n\n---\n\n{prompt}" return await self.run(full_prompt, log_action) def save_output(self, filename: str, content: str) -> Path: """Save content to the agent's output directory.""" filepath = self.output_dir / filename filepath.parent.mkdir(parents=True, exist_ok=True) filepath.write_text(content) self.log("FILE", f"Created: {filepath}") return filepath def read_log(self, last_n: Optional[int] = None) -> str: """Read the agent's log file.""" if not self.log_file.exists(): return "" content = self.log_file.read_text() if last_n: entries = content.split("\n## ") return "\n## ".join(entries[-last_n:]) return content async def health_check(self) -> bool: """Check if the agent's provider is healthy.""" return await self.provider.health_check() def __repr__(self) -> str: return f"Agent({self.name}, {self.provider_name}/{self.model})" ``` --- ## orchestrator/tracking/\_\_init\_\_.py ```python """ Cost tracking module. Track token usage and costs across providers. """ from dataclasses import dataclass, field from datetime import datetime from typing import Dict, List, Optional import json # Cost per million tokens (approximate, may vary) COST_PER_MILLION = { # Claude models "claude-opus-4-20250514": {"input": 15.0, "output": 75.0}, "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0}, "claude-3-5-haiku-latest": {"input": 0.25, "output": 1.25}, # OpenAI models "gpt-4o": {"input": 2.5, "output": 10.0}, "gpt-4o-mini": {"input": 0.15, "output": 0.6}, "gpt-4-turbo": {"input": 10.0, "output": 30.0}, "o1-preview": {"input": 15.0, "output": 60.0}, "o1-mini": {"input": 3.0, "output": 12.0}, # Google models "gemini/gemini-1.5-pro": {"input": 1.25, "output": 5.0}, "gemini/gemini-1.5-flash": {"input": 0.075, "output": 0.3}, # Default for unknown models "default": {"input": 1.0, "output": 3.0}, } @dataclass class RequestCost: """Cost information for a single request.""" model: str input_tokens: int output_tokens: int cache_read_tokens: int = 0 
cache_creation_tokens: int = 0 timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) @property def total_tokens(self) -> int: return self.input_tokens + self.output_tokens @property def input_cost(self) -> float: """Calculate input cost in USD.""" rates = COST_PER_MILLION.get(self.model, COST_PER_MILLION["default"]) # Cache reads are typically 90% cheaper regular_input = self.input_tokens - self.cache_read_tokens cache_cost = (self.cache_read_tokens / 1_000_000) * rates["input"] * 0.1 regular_cost = (regular_input / 1_000_000) * rates["input"] return regular_cost + cache_cost @property def output_cost(self) -> float: """Calculate output cost in USD.""" rates = COST_PER_MILLION.get(self.model, COST_PER_MILLION["default"]) return (self.output_tokens / 1_000_000) * rates["output"] @property def total_cost(self) -> float: """Total cost in USD.""" return self.input_cost + self.output_cost @dataclass class CostSummary: """Summary of costs over multiple requests.""" total_requests: int = 0 total_input_tokens: int = 0 total_output_tokens: int = 0 total_cache_read_tokens: int = 0 total_cost_usd: float = 0.0 by_model: Dict[str, Dict] = field(default_factory=dict) def to_dict(self) -> dict: return { "total_requests": self.total_requests, "total_input_tokens": self.total_input_tokens, "total_output_tokens": self.total_output_tokens, "total_tokens": self.total_input_tokens + self.total_output_tokens, "total_cache_read_tokens": self.total_cache_read_tokens, "total_cost_usd": round(self.total_cost_usd, 6), "by_model": self.by_model, } class CostTracker: """ Track costs across multiple requests. 
Usage: tracker = CostTracker(budget_limit=10.0) # Record usage tracker.record(model="claude-sonnet-4-20250514", input_tokens=1000, output_tokens=500) # Check status print(tracker.summary) print(tracker.remaining_budget) """ def __init__( self, budget_limit: Optional[float] = None, on_budget_warning: Optional[callable] = None, warning_threshold: float = 0.8, ): """ Args: budget_limit: Optional budget limit in USD on_budget_warning: Callback when approaching budget warning_threshold: Fraction of budget to trigger warning (0.8 = 80%) """ self.budget_limit = budget_limit self.on_budget_warning = on_budget_warning self.warning_threshold = warning_threshold self._requests: List[RequestCost] = [] self._warning_sent = False def record( self, model: str, input_tokens: int, output_tokens: int, cache_read_tokens: int = 0, cache_creation_tokens: int = 0, ) -> RequestCost: """Record a request's token usage.""" cost = RequestCost( model=model, input_tokens=input_tokens, output_tokens=output_tokens, cache_read_tokens=cache_read_tokens, cache_creation_tokens=cache_creation_tokens, ) self._requests.append(cost) # Check budget if self.budget_limit and not self._warning_sent: if self.total_cost >= self.budget_limit * self.warning_threshold: self._warning_sent = True if self.on_budget_warning: self.on_budget_warning(self.total_cost, self.budget_limit) return cost def record_from_response(self, response) -> Optional[RequestCost]: """Record from a ProviderResponse object.""" if not response.usage: return None return self.record( model=response.model, input_tokens=response.usage.input_tokens, output_tokens=response.usage.output_tokens, cache_read_tokens=getattr(response.usage, 'cache_read_tokens', 0), cache_creation_tokens=getattr(response.usage, 'cache_creation_tokens', 0), ) @property def total_cost(self) -> float: """Total cost across all requests.""" return sum(r.total_cost for r in self._requests) @property def total_tokens(self) -> int: """Total tokens across all requests.""" 
return sum(r.total_tokens for r in self._requests) @property def remaining_budget(self) -> Optional[float]: """Remaining budget in USD, or None if no limit.""" if self.budget_limit is None: return None return max(0, self.budget_limit - self.total_cost) @property def is_over_budget(self) -> bool: """Check if over budget.""" if self.budget_limit is None: return False return self.total_cost >= self.budget_limit @property def summary(self) -> CostSummary: """Get cost summary.""" summary = CostSummary() for request in self._requests: summary.total_requests += 1 summary.total_input_tokens += request.input_tokens summary.total_output_tokens += request.output_tokens summary.total_cache_read_tokens += request.cache_read_tokens summary.total_cost_usd += request.total_cost # By model if request.model not in summary.by_model: summary.by_model[request.model] = { "requests": 0, "input_tokens": 0, "output_tokens": 0, "cost_usd": 0.0, } summary.by_model[request.model]["requests"] += 1 summary.by_model[request.model]["input_tokens"] += request.input_tokens summary.by_model[request.model]["output_tokens"] += request.output_tokens summary.by_model[request.model]["cost_usd"] += request.total_cost return summary def reset(self): """Reset all tracked costs.""" self._requests = [] self._warning_sent = False def export(self) -> str: """Export cost data as JSON.""" return json.dumps({ "requests": [ { "model": r.model, "input_tokens": r.input_tokens, "output_tokens": r.output_tokens, "cache_read_tokens": r.cache_read_tokens, "total_cost": r.total_cost, "timestamp": r.timestamp, } for r in self._requests ], "summary": self.summary.to_dict(), "budget_limit": self.budget_limit, }, indent=2) ``` --- ## Ejemplos de Configuración ### examples/simple.yaml ```yaml # Simple single agent configuration settings: default_provider: claude default_model: sonnet agents: assistant: role: "You are a helpful assistant." 
    provider: claude
    model: sonnet
    tools:
      - bash
      - read
      - write
```

### examples/dev_team.yaml

```yaml
# Software development team
settings:
  default_provider: claude
  default_model: sonnet
  timeout: 300
  sandbox_paths: true

agents:
  architect:
    role: |
      You are a senior software architect.
      You design scalable and maintainable systems.
      You make important technical decisions.
      You document decisions in ADRs (Architecture Decision Records).
    provider: claude
    model: opus
    tools:
      - read
      - write
      - list_dir
      - glob

  developer:
    role: |
      You are an experienced full-stack developer.
      You write clean, well-documented, and testable code.
      You follow language best practices.
      You always include appropriate error handling.
    provider: claude
    model: sonnet
    tools:
      - read
      - write
      - bash
      - grep
      - glob

  reviewer:
    role: |
      You are a demanding but constructive code reviewer.
      You look for bugs, security issues, and improvements.
      You suggest refactoring when needed.
      You validate that code follows standards.
    provider: litellm
    model: gpt4o
    tools:
      - read
      - grep
      - glob

  tester:
    role: |
      You are a QA engineer specialized in testing.
      You write unit, integration, and e2e tests.
      You identify edge cases and error scenarios.
      You ensure good test coverage.
    provider: litellm
    model: gemini-pro
    tools:
      - read
      - write
      - bash
```

### examples/devops.yaml

```yaml
# DevOps team with server access
settings:
  default_provider: claude
  default_model: sonnet
  timeout: 300
  ssh_strict_host_checking: true

servers:
  production:
    host: prod.example.com
    user: deploy
    key: ~/.ssh/deploy_key
    description: "Production server"
  staging:
    host: staging.example.com
    user: deploy
    key: ~/.ssh/deploy_key
    description: "Staging server"

agents:
  deployer:
    role: |
      You are a deployment specialist.
      You deploy applications safely to production.
      You always verify deployments succeed.
      You rollback immediately if issues are detected.
    provider: claude
    model: sonnet
    tools:
      - ssh
      - bash
      - read
    servers:
      - production
      - staging

  monitor:
    role: |
      You are a systems monitoring expert.
      You check server health and metrics.
      You identify issues before they become problems.
    provider: claude
    model: haiku
    tools:
      - ssh
      - http_request
      - read
    servers:
      - production
      - staging
```

### examples/local_ollama.yaml

```yaml
# Using local Ollama models (no API costs!)
#
# Requirements:
#   1. Install Ollama: https://ollama.ai
#   2. Pull models: ollama pull llama3
#   3. pip install litellm

settings:
  default_provider: litellm
  default_model: llama3
  timeout: 120
  max_tool_iterations: 15

agents:
  assistant:
    role: |
      You are a helpful assistant running locally.
      You can execute commands and work with files.
    provider: litellm
    model: llama3
    tools:
      - bash
      - read
      - write
      - list_dir

  coder:
    role: |
      You are a coding assistant specialized in programming.
      You write clean, efficient code.
    provider: litellm
    model: codellama
    tools:
      - read
      - write
      - bash
      - grep
```

---

## .env.example

```bash
# .env.example - Copy to .env and fill in your values

# API Keys
OPENAI_API_KEY=sk-...
GOOGLE_API_KEY=...
ANTHROPIC_API_KEY=...
MISTRAL_API_KEY=...
GROQ_API_KEY=...

# SSH Configuration
SSH_KEY_PATH=~/.ssh/id_rsa
SSH_KNOWN_HOSTS_CHECK=true
```

---

## .gitignore

```
# Environment
.env
.venv/
venv/

# Python
__pycache__/
*.py[cod]
*.egg-info/

# IDE
.idea/
.vscode/

# Logs and outputs
logs/*.md
!logs/.gitkeep
outputs/**
!outputs/.gitkeep

# Secrets
*.pem
*.key
```

---

## Uso Rápido

```bash
# 1. Crear estructura de directorios
mkdir -p tzzr_v6/orchestrator/{core,providers,tools,agents,tracking}
mkdir -p tzzr_v6/{logs,outputs,examples}

# 2. Copiar cada archivo del código de arriba a su ubicación correspondiente

# 3. Instalar dependencias
pip install pyyaml
pip install litellm  # opcional, para GPT-4/Gemini/etc

# 4. Ejecutar
cd tzzr_v6
python main.py
```