system-docs/02_COMPONENTES/servicios externos/orchestratorv6.md
ARCHITECT 6ea70bd34f Update to Skynet v7 - Complete documentation restructure
- New folder structure per Skynet v7
- Complete SQL schemas added
- Documentation of entities, components, and integrations
- Updated security model
- Infrastructure and operations reorganized
2025-12-29 18:23:41 +00:00


# TZZR Orchestrator v6 - Complete Code
A robust, flexible multi-agent orchestration system.
## Index
1. [Project Structure](#project-structure)
2. [main.py](#mainpy)
3. [config.yaml](#configyaml)
4. [orchestrator/\_\_init\_\_.py](#orchestrator__init__py)
5. [orchestrator/config.py](#orchestratorconfigpy)
6. [orchestrator/core/\_\_init\_\_.py](#orchestratorcore__init__py)
7. [orchestrator/core/retry.py](#orchestratorcoreretrypy)
8. [orchestrator/core/circuit_breaker.py](#orchestratorcorecircuit_breakerpy)
9. [orchestrator/core/rate_limiter.py](#orchestratorcorerate_limiterpy)
10. [orchestrator/providers/\_\_init\_\_.py](#orchestratorproviders__init__py)
11. [orchestrator/providers/base.py](#orchestratorprovidersbasepy)
12. [orchestrator/providers/claude_provider.py](#orchestratorprovidersclaude_providerpy)
13. [orchestrator/providers/litellm_provider.py](#orchestratorproviderslitellm_providerpy)
14. [orchestrator/tools/\_\_init\_\_.py](#orchestratortools__init__py)
15. [orchestrator/tools/definitions.py](#orchestratortoolsdefinitionspy)
16. [orchestrator/tools/executor.py](#orchestratortoolsexecutorpy)
17. [orchestrator/agents/\_\_init\_\_.py](#orchestratoragents__init__py)
18. [orchestrator/agents/base.py](#orchestratoragentsbasepy)
19. [orchestrator/tracking/\_\_init\_\_.py](#orchestratortracking__init__py)
20. [Configuration Examples](#ejemplos-de-configuración)
---
## Project Structure
```
tzzr_v6/
├── main.py                  # Entry point
├── config.yaml              # Main configuration
├── .env                     # Environment variables (create from .env.example)
├── .env.example             # Example variables
├── .gitignore
├── README.md
├── orchestrator/
│   ├── __init__.py
│   ├── config.py            # Configuration management
│   ├── core/                # Resilience components
│   │   ├── __init__.py
│   │   ├── circuit_breaker.py
│   │   ├── rate_limiter.py
│   │   └── retry.py
│   ├── providers/           # LLM providers
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude_provider.py
│   │   └── litellm_provider.py
│   ├── tools/               # Tools
│   │   ├── __init__.py
│   │   ├── definitions.py
│   │   └── executor.py
│   ├── agents/              # Agents
│   │   ├── __init__.py
│   │   └── base.py
│   └── tracking/            # Cost tracking
│       └── __init__.py
├── logs/                    # Per-agent logs
├── outputs/                 # Generated files
└── examples/                # Configuration examples
    ├── simple.yaml
    ├── dev_team.yaml
    ├── devops.yaml
    └── local_ollama.yaml
```
---
## main.py
```python
#!/usr/bin/env python3
"""
LLM Orchestrator - Multi-agent orchestration system.

Usage:
    python main.py                          # Interactive mode
    python main.py --status                 # Show status
    python main.py --agents                 # List agents
    python main.py --agent X --prompt "Y"   # Execute prompt
    python main.py --health                 # Health check
"""
import asyncio
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path

# Add orchestrator to path
sys.path.insert(0, str(Path(__file__).parent))

from orchestrator.config import get_config, reload_config
from orchestrator.agents import Agent, AgentResult
from orchestrator.tracking import CostTracker


class Orchestrator:
    """Main orchestrator class."""

    def __init__(self, config_path: str = None):
        print("🚀 Starting Orchestrator...")
        self.config = get_config(config_path)
        self.agents: dict[str, Agent] = {}
        self.cost_tracker = CostTracker()

        # Load agents
        for name, agent_config in self.config.agents.items():
            try:
                self.agents[name] = Agent(config_obj=agent_config)
            except Exception as e:
                print(f"⚠️ Error creating agent {name}: {e}")

        if self.agents:
            print(f"✅ Loaded {len(self.agents)} agent(s): {', '.join(self.agents.keys())}")
        else:
            print("⚠️ No agents loaded. Edit config.yaml")

    async def run_agent(self, agent_name: str, prompt: str) -> AgentResult:
        """Run a single agent."""
        if agent_name not in self.agents:
            return AgentResult(
                success=False,
                output="",
                agent_name=agent_name,
                error=f"Agent '{agent_name}' not found",
            )

        result = await self.agents[agent_name].run(prompt)

        # Track costs
        if result.usage:
            self.cost_tracker.record(
                model=self.agents[agent_name].model,
                input_tokens=result.usage.get("input_tokens", 0),
                output_tokens=result.usage.get("output_tokens", 0),
            )
        return result

    async def run_all(self, prompt: str) -> dict[str, AgentResult]:
        """Run all agents in parallel."""
        tasks = {
            name: agent.run(prompt)
            for name, agent in self.agents.items()
        }
        results_list = await asyncio.gather(*tasks.values(), return_exceptions=True)

        results = {}
        for name, result in zip(tasks.keys(), results_list):
            if isinstance(result, Exception):
                results[name] = AgentResult(
                    success=False,
                    output="",
                    agent_name=name,
                    error=str(result),
                )
            else:
                results[name] = result
        return results

    async def health_check(self) -> dict[str, bool]:
        """Check health of all agents."""
        results = {}
        for name, agent in self.agents.items():
            try:
                results[name] = await agent.health_check()
            except Exception:
                results[name] = False
        return results

    def get_status(self) -> dict:
        """Get orchestrator status."""
        return {
            "config_path": str(self.config.config_path) if self.config.config_path else None,
            "settings": {
                "timeout": self.config.settings.timeout,
                "rate_limit": self.config.settings.rate_limit_per_minute,
                "max_retries": self.config.settings.max_retries,
                "sandbox_paths": self.config.settings.sandbox_paths,
                "circuit_breaker": self.config.settings.enable_circuit_breaker,
            },
            "agents": {
                name: {
                    "provider": agent.provider_name,
                    "model": agent.model,
                    "tools": agent.tools,
                }
                for name, agent in self.agents.items()
            },
            "servers": list(self.config.servers.keys()),
            "costs": self.cost_tracker.summary.to_dict(),
        }

    async def interactive(self):
        """Run interactive mode."""
        print("\n" + "=" * 60)
        print("LLM Orchestrator - Interactive Mode")
        print("=" * 60)
        print("\nCommands:")
        print("  /status     - Show status")
        print("  /agents     - List agents")
        print("  /agent <n>  - Switch active agent")
        print("  /logs <n>   - Show agent logs")
        print("  /costs      - Show cost tracking")
        print("  /health     - Health check")
        print("  /reload     - Reload configuration")
        print("  /all        - Run on all agents (parallel)")
        print("  /quit       - Exit")
        print("-" * 60)

        if not self.agents:
            print("\n⚠️ No agents available. Edit config.yaml")
            return

        current_agent = list(self.agents.keys())[0]

        while True:
            try:
                prompt = input(f"\n[{current_agent}] > ").strip()
                if not prompt:
                    continue

                if prompt == "/quit":
                    print("Goodbye!")
                    break
                elif prompt == "/status":
                    print(json.dumps(self.get_status(), indent=2))
                elif prompt == "/agents":
                    for name, agent in self.agents.items():
                        marker = "→" if name == current_agent else " "
                        print(f" {marker} {name}: {agent.provider_name}/{agent.model}")
                        if agent.tools:
                            print(f"     tools: {', '.join(agent.tools)}")
                elif prompt.startswith("/agent "):
                    name = prompt[7:].strip()
                    if name in self.agents:
                        current_agent = name
                        print(f"Active agent: {current_agent}")
                    else:
                        print(f"Agent '{name}' not found. Available: {', '.join(self.agents.keys())}")
                elif prompt.startswith("/logs "):
                    name = prompt[6:].strip()
                    if name in self.agents:
                        log = self.agents[name].read_log(last_n=5)
                        print(log if log else "(empty)")
                    else:
                        print(f"Agent '{name}' not found")
                elif prompt == "/costs":
                    summary = self.cost_tracker.summary
                    print("\n📊 Cost Summary:")
                    print(f"  Requests: {summary.total_requests}")
                    print(f"  Tokens: {summary.total_input_tokens:,} in / {summary.total_output_tokens:,} out")
                    print(f"  Cost: ${summary.total_cost_usd:.4f}")
                    if summary.by_model:
                        print("\n  By model:")
                        for model, data in summary.by_model.items():
                            print(f"    {model}: {data['requests']} req, ${data['cost_usd']:.4f}")
                elif prompt == "/health":
                    print("Checking health...")
                    results = await self.health_check()
                    for name, healthy in results.items():
                        status = "✅" if healthy else "❌"
                        print(f"  {status} {name}")
                elif prompt == "/reload":
                    self.config = reload_config()
                    self.agents = {}
                    for name, agent_config in self.config.agents.items():
                        try:
                            self.agents[name] = Agent(config_obj=agent_config)
                        except Exception as e:
                            print(f"⚠️ Error: {name}: {e}")
                    print(f"✅ Reloaded: {len(self.agents)} agent(s)")
                    if self.agents:
                        current_agent = list(self.agents.keys())[0]
                elif prompt == "/all":
                    user_prompt = input("Prompt: ").strip()
                    if user_prompt:
                        print("Running on all agents...")
                        results = await self.run_all(user_prompt)
                        for name, result in results.items():
                            status = "✅" if result.success else "❌"
                            output = (result.output[:300] + "...") if len(result.output) > 300 else result.output
                            print(f"\n{status} {name}:\n{output or result.error}")
                elif prompt.startswith("/"):
                    print(f"Unknown command: {prompt}")
                else:
                    print("Running...")
                    result = await self.run_agent(current_agent, prompt)
                    if result.success:
                        print(f"\n{result.output}")
                        if result.tool_results:
                            print(f"\n[Tools used: {len(result.tool_results)}]")
                    else:
                        print(f"\n{result.error}")

            except KeyboardInterrupt:
                print("\n\nUse /quit to exit.")
            except Exception as e:
                print(f"\n❌ Error: {e}")


async def main():
    parser = argparse.ArgumentParser(description="LLM Orchestrator")
    parser.add_argument("--config", type=str, help="Path to config.yaml")
    parser.add_argument("--status", action="store_true", help="Show status")
    parser.add_argument("--agents", action="store_true", help="List agents")
    parser.add_argument("--agent", type=str, help="Agent to use")
    parser.add_argument("--prompt", type=str, help="Prompt to execute")
    parser.add_argument("--health", action="store_true", help="Health check")
    args = parser.parse_args()

    orchestrator = Orchestrator(args.config)

    if args.status:
        print(json.dumps(orchestrator.get_status(), indent=2))
    elif args.agents:
        for name, agent in orchestrator.agents.items():
            print(f"  {name}: {agent.provider_name}/{agent.model}")
    elif args.health:
        results = await orchestrator.health_check()
        for name, healthy in results.items():
            status = "✅" if healthy else "❌"
            print(f"  {status} {name}")
    elif args.agent and args.prompt:
        result = await orchestrator.run_agent(args.agent, args.prompt)
        if result.success:
            print(result.output)
        else:
            print(f"Error: {result.error}", file=sys.stderr)
            sys.exit(1)
    else:
        await orchestrator.interactive()


if __name__ == "__main__":
    asyncio.run(main())
```
---
## config.yaml
```yaml
# config.yaml - LLM Orchestrator Configuration
#
# Edit this file to define your agents and servers.

# ============================================================================
# SETTINGS
# ============================================================================
settings:
  # Default provider/model if not specified per agent
  default_provider: claude
  default_model: sonnet

  # Timeout in seconds
  timeout: 300

  # Working directory (relative to this file)
  working_dir: .

  # Max tool iterations per turn
  max_tool_iterations: 10

  # Security
  sandbox_paths: true              # Restrict file access to working_dir
  ssh_strict_host_checking: true   # Verify SSH hosts
  # allowed_commands: []           # Whitelist commands (empty = all allowed)

  # Rate limiting
  rate_limit_per_minute: 60

  # Retry
  max_retries: 3
  retry_delay: 1.0

  # Circuit breaker (protects against cascading failures)
  enable_circuit_breaker: true
  circuit_breaker_threshold: 5     # Failures before opening
  circuit_breaker_timeout: 30.0    # Seconds before retry

# ============================================================================
# SERVERS (optional)
# ============================================================================
# Define servers for SSH access
servers:
  # Example:
  # production:
  #   host: 192.168.1.100
  #   user: deploy
  #   key: ~/.ssh/id_rsa
  #   port: 22
  #   description: "Production server"

# ============================================================================
# AGENTS
# ============================================================================
# Define your AI agents
agents:
  # Default assistant
  assistant:
    role: |
      You are a helpful assistant that can execute commands,
      read and write files, and help with various tasks.
    provider: claude
    model: sonnet
    tools:
      - bash
      - read
      - write
      - list_dir
      - glob

  # Coder agent (uncomment to enable)
  # coder:
  #   role: |
  #     You are an expert programmer.
  #     You write clean, well-documented, and testable code.
  #     You follow best practices and include error handling.
  #   provider: claude
  #   model: sonnet
  #   tools:
  #     - read
  #     - write
  #     - bash
  #     - grep
  #     - glob

# ============================================================================
# TASKS (optional)
# ============================================================================
# Define automated task sequences
tasks:
  # Example:
  # deploy:
  #   description: "Deploy application to production"
  #   steps:
  #     - agent: coder
  #       prompt: "Run tests"
  #     - agent: deployer
  #       prompt: "Deploy to production"
```
---
## orchestrator/\_\_init\_\_.py
```python
"""
Orchestrator module.
"""
from .config import Config, get_config, reload_config, AgentConfig, ServerConfig, Settings
from .agents import Agent, AgentResult

__all__ = [
    "Config",
    "get_config",
    "reload_config",
    "AgentConfig",
    "ServerConfig",
    "Settings",
    "Agent",
    "AgentResult",
]
```
---
## orchestrator/config.py
```python
"""
Configuration management.

Loads configuration from:
1. Environment variables
2. .env file
3. config.yaml

NEVER hardcode credentials.
"""
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List


def load_env():
    """Load variables from .env file if it exists."""
    env_paths = [
        Path.cwd() / ".env",
        Path(__file__).parent.parent / ".env",
    ]
    for env_file in env_paths:
        if env_file.exists():
            with open(env_file) as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith("#") and "=" in line:
                        key, _, value = line.partition("=")
                        key = key.strip()
                        value = value.strip().strip('"').strip("'")
                        if key and value:
                            os.environ.setdefault(key, value)
            break  # stop after the first .env found


# Load .env on import
load_env()


def get_env(key: str, default: str = "") -> str:
    """Get environment variable."""
    return os.environ.get(key, default)


def get_env_bool(key: str, default: bool = False) -> bool:
    """Get environment variable as boolean."""
    val = os.environ.get(key, "").lower()
    if val in ("true", "yes", "1"):
        return True
    if val in ("false", "no", "0"):
        return False
    return default


def get_env_list(key: str, default: Optional[List] = None) -> List:
    """Get environment variable as comma-separated list."""
    val = os.environ.get(key, "")
    if val:
        return [x.strip() for x in val.split(",") if x.strip()]
    return default or []


@dataclass
class ServerConfig:
    """Server configuration for SSH."""
    name: str
    host: str
    user: str = "root"
    key: str = ""
    port: int = 22
    description: str = ""

    def __post_init__(self):
        if not self.key:
            self.key = get_env("SSH_KEY_PATH", "~/.ssh/id_rsa")


@dataclass
class AgentConfig:
    """Agent configuration."""
    name: str
    role: str
    provider: str = "claude"
    model: str = "sonnet"
    tools: List[str] = field(default_factory=list)
    servers: List[str] = field(default_factory=list)
    max_turns: int = 10


@dataclass
class Settings:
    """General settings."""
    default_provider: str = "claude"
    default_model: str = "sonnet"
    timeout: float = 300.0
    working_dir: str = "."
    max_tool_iterations: int = 10
    # Security
    ssh_strict_host_checking: bool = True
    sandbox_paths: bool = True
    allowed_commands: List[str] = field(default_factory=list)
    # Rate limiting
    rate_limit_per_minute: int = 60
    # Retry
    max_retries: int = 3
    retry_delay: float = 1.0
    # Circuit breaker
    enable_circuit_breaker: bool = True
    circuit_breaker_threshold: int = 5
    circuit_breaker_timeout: float = 30.0


class Config:
    """Main configuration manager."""

    def __init__(self, config_path: Optional[str] = None):
        self.config_path = self._find_config(config_path)
        self.base_dir = self.config_path.parent if self.config_path else Path.cwd()
        self._raw = self._load_yaml() if self.config_path else {}

        # Parse configuration
        self.settings = self._parse_settings()
        self.servers = self._parse_servers()
        self.agents = self._parse_agents()
        self.tasks = self._raw.get("tasks") or {}  # `or {}`: an empty YAML section parses as None

        # Directories
        self.logs_dir = self.base_dir / "logs"
        self.outputs_dir = self.base_dir / "outputs"

        # Create directories
        self.logs_dir.mkdir(exist_ok=True)
        self.outputs_dir.mkdir(exist_ok=True)

    def _find_config(self, config_path: Optional[str]) -> Optional[Path]:
        """Find configuration file."""
        if config_path:
            path = Path(config_path)
            if path.exists():
                return path
            raise FileNotFoundError(f"Config not found: {config_path}")

        search_paths = [
            Path.cwd() / "config.yaml",
            Path.cwd() / "config.yml",
            Path(__file__).parent.parent / "config.yaml",
        ]
        for path in search_paths:
            if path.exists():
                return path
        return None

    def _load_yaml(self) -> dict:
        """Load YAML file."""
        if not self.config_path:
            return {}
        try:
            import yaml
            with open(self.config_path) as f:
                return yaml.safe_load(f) or {}
        except ImportError:
            print("WARNING: PyYAML not installed. Run: pip install pyyaml")
            return {}

    def _parse_settings(self) -> Settings:
        """Parse settings section."""
        raw = self._raw.get("settings") or {}
        return Settings(
            default_provider=raw.get("default_provider", "claude"),
            default_model=raw.get("default_model", "sonnet"),
            timeout=float(raw.get("timeout", 300)),
            working_dir=raw.get("working_dir", "."),
            max_tool_iterations=int(raw.get("max_tool_iterations", 10)),
            ssh_strict_host_checking=raw.get(
                "ssh_strict_host_checking",
                get_env_bool("SSH_KNOWN_HOSTS_CHECK", True)
            ),
            sandbox_paths=raw.get("sandbox_paths", True),
            allowed_commands=raw.get("allowed_commands", []),
            rate_limit_per_minute=int(raw.get("rate_limit_per_minute", 60)),
            max_retries=int(raw.get("max_retries", 3)),
            retry_delay=float(raw.get("retry_delay", 1.0)),
            enable_circuit_breaker=raw.get("enable_circuit_breaker", True),
            circuit_breaker_threshold=int(raw.get("circuit_breaker_threshold", 5)),
            circuit_breaker_timeout=float(raw.get("circuit_breaker_timeout", 30.0)),
        )

    def _parse_servers(self) -> Dict[str, ServerConfig]:
        """Parse servers section."""
        servers = {}
        for name, data in (self._raw.get("servers") or {}).items():
            if data:
                servers[name] = ServerConfig(
                    name=name,
                    host=data.get("host", ""),
                    user=data.get("user", "root"),
                    key=data.get("key", get_env("SSH_KEY_PATH", "~/.ssh/id_rsa")),
                    port=int(data.get("port", 22)),
                    description=data.get("description", ""),
                )
        return servers

    def _parse_agents(self) -> Dict[str, AgentConfig]:
        """Parse agents section."""
        agents = {}
        for name, data in (self._raw.get("agents") or {}).items():
            if data:
                agents[name] = AgentConfig(
                    name=name,
                    role=data.get("role", ""),
                    provider=data.get("provider", self.settings.default_provider),
                    model=data.get("model", self.settings.default_model),
                    tools=data.get("tools", []),
                    servers=data.get("servers", []),
                    max_turns=int(data.get("max_turns", self.settings.max_tool_iterations)),
                )
        return agents

    def get_agent(self, name: str) -> Optional[AgentConfig]:
        return self.agents.get(name)

    def get_server(self, name: str) -> Optional[ServerConfig]:
        return self.servers.get(name)

    def list_agents(self) -> List[str]:
        return list(self.agents.keys())

    def list_servers(self) -> List[str]:
        return list(self.servers.keys())


# Global instance
_config: Optional[Config] = None


def get_config(config_path: Optional[str] = None) -> Config:
    """Get global configuration."""
    global _config
    if _config is None:
        _config = Config(config_path)
    return _config


def reload_config(config_path: Optional[str] = None) -> Config:
    """Reload configuration."""
    global _config
    _config = Config(config_path)
    return _config
```
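A quirk worth noting in `load_env()` above: it writes values with `os.environ.setdefault`, so variables already present in the shell environment always win over `.env` entries. A minimal, self-contained sketch of that precedence rule (the variable names `TZZR_DEMO_KEY` and `TZZR_DEMO_LEVEL` are hypothetical, chosen only for this demo):

```python
import os

# A variable the shell has already exported takes priority...
os.environ["TZZR_DEMO_KEY"] = "from-shell"
os.environ.setdefault("TZZR_DEMO_KEY", "from-dotenv")   # ignored: key exists

# ...while a variable absent from the environment is filled from .env.
os.environ.setdefault("TZZR_DEMO_LEVEL", "info")        # applied: key missing

print(os.environ["TZZR_DEMO_KEY"])    # from-shell
print(os.environ["TZZR_DEMO_LEVEL"])  # info
```

This is why the docstring's precedence list reads "1. Environment variables, 2. .env file": `.env` only fills gaps, it never overrides the shell.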
---
## orchestrator/core/\_\_init\_\_.py
```python
"""
Core components for resilience and reliability.
"""
from .retry import retry_with_backoff, RetryConfig
from .circuit_breaker import CircuitBreaker, CircuitState
from .rate_limiter import RateLimiter

__all__ = [
    "retry_with_backoff",
    "RetryConfig",
    "CircuitBreaker",
    "CircuitState",
    "RateLimiter",
]
```
---
## orchestrator/core/retry.py
```python
"""
Retry logic with exponential backoff.

Features:
- Configurable retry attempts
- Exponential backoff with jitter
- Selective exception handling
- Async support
"""
import asyncio
import random
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Optional, Type, Tuple, Any
from functools import wraps

T = TypeVar("T")


@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    # Exceptions that should trigger retry
    retryable_exceptions: Tuple[Type[Exception], ...] = field(
        default_factory=lambda: (
            ConnectionError,
            TimeoutError,
            asyncio.TimeoutError,
        )
    )
    # Exceptions that should NOT retry (fail immediately)
    fatal_exceptions: Tuple[Type[Exception], ...] = field(
        default_factory=lambda: (
            KeyboardInterrupt,
            SystemExit,
            PermissionError,
        )
    )

    def calculate_delay(self, attempt: int) -> float:
        """Calculate delay for given attempt number."""
        delay = min(
            self.base_delay * (self.exponential_base ** attempt),
            self.max_delay
        )
        if self.jitter:
            # Add random jitter (±25%)
            delay = delay * (0.75 + random.random() * 0.5)
        return delay

    def should_retry(self, exception: Exception) -> bool:
        """Determine if exception should trigger retry."""
        if isinstance(exception, self.fatal_exceptions):
            return False
        if self.retryable_exceptions:
            return isinstance(exception, self.retryable_exceptions)
        # By default, retry on any non-fatal exception
        return True


async def retry_with_backoff(
    func: Callable[..., T],
    *args,
    config: Optional[RetryConfig] = None,
    on_retry: Optional[Callable[[int, Exception], None]] = None,
    **kwargs
) -> Tuple[T, int]:
    """
    Execute async function with retry and exponential backoff.

    Args:
        func: Async function to execute
        *args: Arguments for func
        config: Retry configuration
        on_retry: Optional callback on each retry (attempt, exception)
        **kwargs: Keyword arguments for func

    Returns:
        Tuple of (result, number_of_retries)

    Raises:
        Last exception if all retries exhausted
    """
    config = config or RetryConfig()
    last_exception: Optional[Exception] = None

    for attempt in range(config.max_retries + 1):
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)
            return result, attempt
        except Exception as e:
            last_exception = e

            # Check if we should retry
            if not config.should_retry(e):
                raise

            # Check if we have retries left
            if attempt >= config.max_retries:
                raise

            # Calculate delay and wait
            delay = config.calculate_delay(attempt)

            # Call retry callback if provided
            if on_retry:
                on_retry(attempt + 1, e)

            await asyncio.sleep(delay)

    # Should never reach here, but just in case
    if last_exception:
        raise last_exception
    raise RuntimeError("Retry logic error")


def with_retry(config: Optional[RetryConfig] = None):
    """
    Decorator for retry with backoff.

    Usage:
        @with_retry(RetryConfig(max_retries=5))
        async def my_function():
            ...
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            result, _ = await retry_with_backoff(func, *args, config=config, **kwargs)
            return result
        return wrapper
    return decorator
```
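To see the schedule `RetryConfig.calculate_delay` produces, here is a standalone sketch with the default parameters and jitter disabled for determinism (with jitter on, each value is additionally scaled by a random factor in [0.75, 1.25)); `backoff_delay` is a name invented for the demo:

```python
def backoff_delay(attempt: int, base: float = 1.0,
                  factor: float = 2.0, cap: float = 60.0) -> float:
    """Delay before retry `attempt` (0-based): base * factor**attempt, capped."""
    return min(base * (factor ** attempt), cap)

# Delays before retries 1..8 under the defaults (base_delay=1.0,
# exponential_base=2.0, max_delay=60.0):
schedule = [backoff_delay(a) for a in range(8)]
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The cap matters: without `max_delay`, attempt 10 would already wait over 17 minutes.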
---
## orchestrator/core/circuit_breaker.py
```python
"""
Circuit Breaker pattern implementation.

Protects against cascading failures by:
- Tracking failure rates
- Opening circuit after threshold
- Auto-recovery with half-open state
"""
import asyncio
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, TypeVar, Optional, Any

T = TypeVar("T")


class CircuitState(Enum):
    """Circuit breaker states."""
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject calls
    HALF_OPEN = "half_open"  # Testing recovery


@dataclass
class CircuitStats:
    """Statistics for circuit breaker."""
    total_calls: int = 0
    successful_calls: int = 0
    failed_calls: int = 0
    rejected_calls: int = 0
    last_failure_time: Optional[float] = None
    last_success_time: Optional[float] = None

    @property
    def failure_rate(self) -> float:
        if self.total_calls == 0:
            return 0.0
        return self.failed_calls / self.total_calls


class CircuitBreakerError(Exception):
    """Raised when circuit is open."""
    pass


class CircuitBreaker:
    """
    Circuit breaker for fault tolerance.

    States:
    - CLOSED: Normal operation, tracking failures
    - OPEN: Circuit tripped, rejecting calls immediately
    - HALF_OPEN: Testing if service recovered

    Usage:
        breaker = CircuitBreaker(failure_threshold=5)
        async with breaker:
            result = await risky_operation()
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 2,
        timeout: float = 30.0,
        half_open_max_calls: int = 3,
    ):
        """
        Args:
            failure_threshold: Failures before opening circuit
            success_threshold: Successes in half-open to close
            timeout: Seconds before trying half-open
            half_open_max_calls: Max concurrent calls in half-open
        """
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.half_open_max_calls = half_open_max_calls

        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None
        self._half_open_calls = 0
        self._lock = asyncio.Lock()
        self.stats = CircuitStats()

    @property
    def state(self) -> CircuitState:
        """Current circuit state."""
        return self._state

    @property
    def is_closed(self) -> bool:
        return self._state == CircuitState.CLOSED

    @property
    def is_open(self) -> bool:
        return self._state == CircuitState.OPEN

    def _should_attempt_reset(self) -> bool:
        """Check if enough time passed to try half-open."""
        if self._last_failure_time is None:
            return True
        return time.time() - self._last_failure_time >= self.timeout

    async def _check_state(self) -> bool:
        """
        Check and potentially transition state.
        Returns True if call should proceed.
        """
        async with self._lock:
            if self._state == CircuitState.CLOSED:
                return True

            if self._state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self._state = CircuitState.HALF_OPEN
                    self._success_count = 0
                    self._half_open_calls = 0
                else:
                    self.stats.rejected_calls += 1
                    return False

            if self._state == CircuitState.HALF_OPEN:
                if self._half_open_calls >= self.half_open_max_calls:
                    self.stats.rejected_calls += 1
                    return False
                self._half_open_calls += 1

            return True

    async def _record_success(self):
        """Record successful call."""
        async with self._lock:
            self.stats.total_calls += 1
            self.stats.successful_calls += 1
            self.stats.last_success_time = time.time()

            if self._state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._failure_count = 0
            elif self._state == CircuitState.CLOSED:
                # Reset failure count on success
                self._failure_count = 0

    async def _record_failure(self, exception: Exception):
        """Record failed call."""
        async with self._lock:
            self.stats.total_calls += 1
            self.stats.failed_calls += 1
            self.stats.last_failure_time = time.time()
            self._last_failure_time = time.time()

            if self._state == CircuitState.HALF_OPEN:
                # Any failure in half-open reopens circuit
                self._state = CircuitState.OPEN
            elif self._state == CircuitState.CLOSED:
                self._failure_count += 1
                if self._failure_count >= self.failure_threshold:
                    self._state = CircuitState.OPEN

    async def call(
        self,
        func: Callable[..., T],
        *args,
        **kwargs
    ) -> T:
        """
        Execute function through circuit breaker.

        Raises:
            CircuitBreakerError: If circuit is open
        """
        if not await self._check_state():
            raise CircuitBreakerError(
                f"Circuit breaker is {self._state.value}. "
                f"Retry after {self.timeout}s"
            )

        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)
            await self._record_success()
            return result
        except Exception as e:
            await self._record_failure(e)
            raise

    async def __aenter__(self):
        """Async context manager entry."""
        if not await self._check_state():
            raise CircuitBreakerError(
                f"Circuit breaker is {self._state.value}"
            )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if exc_type is None:
            await self._record_success()
        else:
            await self._record_failure(exc_val)
        return False  # Don't suppress exceptions

    def reset(self):
        """Manually reset circuit breaker."""
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time = None
        self.stats = CircuitStats()

    def __repr__(self) -> str:
        return (
            f"CircuitBreaker(state={self._state.value}, "
            f"failures={self._failure_count}/{self.failure_threshold})"
        )
```
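The state machine above can be exercised without asyncio. The following is a simplified synchronous sketch of the same transitions (no locking, no half-open call cap); `MiniBreaker` is a name invented for the demo, and the clock is passed in explicitly so the walkthrough is deterministic:

```python
import time


class MiniBreaker:
    """Synchronous sketch of the closed/open/half_open state machine."""

    def __init__(self, failure_threshold=5, success_threshold=2, timeout=30.0):
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.state = "closed"
        self.failures = 0
        self.successes = 0
        self.opened_at = 0.0

    def allow(self, now=None):
        """Return True if a call may proceed; open -> half_open after timeout."""
        now = time.time() if now is None else now
        if self.state == "open" and now - self.opened_at >= self.timeout:
            self.state = "half_open"  # probe the service again
            self.successes = 0
        return self.state != "open"

    def record(self, ok, now=None):
        """Record a call outcome and apply state transitions."""
        now = time.time() if now is None else now
        if ok:
            if self.state == "half_open":
                self.successes += 1
                if self.successes >= self.success_threshold:
                    self.state, self.failures = "closed", 0
            else:
                self.failures = 0  # success while closed resets the count
        else:
            if self.state == "half_open":
                self.state, self.opened_at = "open", now  # reopen immediately
            else:
                self.failures += 1
                if self.failures >= self.failure_threshold:
                    self.state, self.opened_at = "open", now


# Deterministic walkthrough with an explicit clock:
b = MiniBreaker(failure_threshold=2, timeout=10.0)
b.record(False, now=0.0)
b.record(False, now=1.0)        # second failure trips the breaker
assert b.state == "open"
assert not b.allow(now=5.0)     # rejected while open
assert b.allow(now=12.0)        # timeout elapsed -> half_open probe
b.record(True, now=12.0)
b.record(True, now=13.0)        # enough successes -> closed again
assert b.state == "closed"
```

The real `CircuitBreaker` adds what this sketch omits: an `asyncio.Lock` around every transition, a cap on concurrent half-open probes, and the `CircuitStats` counters.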
---
## orchestrator/core/rate_limiter.py
```python
"""
Rate limiter with sliding window.
Features:
- Sliding window algorithm
- Async-safe with lock
- Configurable burst allowance
"""
import asyncio
import time
from collections import deque
from dataclasses import dataclass
from typing import Optional
@dataclass
class RateLimitStats:
"""Statistics for rate limiter."""
total_requests: int = 0
allowed_requests: int = 0
throttled_requests: int = 0
total_wait_time: float = 0.0
class RateLimiter:
"""
Sliding window rate limiter.
Limits calls to max_calls per period seconds.
Usage:
limiter = RateLimiter(max_calls=60, period=60.0)
await limiter.acquire() # Blocks if rate exceeded
do_something()
"""
def __init__(
self,
max_calls: int = 60,
period: float = 60.0,
burst_allowance: int = 0,
):
"""
Args:
max_calls: Maximum calls allowed in period
period: Time window in seconds
burst_allowance: Extra calls allowed for bursts
"""
self.max_calls = max_calls
self.period = period
self.burst_allowance = burst_allowance
self._calls: deque = deque()
self._lock = asyncio.Lock()
self.stats = RateLimitStats()
@property
def effective_limit(self) -> int:
"""Effective rate limit including burst."""
return self.max_calls + self.burst_allowance
def _cleanup_old_calls(self, now: float):
"""Remove calls outside the window."""
cutoff = now - self.period
while self._calls and self._calls[0] < cutoff:
self._calls.popleft()
@property
def current_usage(self) -> int:
"""Current number of calls in window."""
self._cleanup_old_calls(time.time())
return len(self._calls)
@property
def available_calls(self) -> int:
"""Number of calls available right now."""
return max(0, self.effective_limit - self.current_usage)
async def acquire(self, timeout: Optional[float] = None) -> bool:
"""
Acquire permission to make a call.
Blocks until rate limit allows the call.
Args:
timeout: Maximum time to wait (None = unlimited)
Returns:
True if acquired, False if timeout
"""
start_time = time.time()
async with self._lock:
self.stats.total_requests += 1
while True:
now = time.time()
self._cleanup_old_calls(now)
# Check if we can proceed
if len(self._calls) < self.effective_limit:
self._calls.append(now)
self.stats.allowed_requests += 1
return True
# Calculate wait time
oldest_call = self._calls[0]
wait_time = oldest_call + self.period - now
if wait_time <= 0:
continue
# Check timeout
if timeout is not None:
elapsed = now - start_time
if elapsed + wait_time > timeout:
self.stats.throttled_requests += 1
return False
# Wait and retry
self.stats.total_wait_time += wait_time
# Release lock while waiting
self._lock.release()
try:
await asyncio.sleep(wait_time)
finally:
await self._lock.acquire()
async def try_acquire(self) -> bool:
"""
Try to acquire without waiting.
Returns:
True if acquired, False if would need to wait
"""
async with self._lock:
self.stats.total_requests += 1
now = time.time()
self._cleanup_old_calls(now)
if len(self._calls) < self.effective_limit:
self._calls.append(now)
self.stats.allowed_requests += 1
return True
self.stats.throttled_requests += 1
return False
def reset(self):
"""Reset rate limiter state."""
self._calls.clear()
self.stats = RateLimitStats()
def __repr__(self) -> str:
return (
f"RateLimiter({self.current_usage}/{self.effective_limit} "
f"per {self.period}s)"
)
class MultiRateLimiter:
"""
Multiple rate limiters combined.
Useful for APIs with multiple rate limits:
- 60 requests per minute
- 1000 requests per hour
Usage:
limiter = MultiRateLimiter([
RateLimiter(60, 60), # per minute
RateLimiter(1000, 3600), # per hour
])
await limiter.acquire()
"""
def __init__(self, limiters: list[RateLimiter]):
self.limiters = limiters
async def acquire(self, timeout: Optional[float] = None) -> bool:
"""Acquire from all limiters."""
for limiter in self.limiters:
if not await limiter.acquire(timeout):
return False
return True
async def try_acquire(self) -> bool:
"""Try to acquire from all limiters without waiting."""
for limiter in self.limiters:
if not await limiter.try_acquire():
return False
return True
```
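
The limiter above keeps a sliding window of call timestamps: expired entries are dropped, and a call is admitted only while the window holds fewer than the limit. A minimal standalone sketch of that admission check (not the class itself; it uses an explicit clock argument so the behavior is deterministic):

```python
from collections import deque

def window_allows(calls: deque, max_calls: int, period: float, now: float) -> bool:
    # Drop timestamps older than the window, mirroring _cleanup_old_calls
    while calls and now - calls[0] >= period:
        calls.popleft()
    if len(calls) < max_calls:
        calls.append(now)
        return True
    return False

calls = deque()
# Three calls allowed in a 60s window; the 4th and 5th are throttled
burst = [window_allows(calls, 3, 60.0, 100.0 + i) for i in range(5)]
# Once the window slides past the oldest calls, capacity frees up again
later = window_allows(calls, 3, 60.0, 161.0)
```

`try_acquire` corresponds to one non-blocking pass of this check; `acquire` loops, sleeping until the oldest timestamp ages out of the window.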
---
## orchestrator/providers/\_\_init\_\_.py
```python
"""
LLM Providers.
"""
from .base import BaseProvider, ProviderResponse
from .claude_provider import ClaudeProvider
from .litellm_provider import LiteLLMProvider
__all__ = [
"BaseProvider",
"ProviderResponse",
"ClaudeProvider",
"LiteLLMProvider",
]
```
---
## orchestrator/providers/base.py
```python
"""
Base provider with resilience features.
All providers inherit from this and get:
- Rate limiting
- Circuit breaker
- Retry with backoff
- Consistent response format
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Any, List
from ..core import (
CircuitBreaker,
RateLimiter,
RetryConfig,
retry_with_backoff,
)
@dataclass
class ToolCall:
"""A tool call from the LLM."""
tool: str
params: dict
id: Optional[str] = None
@dataclass
class ToolResult:
"""Result of executing a tool."""
tool: str
success: bool
output: str
error: Optional[str] = None
execution_time: float = 0.0
@dataclass
class UsageInfo:
"""Token usage information."""
input_tokens: int = 0
output_tokens: int = 0
cache_read_tokens: int = 0
cache_creation_tokens: int = 0
@property
def total_tokens(self) -> int:
return self.input_tokens + self.output_tokens
@dataclass
class ProviderResponse:
"""Standardized response from any provider."""
success: bool
text: str
provider: str
model: str
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
error: Optional[str] = None
usage: Optional[UsageInfo] = None
tool_calls: List[ToolCall] = field(default_factory=list)
tool_results: List[ToolResult] = field(default_factory=list)
retries: int = 0
raw_response: Optional[Any] = None
session_id: Optional[str] = None
@property
def ok(self) -> bool:
return self.success
def to_dict(self) -> dict:
"""Convert to dictionary."""
return {
"success": self.success,
"text": self.text,
"provider": self.provider,
"model": self.model,
"timestamp": self.timestamp,
"error": self.error,
"usage": {
"input_tokens": self.usage.input_tokens,
"output_tokens": self.usage.output_tokens,
"total_tokens": self.usage.total_tokens,
} if self.usage else None,
"retries": self.retries,
"session_id": self.session_id,
}
class BaseProvider(ABC):
"""
Abstract base class for all LLM providers.
Provides:
- Rate limiting
- Circuit breaker for fault tolerance
- Retry with exponential backoff
- Consistent response format
Subclasses must implement:
- name (property)
- available_models (property)
- _execute (method)
"""
# Model aliases - override in subclasses
MODEL_ALIASES: dict[str, str] = {}
def __init__(
self,
model: str,
timeout: float = 300.0,
max_retries: int = 3,
retry_delay: float = 1.0,
rate_limit_per_minute: int = 60,
enable_circuit_breaker: bool = True,
circuit_breaker_threshold: int = 5,
circuit_breaker_timeout: float = 30.0,
**kwargs
):
self.model = model
self.timeout = timeout
self.config = kwargs
# Retry config
self.retry_config = RetryConfig(
max_retries=max_retries,
base_delay=retry_delay,
max_delay=60.0,
)
# Rate limiter
self.rate_limiter = RateLimiter(
max_calls=rate_limit_per_minute,
period=60.0,
)
# Circuit breaker
self.enable_circuit_breaker = enable_circuit_breaker
self.circuit_breaker = CircuitBreaker(
failure_threshold=circuit_breaker_threshold,
timeout=circuit_breaker_timeout,
) if enable_circuit_breaker else None
@property
@abstractmethod
def name(self) -> str:
"""Provider name (e.g., 'claude', 'litellm')."""
pass
@property
@abstractmethod
def available_models(self) -> list[str]:
"""List of available model names/aliases."""
pass
@property
def supports_native_tools(self) -> bool:
"""Whether provider supports native function calling."""
return False
@property
def supports_streaming(self) -> bool:
"""Whether provider supports streaming responses."""
return False
def resolve_model(self, model: str) -> str:
"""Resolve model alias to full model name."""
return self.MODEL_ALIASES.get(model, model)
@abstractmethod
async def _execute(
self,
prompt: str,
system_prompt: Optional[str] = None,
tools: Optional[list[str]] = None,
**kwargs
) -> ProviderResponse:
"""
Internal execution method - implement in subclasses.
This is the raw execution without retry/circuit breaker.
"""
pass
async def run(
self,
prompt: str,
system_prompt: Optional[str] = None,
**kwargs
) -> ProviderResponse:
"""
Execute a prompt without tools.
Includes rate limiting, retry, and circuit breaker.
"""
return await self.run_with_tools(
prompt=prompt,
tools=[],
system_prompt=system_prompt,
**kwargs
)
async def run_with_tools(
self,
prompt: str,
tools: list[str],
system_prompt: Optional[str] = None,
**kwargs
) -> ProviderResponse:
"""
Execute a prompt with tools.
Includes rate limiting, retry, and circuit breaker.
"""
# Rate limiting
await self.rate_limiter.acquire()
# Define the execution function
async def do_execute() -> ProviderResponse:
return await self._execute(
prompt=prompt,
system_prompt=system_prompt,
tools=tools,
**kwargs
)
try:
# With circuit breaker
if self.circuit_breaker:
async with self.circuit_breaker:
response, retries = await retry_with_backoff(
do_execute,
config=self.retry_config,
)
else:
response, retries = await retry_with_backoff(
do_execute,
config=self.retry_config,
)
response.retries = retries
return response
except Exception as e:
return ProviderResponse(
success=False,
text="",
provider=self.name,
model=self.model,
error=str(e),
)
async def health_check(self) -> bool:
"""Check if provider is healthy."""
try:
response = await self.run("Respond with only: OK")
return response.success and "OK" in response.text.upper()
except Exception:
return False
def __repr__(self) -> str:
return f"{self.__class__.__name__}(model={self.model})"
```
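
`run_with_tools` relies on `retry_with_backoff` returning a `(result, retries)` tuple so the retry count can be attached to the response. A simplified, self-contained stand-in (the real helper lives in `orchestrator/core/retry.py` and additionally handles jitter and exception filtering) illustrates that contract:

```python
import asyncio

async def retry_with_backoff(fn, max_retries: int = 3, base_delay: float = 0.01):
    # Simplified stand-in: exponential backoff, returns (result, retries_used)
    for attempt in range(max_retries + 1):
        try:
            return await fn(), attempt
        except Exception:
            if attempt == max_retries:
                raise
            await asyncio.sleep(base_delay * (2 ** attempt))

state = {"calls": 0}

async def flaky():
    # Fails twice, then succeeds - a typical transient-error scenario
    state["calls"] += 1
    if state["calls"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result, retries = asyncio.run(retry_with_backoff(flaky))
```

Here `result` is `"ok"` and `retries` is `2`, which `run_with_tools` would then store in `response.retries`.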
---
## orchestrator/providers/claude_provider.py
```python
"""
Claude provider using Claude Code CLI.
This provider executes Claude via the CLI, handling:
- Proper JSON parsing
- Session management
- Tool execution
- Error handling
"""
import asyncio
import json
import shutil
import os
from typing import Optional, List
from .base import BaseProvider, ProviderResponse, UsageInfo
class ClaudeProviderError(Exception):
"""Base exception for Claude provider."""
pass
class CLINotFoundError(ClaudeProviderError):
"""Claude CLI not found."""
pass
class ExecutionError(ClaudeProviderError):
"""Error during execution."""
pass
class ClaudeProvider(BaseProvider):
"""
Provider that uses Claude Code CLI.
Requires:
- Claude Code CLI installed (`claude` command available)
- Valid authentication (Pro/Max subscription or API key)
Features:
- Native tool support (Bash, Read, Write, etc.)
- Session management
- JSON output parsing
- Autocompact for long conversations
"""
MODEL_ALIASES = {
# Friendly names
"haiku": "claude-3-5-haiku-latest",
"sonnet": "claude-sonnet-4-20250514",
"opus": "claude-opus-4-20250514",
# Semantic names
"fast": "claude-3-5-haiku-latest",
"default": "claude-sonnet-4-20250514",
"smart": "claude-sonnet-4-20250514",
"powerful": "claude-opus-4-20250514",
}
# Tools supported by Claude CLI
NATIVE_TOOLS = [
"Bash", "Read", "Write", "Edit",
"Glob", "Grep", "WebFetch",
"TodoRead", "TodoWrite",
]
# Mapping from our tool names to Claude CLI tool names
TOOL_NAME_MAP = {
"bash": "Bash",
"read": "Read",
"write": "Write",
"edit": "Edit",
"glob": "Glob",
"grep": "Grep",
"web_fetch": "WebFetch",
"webfetch": "WebFetch",
}
def __init__(
self,
model: str = "sonnet",
timeout: float = 300.0,
cli_path: str = "claude",
working_directory: Optional[str] = None,
max_turns: int = 10,
autocompact: bool = True,
**kwargs
):
"""
Args:
model: Model to use (sonnet, opus, haiku, or full name)
timeout: Execution timeout in seconds
cli_path: Path to claude CLI
working_directory: Working directory for execution
max_turns: Maximum tool use turns
autocompact: Enable automatic context compaction
"""
super().__init__(model=model, timeout=timeout, **kwargs)
self.cli_path = cli_path
self.working_directory = working_directory or os.getcwd()
self.max_turns = max_turns
self.autocompact = autocompact
# Verify CLI exists
if not shutil.which(self.cli_path):
raise CLINotFoundError(
f"Claude CLI not found at '{self.cli_path}'. "
"Install with: npm install -g @anthropic-ai/claude-code"
)
@property
def name(self) -> str:
return "claude"
@property
def available_models(self) -> list[str]:
return list(self.MODEL_ALIASES.keys())
@property
def supports_native_tools(self) -> bool:
return True
def _map_tools(self, tools: List[str]) -> List[str]:
"""Map our tool names to Claude CLI tool names."""
mapped = []
for tool in tools:
tool_lower = tool.lower()
if tool_lower in self.TOOL_NAME_MAP:
mapped.append(self.TOOL_NAME_MAP[tool_lower])
elif tool in self.NATIVE_TOOLS:
mapped.append(tool)
# Skip non-native tools (http_request, ssh, etc.)
return mapped
def _build_command(
self,
prompt: str,
system_prompt: Optional[str] = None,
tools: Optional[List[str]] = None,
session_id: Optional[str] = None,
continue_session: bool = False,
) -> List[str]:
"""Build the CLI command."""
cmd = [self.cli_path]
# Prompt
cmd.extend(["-p", prompt])
# Output format
cmd.extend(["--output-format", "json"])
# Model
resolved_model = self.resolve_model(self.model)
cmd.extend(["--model", resolved_model])
# System prompt
if system_prompt:
cmd.extend(["--system-prompt", system_prompt])
# Tools
if tools:
native_tools = self._map_tools(tools)
if native_tools:
cmd.extend(["--allowedTools", ",".join(native_tools)])
# Max turns
cmd.extend(["--max-turns", str(self.max_turns)])
# Session management
if session_id:
if continue_session:
cmd.extend(["--continue", session_id])
else:
cmd.extend(["--session-id", session_id])
return cmd
def _parse_response(self, stdout: str, stderr: str, return_code: int) -> dict:
"""Parse the CLI response."""
# Try to parse JSON
try:
# Handle potential multiple JSON objects (streaming artifacts)
lines = stdout.strip().split('\n')
# Find the last valid JSON object
for line in reversed(lines):
line = line.strip()
if line.startswith('{'):
try:
return json.loads(line)
except json.JSONDecodeError:
continue
# Try parsing the whole output
return json.loads(stdout)
except json.JSONDecodeError:
# If JSON parsing fails, return raw output
return {
"result": stdout,
"is_error": return_code != 0,
"error": stderr if stderr else None,
}
def _extract_usage(self, response: dict) -> Optional[UsageInfo]:
"""Extract usage information from response."""
usage_data = response.get("usage")
if not usage_data:
return None
return UsageInfo(
input_tokens=usage_data.get("input_tokens", 0),
output_tokens=usage_data.get("output_tokens", 0),
cache_read_tokens=usage_data.get("cache_read_input_tokens", 0),
cache_creation_tokens=usage_data.get("cache_creation_input_tokens", 0),
)
async def _execute(
self,
prompt: str,
system_prompt: Optional[str] = None,
tools: Optional[List[str]] = None,
session_id: Optional[str] = None,
continue_session: bool = False,
**kwargs
) -> ProviderResponse:
"""Execute the Claude CLI."""
cmd = self._build_command(
prompt=prompt,
system_prompt=system_prompt,
tools=tools or [],
session_id=session_id,
continue_session=continue_session,
)
try:
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=self.working_directory,
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=self.timeout
)
stdout_str = stdout.decode("utf-8", errors="replace")
stderr_str = stderr.decode("utf-8", errors="replace")
# Parse response
response_data = self._parse_response(
stdout_str, stderr_str, process.returncode
)
# Check for errors
is_error = response_data.get("is_error", False) or process.returncode != 0
# Extract text
text = response_data.get("result", "")
if not text and "content" in response_data:
# Handle different response formats
content = response_data["content"]
if isinstance(content, list):
text = "\n".join(
c.get("text", "") for c in content
if c.get("type") == "text"
)
elif isinstance(content, str):
text = content
# Extract error
error = None
if is_error:
error = response_data.get("error") or stderr_str or "Unknown error"
return ProviderResponse(
success=not is_error,
text=text,
provider=self.name,
model=self.model,
error=error,
usage=self._extract_usage(response_data),
session_id=response_data.get("session_id"),
raw_response=response_data,
)
except asyncio.TimeoutError:
raise ExecutionError(f"Timeout after {self.timeout}s")
except FileNotFoundError:
raise CLINotFoundError(f"Claude CLI not found: {self.cli_path}")
except Exception as e:
raise ExecutionError(f"Execution failed: {e}")
async def run_with_session(
self,
prompt: str,
session_id: str,
continue_session: bool = True,
**kwargs
) -> ProviderResponse:
"""Run with explicit session management."""
return await self.run_with_tools(
prompt=prompt,
tools=kwargs.pop("tools", []),
system_prompt=kwargs.pop("system_prompt", None),
session_id=session_id,
continue_session=continue_session,
**kwargs
)
async def health_check(self) -> bool:
"""Verify Claude CLI is working."""
try:
# Simple test command
cmd = [self.cli_path, "-p", "Say OK", "--output-format", "json", "--max-turns", "1"]
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, _ = await asyncio.wait_for(
process.communicate(),
timeout=30.0
)
            return process.returncode == 0
except Exception:
return False
```
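
`_parse_response` has to cope with log lines and partial streaming output around the final JSON object the CLI emits. The scan-in-reverse strategy it uses can be exercised in isolation:

```python
import json

def parse_last_json(stdout: str) -> dict:
    # Same strategy as ClaudeProvider._parse_response: walk lines in
    # reverse and return the last one that parses as a JSON object,
    # skipping truncated or partial lines.
    for line in reversed(stdout.strip().split("\n")):
        line = line.strip()
        if line.startswith("{"):
            try:
                return json.loads(line)
            except json.JSONDecodeError:
                continue
    return {"result": stdout, "is_error": True}

# The trailing line is a truncated JSON fragment and gets skipped
stdout = '{"result": "OK", "session_id": "abc-123"}\n{"partial'
parsed = parse_last_json(stdout)
```

The fallback dict at the end matches the provider's behavior of returning the raw output with `is_error` set when no JSON can be recovered.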
---
## orchestrator/providers/litellm_provider.py
```python
"""
Universal LLM provider using LiteLLM.
Supports 100+ models through a unified interface:
- OpenAI (GPT-4, o1)
- Google (Gemini)
- Anthropic (via API)
- Mistral
- Ollama (local)
- Groq
- Together
- And many more...
"""
import json
import re
from typing import Optional, List, Any
from .base import BaseProvider, ProviderResponse, UsageInfo, ToolCall, ToolResult
class LiteLLMProvider(BaseProvider):
"""
Universal provider using LiteLLM.
Requires:
- pip install litellm
- Appropriate API keys in environment
Features:
- 100+ model support
- Unified API
- Tool execution loop
- Cost tracking
"""
MODEL_ALIASES = {
# OpenAI
"gpt4": "gpt-4",
"gpt4o": "gpt-4o",
"gpt4-turbo": "gpt-4-turbo",
"gpt4o-mini": "gpt-4o-mini",
"o1": "o1-preview",
"o1-mini": "o1-mini",
"o3-mini": "o3-mini",
# Google Gemini
"gemini": "gemini/gemini-1.5-pro",
"gemini-pro": "gemini/gemini-1.5-pro",
"gemini-flash": "gemini/gemini-1.5-flash",
"gemini-2": "gemini/gemini-2.0-flash",
# Anthropic (via API, not CLI)
"claude-api": "claude-3-5-sonnet-20241022",
"claude-opus-api": "claude-3-opus-20240229",
# Mistral
"mistral": "mistral/mistral-large-latest",
"mistral-small": "mistral/mistral-small-latest",
"mixtral": "mistral/open-mixtral-8x22b",
"codestral": "mistral/codestral-latest",
# Ollama (local)
"llama3": "ollama/llama3",
"llama3.1": "ollama/llama3.1",
"llama3.2": "ollama/llama3.2",
"codellama": "ollama/codellama",
"mixtral-local": "ollama/mixtral",
"qwen": "ollama/qwen2.5",
"deepseek": "ollama/deepseek-r1",
# Groq (fast inference)
"groq-llama": "groq/llama-3.3-70b-versatile",
"groq-mixtral": "groq/mixtral-8x7b-32768",
# Together
"together-llama": "together_ai/meta-llama/Llama-3-70b-chat-hf",
# DeepSeek
"deepseek-chat": "deepseek/deepseek-chat",
"deepseek-coder": "deepseek/deepseek-coder",
}
# Models that support native function calling
FUNCTION_CALLING_MODELS = {
"gpt-4", "gpt-4o", "gpt-4-turbo", "gpt-4o-mini",
"claude-3", "gemini",
}
def __init__(
self,
model: str = "gpt-4o",
timeout: float = 300.0,
api_key: Optional[str] = None,
api_base: Optional[str] = None,
max_tool_iterations: int = 10,
temperature: float = 0.7,
**kwargs
):
"""
Args:
model: Model name or alias
timeout: Request timeout
api_key: Override API key
api_base: Override API base URL
max_tool_iterations: Max tool use loops
temperature: Sampling temperature
"""
super().__init__(model=model, timeout=timeout, **kwargs)
self.api_key = api_key
self.api_base = api_base
self.max_tool_iterations = max_tool_iterations
self.temperature = temperature
self._litellm = None
def _get_litellm(self):
"""Lazy load litellm."""
if self._litellm is None:
try:
import litellm
litellm.set_verbose = False
self._litellm = litellm
except ImportError:
raise ImportError(
"LiteLLM not installed. Run: pip install litellm"
)
return self._litellm
@property
def name(self) -> str:
return "litellm"
@property
def available_models(self) -> list[str]:
return list(self.MODEL_ALIASES.keys())
@property
def supports_native_tools(self) -> bool:
"""Check if current model supports function calling."""
resolved = self.resolve_model(self.model)
return any(fc in resolved for fc in self.FUNCTION_CALLING_MODELS)
def _build_tools_prompt(self, tools: List[str]) -> str:
"""Build a prompt section describing available tools."""
if not tools:
return ""
prompt = """
## Available Tools
You can use tools by responding with a JSON block like this:
```tool
{
"tool": "tool_name",
"params": {
"param1": "value1"
}
}
```
Available tools:
"""
tool_descriptions = {
"bash": "Execute a bash command. Params: command (required), working_dir (optional)",
"read": "Read a file. Params: path (required), encoding (optional)",
"write": "Write to a file. Params: path (required), content (required), append (optional bool)",
"glob": "Find files by pattern. Params: pattern (required), base_dir (optional)",
"grep": "Search text in files. Params: pattern (required), path (required), recursive (optional bool)",
"http_request": "Make HTTP request. Params: url (required), method (optional), headers (optional), body (optional)",
"ssh": "Execute remote command. Params: host (required), command (required), user (optional), key_path (optional)",
"list_dir": "List directory contents. Params: path (required), recursive (optional bool)",
}
for tool in tools:
if tool in tool_descriptions:
prompt += f"- **{tool}**: {tool_descriptions[tool]}\n"
prompt += "\nAfter using a tool, you'll receive the result and can continue.\n"
return prompt
def _parse_tool_calls(self, text: str) -> List[ToolCall]:
"""Parse tool calls from response text."""
calls = []
# Pattern: ```tool { ... } ```
pattern1 = r'```tool\s*\n?(.*?)\n?```'
for match in re.findall(pattern1, text, re.DOTALL):
try:
data = json.loads(match.strip())
if "tool" in data:
calls.append(ToolCall(
tool=data["tool"],
params=data.get("params", {}),
))
except json.JSONDecodeError:
continue
# Pattern: ```json { "tool": ... } ```
pattern2 = r'```json\s*\n?(.*?)\n?```'
for match in re.findall(pattern2, text, re.DOTALL):
try:
data = json.loads(match.strip())
if "tool" in data:
calls.append(ToolCall(
tool=data["tool"],
params=data.get("params", {}),
))
except json.JSONDecodeError:
continue
return calls
async def _call_llm(
self,
messages: List[dict],
max_tokens: Optional[int] = None,
    ) -> Any:
"""Make a single LLM call."""
litellm = self._get_litellm()
resolved_model = self.resolve_model(self.model)
kwargs = {
"model": resolved_model,
"messages": messages,
"temperature": self.temperature,
"timeout": self.timeout,
}
if max_tokens:
kwargs["max_tokens"] = max_tokens
if self.api_key:
kwargs["api_key"] = self.api_key
if self.api_base:
kwargs["api_base"] = self.api_base
return await litellm.acompletion(**kwargs)
async def _execute(
self,
prompt: str,
system_prompt: Optional[str] = None,
tools: Optional[List[str]] = None,
tool_executor: Optional[Any] = None,
max_tokens: Optional[int] = None,
**kwargs
) -> ProviderResponse:
"""Execute with optional tool loop."""
tools = tools or []
resolved_model = self.resolve_model(self.model)
# Build system prompt with tools
full_system = system_prompt or ""
if tools:
tools_prompt = self._build_tools_prompt(tools)
full_system = f"{full_system}\n\n{tools_prompt}" if full_system else tools_prompt
# Build messages
messages = []
if full_system:
messages.append({"role": "system", "content": full_system})
messages.append({"role": "user", "content": prompt})
all_tool_results: List[ToolResult] = []
total_usage = UsageInfo()
text = ""
# Tool execution loop
for iteration in range(self.max_tool_iterations):
try:
response = await self._call_llm(messages, max_tokens)
except Exception as e:
return ProviderResponse(
success=False,
text="",
provider=self.name,
model=resolved_model,
error=f"LiteLLM error: {e}",
)
# Extract text
text = response.choices[0].message.content or ""
# Update usage
if hasattr(response, 'usage') and response.usage:
total_usage.input_tokens += getattr(response.usage, 'prompt_tokens', 0)
total_usage.output_tokens += getattr(response.usage, 'completion_tokens', 0)
# Check for tool calls
if tools and tool_executor:
tool_calls = self._parse_tool_calls(text)
if tool_calls:
# Execute tools
results_text_parts = []
for call in tool_calls:
result = await tool_executor.execute(call.tool, call.params)
all_tool_results.append(ToolResult(
tool=call.tool,
success=result.success,
output=result.output,
error=result.error,
execution_time=result.execution_time,
))
status = "✅" if result.success else "❌"
output = result.output if result.success else result.error
results_text_parts.append(
f"**{call.tool}** {status}\n{output}"
)
# Add to conversation and continue
results_text = "\n\n".join(results_text_parts)
messages.append({"role": "assistant", "content": text})
messages.append({"role": "user", "content": f"Tool results:\n\n{results_text}\n\nContinue."})
continue
# No tool calls, we're done
break
return ProviderResponse(
success=True,
text=text,
provider=self.name,
model=resolved_model,
usage=total_usage if total_usage.total_tokens > 0 else None,
tool_results=[
ToolResult(
tool=r.tool,
success=r.success,
output=r.output,
error=r.error,
)
for r in all_tool_results
],
)
```
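
For models without native function calling, the round trip hinges on `_parse_tool_calls` recovering structured calls from fenced blocks in the reply. The same regex can be tested standalone (the triple-backtick marker is assembled at runtime here so the example stays well-formed inside this document's own code fence):

```python
import json
import re

FENCE = "`" * 3  # assembled so the marker cannot close this code block
TOOL_BLOCK = re.compile(FENCE + r"tool\s*\n?(.*?)\n?" + FENCE, re.DOTALL)

def parse_tool_calls(text: str) -> list[tuple[str, dict]]:
    # Mirrors LiteLLMProvider._parse_tool_calls for the fenced tool format
    calls = []
    for match in TOOL_BLOCK.findall(text):
        try:
            data = json.loads(match.strip())
        except json.JSONDecodeError:
            continue
        if "tool" in data:
            calls.append((data["tool"], data.get("params", {})))
    return calls

reply = (
    "I'll read the file first.\n"
    + FENCE + 'tool\n{"tool": "read", "params": {"path": "notes.txt"}}\n' + FENCE
)
calls = parse_tool_calls(reply)
```

Malformed JSON inside a block is silently skipped, matching the provider: a bad tool call simply produces no call rather than aborting the turn.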
---
## orchestrator/tools/\_\_init\_\_.py
```python
"""
Tool definitions and executor.
"""
from .definitions import TOOL_DEFINITIONS, get_tool_schema, get_tools_prompt
from .executor import ToolExecutor, ToolResult, SecurityValidator
__all__ = [
"TOOL_DEFINITIONS",
"get_tool_schema",
"get_tools_prompt",
"ToolExecutor",
"ToolResult",
"SecurityValidator",
]
```
---
## orchestrator/tools/definitions.py
```python
"""
Tool definitions in standard format.
Compatible with OpenAI function calling and other providers.
"""
from typing import Any, List, Dict
TOOL_DEFINITIONS: Dict[str, dict] = {
"bash": {
"name": "bash",
"description": "Execute a bash command on the system. Use for running scripts, system commands, git, etc.",
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The bash command to execute"
},
"working_dir": {
"type": "string",
"description": "Optional working directory"
}
},
"required": ["command"]
}
},
"read": {
"name": "read",
"description": "Read the contents of a file. Returns the text content.",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the file to read"
},
"encoding": {
"type": "string",
"description": "File encoding (default: utf-8)"
}
},
"required": ["path"]
}
},
"write": {
"name": "write",
"description": "Write content to a file. Creates the file if it doesn't exist, overwrites if it does.",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the file to write"
},
"content": {
"type": "string",
"description": "Content to write to the file"
},
"append": {
"type": "boolean",
"description": "If true, append instead of overwrite"
}
},
"required": ["path", "content"]
}
},
"glob": {
"name": "glob",
"description": "Find files using glob patterns. E.g., '**/*.py' finds all Python files.",
"parameters": {
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "Glob pattern to search for files"
},
"base_dir": {
"type": "string",
"description": "Base directory for the search"
}
},
"required": ["pattern"]
}
},
"grep": {
"name": "grep",
"description": "Search for text in files using regular expressions.",
"parameters": {
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "Regex pattern to search for"
},
"path": {
"type": "string",
"description": "File or directory to search in"
},
"recursive": {
"type": "boolean",
"description": "Search recursively in subdirectories"
}
},
"required": ["pattern", "path"]
}
},
"http_request": {
"name": "http_request",
"description": "Make an HTTP request. Useful for REST APIs.",
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "URL to request"
},
"method": {
"type": "string",
"enum": ["GET", "POST", "PUT", "DELETE", "PATCH"],
"description": "HTTP method"
},
"headers": {
"type": "object",
"description": "Request headers"
},
"body": {
"type": "string",
"description": "Request body (for POST, PUT, PATCH)"
},
"timeout": {
"type": "number",
"description": "Request timeout in seconds"
}
},
"required": ["url"]
}
},
"ssh": {
"name": "ssh",
"description": "Execute a command on a remote server via SSH.",
"parameters": {
"type": "object",
"properties": {
"host": {
"type": "string",
"description": "Server host or IP"
},
"command": {
"type": "string",
"description": "Command to execute on the server"
},
"user": {
"type": "string",
"description": "SSH user (default: root)"
},
"key_path": {
"type": "string",
"description": "Path to SSH key"
},
"port": {
"type": "integer",
"description": "SSH port (default: 22)"
}
},
"required": ["host", "command"]
}
},
"list_dir": {
"name": "list_dir",
"description": "List the contents of a directory.",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Path to the directory"
},
"recursive": {
"type": "boolean",
"description": "List recursively"
},
"include_hidden": {
"type": "boolean",
"description": "Include hidden files"
}
},
"required": ["path"]
}
},
}
def get_tool_schema(tool_names: List[str], format: str = "openai") -> List[dict]:
"""
Get tool schemas in the specified format.
Args:
tool_names: List of tool names
format: Output format (openai, anthropic, gemini)
Returns:
List of tool definitions
"""
tools = []
for name in tool_names:
if name not in TOOL_DEFINITIONS:
continue
tool_def = TOOL_DEFINITIONS[name]
if format == "openai":
tools.append({
"type": "function",
"function": {
"name": tool_def["name"],
"description": tool_def["description"],
"parameters": tool_def["parameters"]
}
})
elif format == "anthropic":
tools.append({
"name": tool_def["name"],
"description": tool_def["description"],
"input_schema": tool_def["parameters"]
})
elif format == "gemini":
tools.append({
"function_declarations": [{
"name": tool_def["name"],
"description": tool_def["description"],
"parameters": tool_def["parameters"]
}]
})
else:
tools.append(tool_def)
return tools
def get_tools_prompt(tool_names: List[str]) -> str:
"""
Generate a prompt describing available tools.
For models that don't support native function calling.
"""
if not tool_names:
return ""
prompt = """## Available Tools
You can use the following tools. To use them, respond with a JSON block like this:
```tool
{
"tool": "tool_name",
"params": {
"param1": "value1"
}
}
```
Tools:
"""
for name in tool_names:
if name not in TOOL_DEFINITIONS:
continue
tool = TOOL_DEFINITIONS[name]
props = tool["parameters"]["properties"]
required = tool["parameters"].get("required", [])
prompt += f"### {tool['name']}\n"
prompt += f"{tool['description']}\n"
prompt += "Parameters:\n"
for prop_name, prop_def in props.items():
req_marker = "(required)" if prop_name in required else "(optional)"
prompt += f" - {prop_name} {req_marker}: {prop_def.get('description', '')}\n"
prompt += "\n"
prompt += """
After using a tool, you'll receive the result and can continue.
You can use multiple tools in sequence for complex tasks.
"""
return prompt
```
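
To see the format conversion in action, a trimmed-down copy of the `bash` definition can be pushed through the same OpenAI-style wrapper that `get_tool_schema` emits for `format="openai"`:

```python
# Trimmed copy of TOOL_DEFINITIONS["bash"], enough for a standalone demo
bash_def = {
    "name": "bash",
    "description": "Execute a bash command on the system.",
    "parameters": {
        "type": "object",
        "properties": {
            "command": {"type": "string", "description": "The bash command to execute"}
        },
        "required": ["command"],
    },
}

def to_openai(tool_def: dict) -> dict:
    # Same shape get_tool_schema produces for format="openai"
    return {
        "type": "function",
        "function": {
            "name": tool_def["name"],
            "description": tool_def["description"],
            "parameters": tool_def["parameters"],
        },
    }

schema = to_openai(bash_def)
```

The Anthropic variant only renames `parameters` to `input_schema`, which is why one canonical definition per tool is enough for all three output formats.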
---
## orchestrator/tools/executor.py
```python
"""
Secure tool executor.
Features:
- Path sandboxing
- Command validation
- Rate limiting
- Retry with backoff
- Security logging
"""
import asyncio
import json
import re
import shlex
import time
import urllib.request
import urllib.error
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, Callable, Dict, Any, List
from datetime import datetime
from ..core import RateLimiter, retry_with_backoff, RetryConfig
@dataclass
class ToolResult:
"""Result of tool execution."""
tool: str
success: bool
output: str
error: Optional[str] = None
execution_time: float = 0.0
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
retries: int = 0
class SecurityValidator:
"""Security validator for tool operations."""
# Commands that should never be executed
DANGEROUS_COMMANDS = {
"rm -rf /", "rm -rf /*", ":(){ :|:& };:",
"dd if=", "mkfs", "fdisk", "> /dev/sd",
"chmod -R 777 /", "chown -R", "curl | sh",
"wget | sh", "curl | bash", "wget | bash",
}
# Dangerous patterns
DANGEROUS_PATTERNS = [
r"rm\s+-rf\s+/(?!\w)", # rm -rf / (but not /home)
r">\s*/dev/sd[a-z]", # Write to devices
r"mkfs\.", # Format disks
r"dd\s+if=.+of=/dev", # dd to devices
r"\|\s*(?:ba)?sh", # Piping to shell
r";\s*rm\s+-rf", # Command injection with rm
]
def __init__(
self,
working_dir: Path,
sandbox: bool = True,
allowed_commands: Optional[List[str]] = None
):
self.working_dir = working_dir.resolve()
self.sandbox = sandbox
self.allowed_commands = allowed_commands
def validate_path(self, path: str) -> tuple[bool, str]:
"""
Validate that a path is within the sandbox.
Returns:
(is_valid, resolved_path_or_error)
"""
if not self.sandbox:
return True, str(Path(path).expanduser())
try:
resolved = Path(path).expanduser()
if not resolved.is_absolute():
resolved = self.working_dir / resolved
resolved = resolved.resolve()
# Check if within working_dir
try:
resolved.relative_to(self.working_dir)
return True, str(resolved)
except ValueError:
return False, f"Path outside sandbox: {path}"
except Exception as e:
return False, f"Invalid path: {e}"
def validate_command(self, command: str) -> tuple[bool, str]:
"""
Validate that a command is safe.
Returns:
(is_safe, error_if_unsafe)
"""
# Check exact dangerous commands
for dangerous in self.DANGEROUS_COMMANDS:
if dangerous in command:
return False, f"Dangerous command: {dangerous}"
# Check dangerous patterns
for pattern in self.DANGEROUS_PATTERNS:
if re.search(pattern, command, re.IGNORECASE):
return False, f"Dangerous pattern: {pattern}"
# Check whitelist if provided
if self.allowed_commands:
cmd_name = command.split()[0] if command.split() else ""
if cmd_name not in self.allowed_commands:
return False, f"Command not allowed: {cmd_name}"
return True, ""
def sanitize_for_shell(self, value: str) -> str:
"""Escape a value for safe shell use."""
return shlex.quote(value)
class ToolExecutor:
"""
Executes tools securely.
Features:
- Path sandboxing
- Command validation
- Rate limiting
- Automatic retry
"""
def __init__(
self,
working_dir: Optional[str] = None,
timeout: float = 60.0,
sandbox_paths: bool = True,
allowed_commands: Optional[List[str]] = None,
ssh_key_path: str = "~/.ssh/id_rsa",
ssh_strict_host_checking: bool = True,
rate_limit_per_minute: int = 60,
max_retries: int = 3,
retry_delay: float = 1.0,
):
self.working_dir = Path(working_dir).resolve() if working_dir else Path.cwd()
self.timeout = timeout
self.ssh_key_path = Path(ssh_key_path).expanduser()
self.ssh_strict_host_checking = ssh_strict_host_checking
self.max_retries = max_retries
# Security
self.validator = SecurityValidator(
self.working_dir,
sandbox=sandbox_paths,
allowed_commands=allowed_commands,
)
# Rate limiting
self.rate_limiter = RateLimiter(rate_limit_per_minute)
# Retry config
self.retry_config = RetryConfig(
max_retries=max_retries,
base_delay=retry_delay,
)
# Tool registry
self._tools: Dict[str, Callable] = {
"bash": self._exec_bash,
"read": self._exec_read,
"write": self._exec_write,
"glob": self._exec_glob,
"grep": self._exec_grep,
"http_request": self._exec_http,
"ssh": self._exec_ssh,
"list_dir": self._exec_list_dir,
}
@property
def available_tools(self) -> List[str]:
"""List of available tool names."""
return list(self._tools.keys())
async def execute(self, tool: str, params: dict) -> ToolResult:
"""
Execute a tool with rate limiting and retry.
"""
start_time = time.time()
if tool not in self._tools:
return ToolResult(
tool=tool,
success=False,
output="",
error=f"Unknown tool: {tool}",
)
# Rate limiting
await self.rate_limiter.acquire()
# Execute with retry
async def do_execute():
return await self._tools[tool](params)
try:
result, retries = await retry_with_backoff(
do_execute,
config=self.retry_config,
)
result.retries = retries
result.execution_time = time.time() - start_time
return result
except Exception as e:
return ToolResult(
tool=tool,
success=False,
output="",
error=str(e),
execution_time=time.time() - start_time,
)
def parse_tool_calls(self, text: str) -> List[dict]:
"""
Parse tool calls from text.
Supports formats:
- ```tool { ... } ```
- ```json { "tool": ... } ```
"""
calls = []
# Format: ```tool ... ```
pattern1 = r'```tool\s*\n?(.*?)\n?```'
for match in re.findall(pattern1, text, re.DOTALL):
try:
data = json.loads(match.strip())
if "tool" in data:
calls.append(data)
except json.JSONDecodeError:
continue
# Format: ```json ... ``` with tool
pattern2 = r'```json\s*\n?(.*?)\n?```'
for match in re.findall(pattern2, text, re.DOTALL):
try:
data = json.loads(match.strip())
if "tool" in data:
calls.append(data)
except json.JSONDecodeError:
continue
return calls
async def execute_from_text(self, text: str) -> List[ToolResult]:
"""Execute all tools found in text."""
calls = self.parse_tool_calls(text)
results = []
for call in calls:
tool = call.get("tool")
params = call.get("params", {})
result = await self.execute(tool, params)
results.append(result)
return results
# ==================== TOOL IMPLEMENTATIONS ====================
async def _exec_bash(self, params: dict) -> ToolResult:
"""Execute a bash command securely."""
command = params.get("command", "")
working_dir = params.get("working_dir")
if not command:
return ToolResult(tool="bash", success=False, output="", error="Empty command")
# Validate command
is_safe, error = self.validator.validate_command(command)
if not is_safe:
return ToolResult(tool="bash", success=False, output="", error=error)
        # Determine working directory (reject invalid paths instead of silently falling back)
        cwd = self.working_dir
        if working_dir:
            is_valid, result = self.validator.validate_path(working_dir)
            if not is_valid:
                return ToolResult(tool="bash", success=False, output="", error=result)
            cwd = Path(result)
try:
process = await asyncio.create_subprocess_exec(
"/bin/bash", "-c", command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(cwd),
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=self.timeout,
)
return ToolResult(
tool="bash",
success=process.returncode == 0,
output=stdout.decode("utf-8", errors="replace"),
error=stderr.decode("utf-8", errors="replace") if process.returncode != 0 else None,
)
except asyncio.TimeoutError:
return ToolResult(
tool="bash",
success=False,
output="",
error=f"Timeout after {self.timeout}s",
)
async def _exec_read(self, params: dict) -> ToolResult:
"""Read a file within the sandbox."""
path = params.get("path", "")
encoding = params.get("encoding", "utf-8")
if not path:
return ToolResult(tool="read", success=False, output="", error="Empty path")
is_valid, result = self.validator.validate_path(path)
if not is_valid:
return ToolResult(tool="read", success=False, output="", error=result)
try:
content = Path(result).read_text(encoding=encoding)
return ToolResult(tool="read", success=True, output=content)
except FileNotFoundError:
return ToolResult(tool="read", success=False, output="", error=f"File not found: {path}")
except Exception as e:
return ToolResult(tool="read", success=False, output="", error=str(e))
async def _exec_write(self, params: dict) -> ToolResult:
"""Write a file within the sandbox."""
path = params.get("path", "")
content = params.get("content", "")
append = params.get("append", False)
if not path:
return ToolResult(tool="write", success=False, output="", error="Empty path")
is_valid, result = self.validator.validate_path(path)
if not is_valid:
return ToolResult(tool="write", success=False, output="", error=result)
try:
file_path = Path(result)
file_path.parent.mkdir(parents=True, exist_ok=True)
mode = "a" if append else "w"
with open(file_path, mode) as f:
f.write(content)
return ToolResult(
tool="write",
success=True,
                output=f"Wrote {len(content)} characters to {file_path.name}",
)
except Exception as e:
return ToolResult(tool="write", success=False, output="", error=str(e))
async def _exec_glob(self, params: dict) -> ToolResult:
"""Find files by glob pattern."""
pattern = params.get("pattern", "")
base_dir = params.get("base_dir")
if not pattern:
return ToolResult(tool="glob", success=False, output="", error="Empty pattern")
        search_dir = self.working_dir
        if base_dir:
            is_valid, result = self.validator.validate_path(base_dir)
            if not is_valid:
                return ToolResult(tool="glob", success=False, output="", error=result)
            search_dir = Path(result)
try:
files = list(search_dir.glob(pattern))
safe_files = []
for f in files[:100]: # Limit results
try:
f.relative_to(self.working_dir)
safe_files.append(str(f.relative_to(self.working_dir)))
except ValueError:
continue
output = "\n".join(sorted(safe_files))
return ToolResult(
tool="glob",
success=True,
output=f"Found {len(safe_files)} files:\n{output}",
)
except Exception as e:
return ToolResult(tool="glob", success=False, output="", error=str(e))
async def _exec_grep(self, params: dict) -> ToolResult:
"""Search text in files."""
pattern = params.get("pattern", "")
path = params.get("path", "")
recursive = params.get("recursive", False)
if not pattern or not path:
return ToolResult(tool="grep", success=False, output="", error="Pattern or path empty")
is_valid, validated_path = self.validator.validate_path(path)
if not is_valid:
return ToolResult(tool="grep", success=False, output="", error=validated_path)
        try:
            cmd = ["grep", "-n", "--color=never"]
            if recursive:
                cmd.append("-r")
            cmd.extend([pattern, validated_path])
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(self.working_dir),
            )
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=self.timeout,
            )
            # grep exits 0 on match, 1 on no match (not an error), >= 2 on real errors
            if process.returncode in (0, 1):
                return ToolResult(tool="grep", success=True, output=stdout.decode("utf-8", errors="replace"))
            return ToolResult(tool="grep", success=False, output="", error=stderr.decode("utf-8", errors="replace"))
        except Exception as e:
            return ToolResult(tool="grep", success=False, output="", error=str(e))
async def _exec_http(self, params: dict) -> ToolResult:
"""Make an HTTP request."""
url = params.get("url", "")
method = params.get("method", "GET").upper()
headers = params.get("headers", {})
body = params.get("body", "")
timeout = params.get("timeout", self.timeout)
        if not url:
            return ToolResult(tool="http_request", success=False, output="", error="Empty URL")
        if not url.lower().startswith(("http://", "https://")):
            return ToolResult(tool="http_request", success=False, output="", error=f"Unsupported URL scheme: {url}")
try:
data = body.encode() if body else None
req = urllib.request.Request(url, data=data, method=method)
for key, value in headers.items():
req.add_header(key, value)
with urllib.request.urlopen(req, timeout=timeout) as response:
content = response.read().decode()
return ToolResult(tool="http_request", success=True, output=content)
except urllib.error.HTTPError as e:
return ToolResult(
tool="http_request",
success=False,
output=e.read().decode() if e.fp else "",
error=f"HTTP {e.code}: {e.reason}",
)
except Exception as e:
return ToolResult(tool="http_request", success=False, output="", error=str(e))
async def _exec_ssh(self, params: dict) -> ToolResult:
"""Execute command via SSH."""
host = params.get("host", "")
command = params.get("command", "")
user = params.get("user", "root")
key_path = params.get("key_path", str(self.ssh_key_path))
port = params.get("port", 22)
if not host or not command:
return ToolResult(tool="ssh", success=False, output="", error="Host or command empty")
# Validate remote command too
is_safe, error = self.validator.validate_command(command)
if not is_safe:
return ToolResult(tool="ssh", success=False, output="", error=f"Unsafe remote command: {error}")
try:
ssh_cmd = [
"ssh",
"-o", f"StrictHostKeyChecking={'yes' if self.ssh_strict_host_checking else 'no'}",
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=10",
"-i", key_path,
"-p", str(port),
f"{user}@{host}",
command,
]
process = await asyncio.create_subprocess_exec(
*ssh_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=self.timeout,
)
            return ToolResult(
                tool="ssh",
                success=process.returncode == 0,
                output=stdout.decode("utf-8", errors="replace"),
                error=stderr.decode("utf-8", errors="replace") if process.returncode != 0 else None,
            )
except asyncio.TimeoutError:
return ToolResult(
tool="ssh",
success=False,
output="",
error=f"SSH timeout after {self.timeout}s",
)
except Exception as e:
return ToolResult(tool="ssh", success=False, output="", error=str(e))
async def _exec_list_dir(self, params: dict) -> ToolResult:
"""List directory contents."""
path = params.get("path", ".")
recursive = params.get("recursive", False)
include_hidden = params.get("include_hidden", False)
is_valid, validated_path = self.validator.validate_path(path)
if not is_valid:
return ToolResult(tool="list_dir", success=False, output="", error=validated_path)
try:
dir_path = Path(validated_path)
if not dir_path.exists():
return ToolResult(
tool="list_dir",
success=False,
output="",
error=f"Directory not found: {path}",
)
entries = []
pattern = "**/*" if recursive else "*"
for entry in dir_path.glob(pattern):
                # Skip hidden entries, including files nested inside hidden directories
                if not include_hidden and any(
                    part.startswith(".") for part in entry.relative_to(dir_path).parts
                ):
                    continue
try:
entry.relative_to(self.working_dir)
except ValueError:
continue
entry_type = "d" if entry.is_dir() else "f"
rel_path = entry.relative_to(dir_path)
entries.append(f"[{entry_type}] {rel_path}")
return ToolResult(
tool="list_dir",
success=True,
output="\n".join(sorted(entries)[:200]),
)
except Exception as e:
return ToolResult(tool="list_dir", success=False, output="", error=str(e))
```
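The two fence formats accepted by `parse_tool_calls` can be exercised in isolation. The sketch below is a self-contained re-implementation of that parsing logic (same regexes and JSON handling as the method above); the `reply` text and call names are illustrative only.

```python
# Standalone sketch of ToolExecutor.parse_tool_calls: extract JSON tool
# calls from ```tool and ```json fenced blocks in a model reply.
import json
import re


def parse_tool_calls(text: str) -> list:
    calls = []
    for pattern in (r'```tool\s*\n?(.*?)\n?```', r'```json\s*\n?(.*?)\n?```'):
        for match in re.findall(pattern, text, re.DOTALL):
            try:
                data = json.loads(match.strip())
            except json.JSONDecodeError:
                continue
            # Only blocks that declare a "tool" key count as tool calls
            if "tool" in data:
                calls.append(data)
    return calls


reply = (
    'Let me check the directory first.\n'
    '```tool\n{"tool": "list_dir", "params": {"path": "."}}\n```\n'
    'And read the config:\n'
    '```json\n{"tool": "read", "params": {"path": "config.yaml"}}\n```\n'
)
calls = parse_tool_calls(reply)
```

`execute_from_text` then runs each parsed call through `execute`, so any tool the model emits in either fence style goes through the same rate limiting, retry, and sandbox validation.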
---
## orchestrator/agents/\_\_init\_\_.py
```python
"""
Agent module.
"""
from .base import Agent, AgentResult
__all__ = ["Agent", "AgentResult"]
```
---
## orchestrator/agents/base.py
```python
"""
Agent implementation.
An Agent combines:
- A provider (Claude, LiteLLM, etc.)
- Tools
- A role/system prompt
- Logging
"""
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict
from dataclasses import dataclass, field
from ..config import get_config, AgentConfig
from ..providers import ClaudeProvider, LiteLLMProvider, ProviderResponse
from ..tools import ToolExecutor
@dataclass
class AgentResult:
"""Result of an agent execution."""
success: bool
output: str
agent_name: str
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
error: Optional[str] = None
tool_results: List[Dict] = field(default_factory=list)
usage: Optional[Dict] = None
retries: int = 0
class Agent:
"""
Configurable AI agent.
Combines a provider, tools, and a role to create
a specialized AI assistant.
"""
def __init__(
self,
name: str = "",
role: str = "",
provider: str = "claude",
model: str = "sonnet",
tools: Optional[List[str]] = None,
servers: Optional[List[str]] = None,
max_turns: int = 10,
config_obj: Optional[AgentConfig] = None,
):
"""
Initialize agent from parameters or config object.
"""
if config_obj:
self.name = config_obj.name
self.role = config_obj.role
self.provider_name = config_obj.provider
self.model = config_obj.model
self.tools = config_obj.tools or []
self.servers = config_obj.servers or []
self.max_turns = config_obj.max_turns
else:
self.name = name
self.role = role
self.provider_name = provider
self.model = model
self.tools = tools or []
self.servers = servers or []
self.max_turns = max_turns
self.config = get_config()
# Paths
self.log_file = self.config.logs_dir / f"{self.name}.md"
self.output_dir = self.config.outputs_dir / self.name
# Create provider
self.provider = self._create_provider()
# Create tool executor for non-native tools
self.tool_executor = self._create_tool_executor()
# Build system prompt
self.system_prompt = self._build_system_prompt()
# Initialize
self.output_dir.mkdir(parents=True, exist_ok=True)
if not self.log_file.exists():
self._init_log()
def _create_provider(self):
"""Create the appropriate provider."""
settings = self.config.settings
common_kwargs = {
"model": self.model,
"timeout": settings.timeout,
"max_retries": settings.max_retries,
"rate_limit_per_minute": settings.rate_limit_per_minute,
"enable_circuit_breaker": settings.enable_circuit_breaker,
"circuit_breaker_threshold": settings.circuit_breaker_threshold,
"circuit_breaker_timeout": settings.circuit_breaker_timeout,
}
if self.provider_name == "claude":
return ClaudeProvider(
max_turns=self.max_turns,
working_directory=str(self.config.base_dir),
**common_kwargs,
)
else:
return LiteLLMProvider(
max_tool_iterations=self.max_turns,
**common_kwargs,
)
def _create_tool_executor(self) -> ToolExecutor:
"""Create tool executor for non-native tools."""
settings = self.config.settings
return ToolExecutor(
working_dir=str(self.config.base_dir),
timeout=settings.timeout,
sandbox_paths=settings.sandbox_paths,
allowed_commands=settings.allowed_commands or None,
ssh_strict_host_checking=settings.ssh_strict_host_checking,
rate_limit_per_minute=settings.rate_limit_per_minute,
max_retries=settings.max_retries,
)
def _build_system_prompt(self) -> str:
"""Build the system prompt including role and server info."""
prompt = f"# {self.name}\n\n{self.role}"
if self.servers:
prompt += "\n\n## Available Servers\n"
for server_name in self.servers:
server = self.config.get_server(server_name)
if server:
prompt += f"- **{server_name}**: {server.user}@{server.host}"
if server.description:
prompt += f" ({server.description})"
prompt += "\n"
return prompt
def _init_log(self):
"""Initialize the log file."""
header = f"""# {self.name}
- **Created:** {datetime.now().isoformat()}
- **Provider:** {self.provider_name}
- **Model:** {self.model}
- **Tools:** {', '.join(self.tools) if self.tools else 'none'}
---
"""
self.log_file.write_text(header)
def log(self, entry_type: str, content: str):
"""Add entry to log file."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
entry = f"\n## {timestamp} | {entry_type}\n\n{content}\n"
with open(self.log_file, "a") as f:
f.write(entry)
def _get_non_native_tools(self) -> List[str]:
"""Get tools that need manual execution (not native to provider)."""
if self.provider_name == "claude":
# Claude CLI handles: bash, read, write, edit, glob, grep, webfetch
native = {"bash", "read", "write", "edit", "glob", "grep", "webfetch", "web_fetch"}
return [t for t in self.tools if t.lower() not in native]
else:
# LiteLLM doesn't have native tools
return self.tools
async def run(self, prompt: str, log_action: bool = True) -> AgentResult:
"""
Execute a prompt through this agent.
"""
if log_action:
self.log("PROMPT", prompt[:500] + "..." if len(prompt) > 500 else prompt)
try:
# Determine which tools need manual handling
non_native_tools = self._get_non_native_tools()
# For Claude provider, pass all tools (it handles native ones)
# For LiteLLM, we need to handle tools ourselves
if self.provider_name == "claude":
response = await self.provider.run_with_tools(
prompt=prompt,
tools=self.tools,
system_prompt=self.system_prompt,
)
else:
# LiteLLM with tool execution
response = await self.provider.run_with_tools(
prompt=prompt,
tools=self.tools,
system_prompt=self.system_prompt,
tool_executor=self.tool_executor,
)
# Build result
result = AgentResult(
success=response.success,
output=response.text,
agent_name=self.name,
error=response.error,
tool_results=[
{
"tool": tr.tool,
"success": tr.success,
"output": tr.output[:200] if tr.output else "",
"error": tr.error,
}
for tr in response.tool_results
],
usage=response.usage.to_dict() if response.usage else None,
retries=response.retries,
)
if log_action:
status = "RESULT" if response.success else "ERROR"
content = response.text[:500] if response.success else (response.error or "Unknown error")
self.log(status, content)
return result
except Exception as e:
if log_action:
self.log("ERROR", str(e))
return AgentResult(
success=False,
output="",
agent_name=self.name,
error=str(e),
)
async def run_with_context(
self,
prompt: str,
context: str,
log_action: bool = True,
) -> AgentResult:
"""
Execute a prompt with additional context.
"""
full_prompt = f"{context}\n\n---\n\n{prompt}"
return await self.run(full_prompt, log_action)
def save_output(self, filename: str, content: str) -> Path:
"""Save content to the agent's output directory."""
filepath = self.output_dir / filename
filepath.parent.mkdir(parents=True, exist_ok=True)
filepath.write_text(content)
self.log("FILE", f"Created: {filepath}")
return filepath
def read_log(self, last_n: Optional[int] = None) -> str:
"""Read the agent's log file."""
if not self.log_file.exists():
return ""
content = self.log_file.read_text()
if last_n:
entries = content.split("\n## ")
return "\n## ".join(entries[-last_n:])
return content
async def health_check(self) -> bool:
"""Check if the agent's provider is healthy."""
return await self.provider.health_check()
def __repr__(self) -> str:
return f"Agent({self.name}, {self.provider_name}/{self.model})"
```
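The system prompt an agent sends to its provider is assembled from the agent name, its role, and any configured servers. This standalone sketch mirrors `_build_system_prompt` above; `ServerEntry` is a hypothetical stand-in for the real server config object, with the same `host`/`user`/`description` fields the method reads.

```python
# Sketch of Agent._build_system_prompt composition with a hypothetical
# server entry (stand-in for the project's server config class).
from dataclasses import dataclass


@dataclass
class ServerEntry:
    host: str
    user: str
    description: str = ""


def build_system_prompt(name: str, role: str, servers: dict) -> str:
    prompt = f"# {name}\n\n{role}"
    if servers:
        prompt += "\n\n## Available Servers\n"
        for server_name, server in servers.items():
            prompt += f"- **{server_name}**: {server.user}@{server.host}"
            if server.description:
                prompt += f" ({server.description})"
            prompt += "\n"
    return prompt


prompt = build_system_prompt(
    "deployer",
    "You are a deployment specialist.",
    {"production": ServerEntry("prod.example.com", "deploy", "Production server")},
)
```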
---
## orchestrator/tracking/\_\_init\_\_.py
```python
"""
Cost tracking module.
Track token usage and costs across providers.
"""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional
import json
# Cost per million tokens (approximate, may vary)
COST_PER_MILLION = {
# Claude models
"claude-opus-4-20250514": {"input": 15.0, "output": 75.0},
"claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
"claude-3-5-haiku-latest": {"input": 0.25, "output": 1.25},
# OpenAI models
"gpt-4o": {"input": 2.5, "output": 10.0},
"gpt-4o-mini": {"input": 0.15, "output": 0.6},
"gpt-4-turbo": {"input": 10.0, "output": 30.0},
"o1-preview": {"input": 15.0, "output": 60.0},
"o1-mini": {"input": 3.0, "output": 12.0},
# Google models
"gemini/gemini-1.5-pro": {"input": 1.25, "output": 5.0},
"gemini/gemini-1.5-flash": {"input": 0.075, "output": 0.3},
# Default for unknown models
"default": {"input": 1.0, "output": 3.0},
}
@dataclass
class RequestCost:
"""Cost information for a single request."""
model: str
input_tokens: int
output_tokens: int
cache_read_tokens: int = 0
cache_creation_tokens: int = 0
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
@property
def total_tokens(self) -> int:
return self.input_tokens + self.output_tokens
@property
def input_cost(self) -> float:
"""Calculate input cost in USD."""
rates = COST_PER_MILLION.get(self.model, COST_PER_MILLION["default"])
# Cache reads are typically 90% cheaper
regular_input = self.input_tokens - self.cache_read_tokens
cache_cost = (self.cache_read_tokens / 1_000_000) * rates["input"] * 0.1
regular_cost = (regular_input / 1_000_000) * rates["input"]
return regular_cost + cache_cost
@property
def output_cost(self) -> float:
"""Calculate output cost in USD."""
rates = COST_PER_MILLION.get(self.model, COST_PER_MILLION["default"])
return (self.output_tokens / 1_000_000) * rates["output"]
@property
def total_cost(self) -> float:
"""Total cost in USD."""
return self.input_cost + self.output_cost
@dataclass
class CostSummary:
"""Summary of costs over multiple requests."""
total_requests: int = 0
total_input_tokens: int = 0
total_output_tokens: int = 0
total_cache_read_tokens: int = 0
total_cost_usd: float = 0.0
by_model: Dict[str, Dict] = field(default_factory=dict)
def to_dict(self) -> dict:
return {
"total_requests": self.total_requests,
"total_input_tokens": self.total_input_tokens,
"total_output_tokens": self.total_output_tokens,
"total_tokens": self.total_input_tokens + self.total_output_tokens,
"total_cache_read_tokens": self.total_cache_read_tokens,
"total_cost_usd": round(self.total_cost_usd, 6),
"by_model": self.by_model,
}
class CostTracker:
"""
Track costs across multiple requests.
Usage:
tracker = CostTracker(budget_limit=10.0)
# Record usage
tracker.record(model="claude-sonnet-4-20250514", input_tokens=1000, output_tokens=500)
# Check status
print(tracker.summary)
print(tracker.remaining_budget)
"""
def __init__(
self,
budget_limit: Optional[float] = None,
on_budget_warning: Optional[callable] = None,
warning_threshold: float = 0.8,
):
"""
Args:
budget_limit: Optional budget limit in USD
on_budget_warning: Callback when approaching budget
warning_threshold: Fraction of budget to trigger warning (0.8 = 80%)
"""
self.budget_limit = budget_limit
self.on_budget_warning = on_budget_warning
self.warning_threshold = warning_threshold
self._requests: List[RequestCost] = []
self._warning_sent = False
def record(
self,
model: str,
input_tokens: int,
output_tokens: int,
cache_read_tokens: int = 0,
cache_creation_tokens: int = 0,
) -> RequestCost:
"""Record a request's token usage."""
cost = RequestCost(
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cache_read_tokens=cache_read_tokens,
cache_creation_tokens=cache_creation_tokens,
)
self._requests.append(cost)
# Check budget
if self.budget_limit and not self._warning_sent:
if self.total_cost >= self.budget_limit * self.warning_threshold:
self._warning_sent = True
if self.on_budget_warning:
self.on_budget_warning(self.total_cost, self.budget_limit)
return cost
def record_from_response(self, response) -> Optional[RequestCost]:
"""Record from a ProviderResponse object."""
if not response.usage:
return None
return self.record(
model=response.model,
input_tokens=response.usage.input_tokens,
output_tokens=response.usage.output_tokens,
cache_read_tokens=getattr(response.usage, 'cache_read_tokens', 0),
cache_creation_tokens=getattr(response.usage, 'cache_creation_tokens', 0),
)
@property
def total_cost(self) -> float:
"""Total cost across all requests."""
return sum(r.total_cost for r in self._requests)
@property
def total_tokens(self) -> int:
"""Total tokens across all requests."""
return sum(r.total_tokens for r in self._requests)
@property
def remaining_budget(self) -> Optional[float]:
"""Remaining budget in USD, or None if no limit."""
if self.budget_limit is None:
return None
return max(0, self.budget_limit - self.total_cost)
@property
def is_over_budget(self) -> bool:
"""Check if over budget."""
if self.budget_limit is None:
return False
return self.total_cost >= self.budget_limit
@property
def summary(self) -> CostSummary:
"""Get cost summary."""
summary = CostSummary()
for request in self._requests:
summary.total_requests += 1
summary.total_input_tokens += request.input_tokens
summary.total_output_tokens += request.output_tokens
summary.total_cache_read_tokens += request.cache_read_tokens
summary.total_cost_usd += request.total_cost
# By model
if request.model not in summary.by_model:
summary.by_model[request.model] = {
"requests": 0,
"input_tokens": 0,
"output_tokens": 0,
"cost_usd": 0.0,
}
summary.by_model[request.model]["requests"] += 1
summary.by_model[request.model]["input_tokens"] += request.input_tokens
summary.by_model[request.model]["output_tokens"] += request.output_tokens
summary.by_model[request.model]["cost_usd"] += request.total_cost
return summary
def reset(self):
"""Reset all tracked costs."""
self._requests = []
self._warning_sent = False
def export(self) -> str:
"""Export cost data as JSON."""
return json.dumps({
"requests": [
{
"model": r.model,
"input_tokens": r.input_tokens,
"output_tokens": r.output_tokens,
"cache_read_tokens": r.cache_read_tokens,
"total_cost": r.total_cost,
"timestamp": r.timestamp,
}
for r in self._requests
],
"summary": self.summary.to_dict(),
"budget_limit": self.budget_limit,
}, indent=2)
```
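A worked example of the `RequestCost` arithmetic above, using the claude-sonnet rate row ($3/M input, $15/M output) and the 90% cache-read discount. The token counts are made up for illustration.

```python
# Cost math mirroring RequestCost.input_cost / output_cost above.
RATES = {"input": 3.0, "output": 15.0}  # claude-sonnet-4-20250514, USD per million tokens

input_tokens = 10_000
cache_read_tokens = 4_000  # subset of input_tokens, billed at 10% of the input rate
output_tokens = 2_000

regular_input = input_tokens - cache_read_tokens
input_cost = (
    (regular_input / 1_000_000) * RATES["input"]
    + (cache_read_tokens / 1_000_000) * RATES["input"] * 0.1
)
output_cost = (output_tokens / 1_000_000) * RATES["output"]
total = input_cost + output_cost  # 0.018 + 0.0012 + 0.03 = 0.0492 USD
```

Note that without the cache discount the same request would cost $0.03 on the input side instead of $0.0192, which is why `CostTracker` tracks `cache_read_tokens` separately.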
---
## Ejemplos de Configuración
### examples/simple.yaml
```yaml
# Simple single agent configuration
settings:
default_provider: claude
default_model: sonnet
agents:
assistant:
role: "You are a helpful assistant."
provider: claude
model: sonnet
tools:
- bash
- read
- write
```
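Configurations like the one above are plain YAML, loadable with PyYAML (the project's one hard dependency). The actual loader lives in `orchestrator/config.py`; this sketch only shows the raw structure the loader receives.

```python
# Parsing the simple.yaml structure above with PyYAML.
import yaml

raw = """
settings:
  default_provider: claude
  default_model: sonnet
agents:
  assistant:
    role: "You are a helpful assistant."
    provider: claude
    model: sonnet
    tools:
      - bash
      - read
      - write
"""
config = yaml.safe_load(raw)
```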
### examples/dev_team.yaml
```yaml
# Software development team
settings:
default_provider: claude
default_model: sonnet
timeout: 300
sandbox_paths: true
agents:
architect:
role: |
You are a senior software architect.
You design scalable and maintainable systems.
You make important technical decisions.
You document decisions in ADRs (Architecture Decision Records).
provider: claude
model: opus
tools:
- read
- write
- list_dir
- glob
developer:
role: |
You are an experienced full-stack developer.
You write clean, well-documented, and testable code.
You follow language best practices.
You always include appropriate error handling.
provider: claude
model: sonnet
tools:
- read
- write
- bash
- grep
- glob
reviewer:
role: |
You are a demanding but constructive code reviewer.
You look for bugs, security issues, and improvements.
You suggest refactoring when needed.
You validate that code follows standards.
provider: litellm
model: gpt4o
tools:
- read
- grep
- glob
tester:
role: |
You are a QA engineer specialized in testing.
You write unit, integration, and e2e tests.
You identify edge cases and error scenarios.
You ensure good test coverage.
provider: litellm
model: gemini-pro
tools:
- read
- write
- bash
```
### examples/devops.yaml
```yaml
# DevOps team with server access
settings:
default_provider: claude
default_model: sonnet
timeout: 300
ssh_strict_host_checking: true
servers:
production:
host: prod.example.com
user: deploy
key: ~/.ssh/deploy_key
description: "Production server"
staging:
host: staging.example.com
user: deploy
key: ~/.ssh/deploy_key
description: "Staging server"
agents:
deployer:
role: |
You are a deployment specialist.
You deploy applications safely to production.
You always verify deployments succeed.
You rollback immediately if issues are detected.
provider: claude
model: sonnet
tools:
- ssh
- bash
- read
servers:
- production
- staging
monitor:
role: |
You are a systems monitoring expert.
You check server health and metrics.
You identify issues before they become problems.
provider: claude
model: haiku
tools:
- ssh
- http_request
- read
servers:
- production
- staging
```
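For a server entry like `production` above, the `ssh` tool in the executor builds a non-interactive command line. This sketch mirrors the argument order of `_exec_ssh`; the host and command are illustrative.

```python
# Sketch of the ssh argv built by ToolExecutor._exec_ssh for a configured server.
def build_ssh_cmd(host, command, user="deploy", key_path="~/.ssh/deploy_key",
                  port=22, strict_host_checking=True):
    return [
        "ssh",
        "-o", f"StrictHostKeyChecking={'yes' if strict_host_checking else 'no'}",
        "-o", "BatchMode=yes",       # never prompt for passwords
        "-o", "ConnectTimeout=10",
        "-i", key_path,
        "-p", str(port),
        f"{user}@{host}",
        command,
    ]


cmd = build_ssh_cmd("prod.example.com", "systemctl status myapp")
```

`BatchMode=yes` is what makes key-based auth mandatory: if the key is missing or rejected, ssh fails immediately instead of hanging on a password prompt inside the agent loop.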
### examples/local_ollama.yaml
```yaml
# Using local Ollama models (no API costs!)
#
# Requirements:
# 1. Install Ollama: https://ollama.ai
# 2. Pull models: ollama pull llama3
# 3. pip install litellm
settings:
default_provider: litellm
default_model: llama3
timeout: 120
max_tool_iterations: 15
agents:
assistant:
role: |
You are a helpful assistant running locally.
You can execute commands and work with files.
provider: litellm
model: llama3
tools:
- bash
- read
- write
- list_dir
coder:
role: |
You are a coding assistant specialized in programming.
You write clean, efficient code.
provider: litellm
model: codellama
tools:
- read
- write
- bash
- grep
```
---
## .env.example
```bash
# .env.example - Copy to .env and fill in your values
# API Keys
OPENAI_API_KEY=sk-...
GOOGLE_API_KEY=...
ANTHROPIC_API_KEY=...
MISTRAL_API_KEY=...
GROQ_API_KEY=...
# SSH Configuration
SSH_KEY_PATH=~/.ssh/id_rsa
SSH_KNOWN_HOSTS_CHECK=true
```
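These variables are plain `KEY=VALUE` lines. The project may load them with a library such as python-dotenv; the minimal sketch below only illustrates the format (blank lines and `#` comments ignored, first `=` splits key from value).

```python
# Minimal .env-style loader sketch: KEY=VALUE lines into os.environ.
import os


def load_env(text: str) -> dict:
    loaded = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        key, _, value = line.partition("=")
        loaded[key.strip()] = value.strip()
    os.environ.update(loaded)
    return loaded


env = load_env("# comment\nSSH_KEY_PATH=~/.ssh/id_rsa\nSSH_KNOWN_HOSTS_CHECK=true\n")
```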
---
## .gitignore
```
# Environment
.env
.venv/
venv/
# Python
__pycache__/
*.py[cod]
*.egg-info/
# IDE
.idea/
.vscode/
# Logs and outputs
logs/*.md
!logs/.gitkeep
outputs/**
!outputs/.gitkeep
# Secrets
*.pem
*.key
```
---
## Quick Start
```bash
# 1. Create the directory structure
mkdir -p tzzr_v6/orchestrator/{core,providers,tools,agents,tracking}
mkdir -p tzzr_v6/{logs,outputs,examples}

# 2. Copy each file from the code above into its corresponding location

# 3. Install dependencies
pip install pyyaml
pip install litellm  # optional, for GPT-4/Gemini/etc.

# 4. Run
cd tzzr_v6
python main.py
```