Initial commit: TZZR Orchestrator v5

- Framework genérico multi-agente
- Providers: Claude CLI, LiteLLM (100+ modelos)
- Tools: bash, read, write, glob, grep, ssh, http
- Seguridad: sandbox paths, validación comandos, rate limiting
- Configuración via YAML + .env

🤖 Generated with Claude Code
This commit is contained in:
tzzrgit
2025-12-23 16:22:49 +01:00
commit 374a74aa6d
23 changed files with 2814 additions and 0 deletions

35
.gitignore vendored Normal file
View File

@@ -0,0 +1,35 @@
# .gitignore
# Credenciales - NUNCA subir
.env
*.env
.env.*
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.venv/
venv/
env/
# Logs y outputs
logs/*.md
!logs/.gitkeep
outputs/*
!outputs/.gitkeep
# IDE
.idea/
.vscode/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
# Claude Code
.claude/

200
README.md Normal file
View File

@@ -0,0 +1,200 @@
# LLM Orchestrator
Sistema de orquestación multi-agente compatible con cualquier LLM.
## ¿Qué es esto?
Un framework para crear y coordinar múltiples agentes de IA que pueden:
- Ejecutar comandos en tu sistema
- Leer y escribir archivos
- Conectarse a servidores via SSH
- Hacer llamadas a APIs
- Trabajar juntos en tareas complejas
## Características
- **Multi-modelo**: Claude, GPT-4, Gemini, Llama, Mistral, y 100+ más
- **Herramientas universales**: Bash, lectura/escritura de archivos, SSH, HTTP
- **Agentes personalizables**: Define tantos agentes como necesites
- **LOGs automáticos**: Registro de todas las acciones
- **Sin dependencias pesadas**: Solo Python estándar + LiteLLM opcional
## Instalación
```bash
# Clonar o descomprimir
cd orchestrator
# Crear entorno virtual
python3 -m venv .venv
source .venv/bin/activate
# Instalar dependencias opcionales
pip install litellm # Para usar GPT-4, Gemini, Llama, etc.
```
## Uso rápido
### 1. Define tus agentes
Edita `config.yaml`:
```yaml
agents:
researcher:
role: "Investigador que busca información"
provider: claude
model: sonnet
tools: [bash, read, http_request]
coder:
role: "Programador que escribe código"
provider: litellm
model: gpt4o
tools: [read, write, bash]
reviewer:
role: "Revisor que valida el trabajo"
provider: litellm
model: gemini-pro
tools: [read, grep]
```
### 2. Ejecuta el orquestador
```bash
# Modo interactivo
python orchestrator/main.py
# Ejecutar un agente específico
python orchestrator/main.py --agent researcher --prompt "Busca información sobre X"
# Ver estado
python orchestrator/main.py --status
```
### 3. Comandos interactivos
```
/status - Ver estado del sistema
/agents - Listar agentes disponibles
/agent <nombre> - Cambiar agente activo
/logs <agente> - Ver historial del agente
/all - Ejecutar en todos los agentes
/quit - Salir
```
## Estructura
```
orchestrator/
├── config.yaml # ← Tu configuración de agentes
├── orchestrator/
│ ├── main.py # Punto de entrada
│ ├── config.py # Carga de configuración
│ ├── providers/ # Conexión con LLMs
│ │ ├── claude_provider.py
│ │ └── litellm_provider.py
│ ├── tools/ # Herramientas disponibles
│ │ ├── executor.py
│ │ └── definitions.py
│ ├── agents/ # Lógica de agentes
│ │ └── base.py
│ └── tasks/ # Tareas predefinidas
├── logs/ # Historial por agente
├── outputs/ # Archivos generados
└── examples/ # Ejemplos de configuración
```
## Providers disponibles
| Provider | Modelos | Requisito |
|----------|---------|-----------|
| `claude` | sonnet, opus, haiku | Claude Code CLI instalado |
| `litellm` | gpt4o, gemini-pro, llama3, mistral... | `pip install litellm` + API keys |
## Herramientas disponibles
| Herramienta | Descripción |
|-------------|-------------|
| `bash` | Ejecuta comandos del sistema |
| `read` | Lee archivos |
| `write` | Escribe/crea archivos |
| `glob` | Busca archivos por patrón |
| `grep` | Busca texto en archivos |
| `ssh` | Ejecuta comandos en servidores remotos |
| `http_request` | Hace peticiones HTTP/API |
| `list_dir` | Lista contenido de directorios |
## Ejemplos
### Agente simple (solo conversación)
```yaml
agents:
assistant:
role: "Asistente general"
provider: claude
model: sonnet
tools: [] # Sin herramientas
```
### Equipo de desarrollo
```yaml
agents:
architect:
role: "Diseña la arquitectura del sistema"
provider: claude
model: opus
tools: [read, write, bash]
developer:
role: "Implementa el código"
provider: litellm
model: gpt4o
tools: [read, write, bash, grep]
tester:
role: "Escribe y ejecuta tests"
provider: litellm
model: gemini-pro
tools: [read, bash]
```
### Agentes con servidores
```yaml
servers:
production:
host: 192.168.1.100
user: deploy
key: ~/.ssh/id_rsa
staging:
host: 192.168.1.101
user: deploy
key: ~/.ssh/id_rsa
agents:
deployer:
role: "Despliega aplicaciones a servidores"
provider: claude
model: sonnet
tools: [ssh, bash, read]
servers: [production, staging]
```
## Variables de entorno
Para usar modelos de pago via LiteLLM:
```bash
export OPENAI_API_KEY="sk-..."
export GOOGLE_API_KEY="..."
export ANTHROPIC_API_KEY="..." # Si usas Claude via API
```
## Licencia
MIT - Usa, modifica y comparte libremente.

122
config.yaml Normal file
View File

@@ -0,0 +1,122 @@
# config.yaml - Configuración del orquestador
#
# Edita este archivo para definir tus agentes y servidores.
# Puedes tener tantos agentes como necesites.
# ============================================================================
# CONFIGURACIÓN GENERAL
# ============================================================================
settings:
# Modelo por defecto si no se especifica en el agente
default_provider: claude
default_model: sonnet
# Timeout en segundos para las llamadas
timeout: 300
# Directorio de trabajo (relativo a este archivo)
working_dir: .
# Máximo de iteraciones de herramientas por turno
max_tool_iterations: 10
# ============================================================================
# SERVIDORES (opcional)
# ============================================================================
# Define servidores para que los agentes puedan conectarse via SSH
servers:
# Ejemplo:
# production:
# host: 192.168.1.100
# user: root
# key: ~/.ssh/id_rsa
# description: "Servidor de producción"
# ============================================================================
# AGENTES
# ============================================================================
# Define los agentes que quieres usar.
# Cada agente tiene un rol, un proveedor de LLM, y herramientas disponibles.
agents:
# Agente por defecto - puedes renombrarlo o borrarlo
assistant:
role: |
Eres un asistente general que ayuda con tareas diversas.
Puedes ejecutar comandos, leer y escribir archivos.
provider: claude
model: sonnet
tools:
- bash
- read
- write
- list_dir
# Ejemplo de agente especializado en código
# coder:
# role: |
# Eres un programador experto.
# Escribes código limpio y bien documentado.
# Siempre incluyes tests cuando es apropiado.
# provider: litellm
# model: gpt4o
# tools:
# - read
# - write
# - bash
# - grep
# - glob
# Ejemplo de agente de investigación
# researcher:
# role: |
# Eres un investigador que busca y analiza información.
# Eres metódico y verificas tus fuentes.
# provider: litellm
# model: gemini-pro
# tools:
# - http_request
# - read
# - write
# ============================================================================
# TAREAS PREDEFINIDAS (opcional)
# ============================================================================
# Define secuencias de acciones que se ejecutan automáticamente
tasks:
# Ejemplo:
# deploy:
# description: "Despliega la aplicación a producción"
# steps:
# - agent: coder
# prompt: "Ejecuta los tests"
# - agent: deployer
# prompt: "Despliega a producción"
# ============================================================================
# NOTAS
# ============================================================================
#
# PROVIDERS DISPONIBLES:
# - claude: Usa Claude Code CLI (requiere suscripción o API key)
# - litellm: Usa LiteLLM para acceder a 100+ modelos
#
# MODELOS LITELLM (ejemplos):
# - gpt4o, gpt4-turbo, o1 (OpenAI)
# - gemini-pro, gemini-flash (Google)
# - mistral, mixtral (Mistral)
# - llama3, codellama (Ollama local)
# - groq-llama (Groq - muy rápido)
#
# HERRAMIENTAS:
# - bash: Ejecuta comandos del sistema
# - read: Lee archivos
# - write: Escribe/crea archivos
# - glob: Busca archivos por patrón (*.py, **/*.md)
# - grep: Busca texto en archivos
# - ssh: Ejecuta comandos en servidores remotos
# - http_request: Hace peticiones HTTP
# - list_dir: Lista directorios

63
examples/dev_team.yaml Normal file
View File

@@ -0,0 +1,63 @@
# examples/dev_team.yaml
# Ejemplo: Equipo de desarrollo de software
settings:
default_provider: claude
default_model: sonnet
timeout: 300
agents:
architect:
role: |
Eres un arquitecto de software senior.
Diseñas sistemas escalables y mantenibles.
Tomas decisiones técnicas importantes.
Documentas tus decisiones en ADRs (Architecture Decision Records).
provider: claude
model: opus
tools:
- read
- write
- list_dir
- glob
developer:
role: |
Eres un desarrollador full-stack experimentado.
Escribes código limpio, bien documentado y testeable.
Sigues las mejores prácticas del lenguaje que uses.
Siempre incluyes manejo de errores apropiado.
provider: claude
model: sonnet
tools:
- read
- write
- bash
- grep
- glob
reviewer:
role: |
Eres un revisor de código exigente pero constructivo.
Buscas bugs, problemas de seguridad y mejoras.
Sugieres refactorizaciones cuando son necesarias.
Validas que el código siga los estándares.
provider: litellm
model: gpt4o
tools:
- read
- grep
- glob
tester:
role: |
Eres un ingeniero de QA especializado en testing.
Escribes tests unitarios, de integración y e2e.
Identificas edge cases y escenarios de error.
Aseguras buena cobertura de tests.
provider: litellm
model: gemini-pro
tools:
- read
- write
- bash

77
examples/devops.yaml Normal file
View File

@@ -0,0 +1,77 @@
# examples/devops.yaml
# Ejemplo: Equipo DevOps con servidores
settings:
default_provider: claude
default_model: sonnet
timeout: 300
servers:
production:
host: prod.example.com
user: deploy
key: ~/.ssh/prod_key
description: "Servidor de producción"
staging:
host: staging.example.com
user: deploy
key: ~/.ssh/staging_key
description: "Servidor de staging"
monitoring:
host: monitor.example.com
user: admin
key: ~/.ssh/monitor_key
description: "Servidor de monitoreo"
agents:
deployer:
role: |
Eres un ingeniero de deploy experimentado.
Despliegas aplicaciones de forma segura.
Siempre haces backup antes de cambios.
Verificas el estado después de cada deploy.
NUNCA ejecutas comandos destructivos sin confirmación.
provider: claude
model: sonnet
tools:
- ssh
- bash
- read
servers:
- production
- staging
monitor:
role: |
Eres un especialista en monitoreo.
Verificas métricas y logs.
Identificas anomalías y problemas.
Alertas sobre situaciones críticas.
provider: claude
model: haiku
tools:
- ssh
- bash
- http_request
servers:
- monitoring
- production
security:
role: |
Eres un ingeniero de seguridad.
Auditas configuraciones y permisos.
Buscas vulnerabilidades.
Recomiendas mejoras de seguridad.
provider: litellm
model: gpt4o
tools:
- ssh
- read
- bash
- grep
servers:
- production
- staging

View File

@@ -0,0 +1,46 @@
# examples/local_ollama.yaml
# Ejemplo: Usando modelos locales con Ollama
#
# Requisitos:
# 1. Instalar Ollama: https://ollama.ai
# 2. Descargar modelos: ollama pull llama3
# 3. Ollama debe estar corriendo: ollama serve
settings:
default_provider: litellm
default_model: llama3
timeout: 600 # Modelos locales pueden ser más lentos
agents:
coder:
role: |
Eres un programador que ayuda con código.
Explicas tu razonamiento paso a paso.
provider: litellm
model: codellama
tools:
- read
- write
- bash
writer:
role: |
Eres un escritor creativo.
Ayudas con textos, emails y documentos.
provider: litellm
model: llama3
tools:
- read
- write
analyst:
role: |
Eres un analista de datos.
Procesas archivos y extraes información.
provider: litellm
model: mixtral-local
tools:
- read
- bash
- glob
- grep

59
examples/research.yaml Normal file
View File

@@ -0,0 +1,59 @@
# examples/research.yaml
# Ejemplo: Equipo de investigación
settings:
default_provider: litellm
default_model: gpt4o
timeout: 600 # Más tiempo para investigación
agents:
researcher:
role: |
Eres un investigador académico metódico.
Buscas información de fuentes confiables.
Citas tus fuentes apropiadamente.
Identificas gaps en el conocimiento actual.
provider: litellm
model: gpt4o
tools:
- http_request
- read
- write
analyst:
role: |
Eres un analista de datos experto.
Procesas y analizas grandes cantidades de información.
Encuentras patrones y tendencias.
Presentas datos de forma clara y visual.
provider: litellm
model: gemini-pro
tools:
- read
- write
- bash
- glob
writer:
role: |
Eres un escritor técnico profesional.
Conviertes información compleja en texto claro.
Adaptas el tono al público objetivo.
Estructuras documentos de forma lógica.
provider: claude
model: sonnet
tools:
- read
- write
editor:
role: |
Eres un editor riguroso.
Corriges gramática, estilo y claridad.
Verificas consistencia en todo el documento.
Mejoras la legibilidad sin cambiar el mensaje.
provider: claude
model: haiku # Rápido para edición
tools:
- read
- write

21
examples/simple.yaml Normal file
View File

@@ -0,0 +1,21 @@
# examples/simple.yaml
# Ejemplo: Un solo agente asistente
settings:
default_provider: claude
default_model: sonnet
timeout: 300
agents:
assistant:
role: |
Eres un asistente útil y amable.
Ayudas con cualquier tarea que te pidan.
Eres claro y conciso en tus respuestas.
provider: claude
model: sonnet
tools:
- bash
- read
- write
- list_dir

0
logs/.gitkeep Normal file
View File

8
orchestrator/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
# orchestrator/__init__.py
"""LLM Orchestrator - Sistema de orquestación multi-agente seguro."""
from .config import get_config, reload_config
from .agents import Agent, AgentResult
__version__ = "2.0.0"
__all__ = ["get_config", "reload_config", "Agent", "AgentResult"]

View File

@@ -0,0 +1,6 @@
# orchestrator/agents/__init__.py
"""Agentes del orquestador."""
from .base import Agent, AgentResult
__all__ = ["Agent", "AgentResult"]

182
orchestrator/agents/base.py Normal file
View File

@@ -0,0 +1,182 @@
# orchestrator/agents/base.py
"""Clase base para agentes."""
import asyncio
from datetime import datetime
from pathlib import Path
from typing import Optional
from dataclasses import dataclass, field
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import get_config, AgentConfig
@dataclass
class AgentResult:
"""Resultado de una ejecución de agente."""
success: bool
output: str
agent_name: str
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
error: Optional[str] = None
tool_results: list = field(default_factory=list)
metadata: dict = field(default_factory=dict)
class Agent:
"""
Agente de IA configurable.
"""
def __init__(
self,
name: str = "",
role: str = "",
provider: str = "claude",
model: str = "sonnet",
tools: list = None,
servers: list = None,
config_obj: AgentConfig = None
):
if config_obj:
self.name = config_obj.name
self.role = config_obj.role
self.provider_name = config_obj.provider
self.model = config_obj.model
self.tools = config_obj.tools or []
self.servers = config_obj.servers or []
else:
self.name = name
self.role = role
self.provider_name = provider
self.model = model
self.tools = tools or []
self.servers = servers or []
self.config = get_config()
self.log_file = self.config.logs_dir / f"{self.name}.md"
self.output_dir = self.config.outputs_dir / self.name
self.provider = self._create_provider()
self.system_prompt = self._build_system_prompt()
self.output_dir.mkdir(parents=True, exist_ok=True)
if not self.log_file.exists():
self._init_log()
def _create_provider(self):
if self.provider_name == "claude":
from providers import ClaudeProvider
return ClaudeProvider(
model=self.model,
timeout=self.config.settings.timeout,
rate_limit_per_minute=self.config.settings.rate_limit_per_minute,
max_retries=self.config.settings.max_retries,
)
else:
from providers import LiteLLMProvider
return LiteLLMProvider(
model=self.model if self.provider_name == "litellm" else self.provider_name,
timeout=self.config.settings.timeout,
working_dir=str(self.config.base_dir),
max_tool_iterations=self.config.settings.max_tool_iterations,
rate_limit_per_minute=self.config.settings.rate_limit_per_minute,
max_retries=self.config.settings.max_retries,
)
def _build_system_prompt(self) -> str:
prompt = f"# {self.name}\n\n{self.role}"
if self.servers:
prompt += "\n\n## Servidores\n"
for server_name in self.servers:
server = self.config.get_server(server_name)
if server:
prompt += f"- {server_name}: {server.user}@{server.host}\n"
return prompt
def _init_log(self):
header = f"""# {self.name}
- **Creado:** {datetime.now().isoformat()}
- **Provider:** {self.provider_name}
- **Modelo:** {self.model}
- **Herramientas:** {', '.join(self.tools) if self.tools else 'ninguna'}
---
"""
self.log_file.write_text(header)
def log(self, entry_type: str, content: str):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
entry = f"\n## {timestamp} | {entry_type}\n{content}\n"
with open(self.log_file, "a") as f:
f.write(entry)
async def run(self, prompt: str, log_action: bool = True) -> AgentResult:
if log_action:
self.log("PROMPT", prompt[:200] + "..." if len(prompt) > 200 else prompt)
try:
response = await self.provider.run_with_tools(
prompt=prompt,
tools=self.tools,
system_prompt=self.system_prompt,
)
result = AgentResult(
success=response.success,
output=response.text,
agent_name=self.name,
error=response.error,
tool_results=response.tool_results if hasattr(response, 'tool_results') else [],
metadata={
"usage": response.usage,
"retries": response.retries,
} if response.usage else {}
)
if log_action:
status = "RESULTADO" if response.success else "ERROR"
content = response.text[:500] if response.success else (response.error or "Error desconocido")
self.log(status, content)
return result
except Exception as e:
if log_action:
self.log("ERROR", str(e))
return AgentResult(
success=False, output="",
agent_name=self.name, error=str(e)
)
def save_output(self, filename: str, content: str) -> Path:
filepath = self.output_dir / filename
filepath.parent.mkdir(parents=True, exist_ok=True)
filepath.write_text(content)
self.log("ARCHIVO", f"Creado: {filepath}")
return filepath
def read_log(self, last_n: int = None) -> str:
if not self.log_file.exists():
return ""
content = self.log_file.read_text()
if last_n:
entries = content.split("\n## ")
return "\n## ".join(entries[-last_n:])
return content
def __repr__(self):
return f"Agent({self.name}, {self.provider_name}/{self.model})"

284
orchestrator/config.py Normal file
View File

@@ -0,0 +1,284 @@
# orchestrator/config.py
"""
Configuración segura del orquestador.
Carga configuración desde:
1. Variables de entorno
2. Archivo .env
3. config.yaml
NUNCA hardcodea credenciales aquí.
"""
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, Any
def load_env():
"""Carga variables desde .env si existe."""
env_file = Path.cwd() / ".env"
if not env_file.exists():
env_file = Path(__file__).parent.parent / ".env"
if env_file.exists():
with open(env_file) as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, _, value = line.partition("=")
key = key.strip()
value = value.strip().strip('"').strip("'")
if key and value:
os.environ.setdefault(key, value)
# Cargar .env al importar
load_env()
def get_env(key: str, default: str = "") -> str:
"""Obtiene variable de entorno."""
return os.environ.get(key, default)
def get_env_bool(key: str, default: bool = False) -> bool:
"""Obtiene variable de entorno como booleano."""
val = os.environ.get(key, "").lower()
if val in ("true", "yes", "1"):
return True
if val in ("false", "no", "0"):
return False
return default
def get_env_list(key: str, default: list = None) -> list:
"""Obtiene variable de entorno como lista."""
val = os.environ.get(key, "")
if val:
return [x.strip() for x in val.split(",") if x.strip()]
return default or []
@dataclass
class ServerConfig:
"""Configuración de un servidor."""
name: str
host: str
user: str = "root"
key: str = ""
description: str = ""
def __post_init__(self):
if not self.key:
self.key = get_env("SSH_KEY_PATH", "~/.ssh/id_rsa")
@dataclass
class AgentConfig:
"""Configuración de un agente."""
name: str
role: str
provider: str = "claude"
model: str = "sonnet"
tools: list = field(default_factory=list)
servers: list = field(default_factory=list)
@dataclass
class Settings:
"""Configuración general."""
default_provider: str = "claude"
default_model: str = "sonnet"
timeout: float = 300.0
working_dir: str = "."
max_tool_iterations: int = 10
# Seguridad
ssh_strict_host_checking: bool = True
sandbox_paths: bool = True # Restringir paths al working_dir
allowed_commands: list = field(default_factory=list) # Whitelist de comandos
# Rate limiting
rate_limit_per_minute: int = 60
# Retry
max_retries: int = 3
retry_delay: float = 1.0
@dataclass
class GitConfig:
"""Configuración de Gitea/Git."""
url: str = ""
token_read: str = ""
token_write: str = ""
org: str = ""
def __post_init__(self):
self.url = self.url or get_env("GITEA_URL")
self.token_read = self.token_read or get_env("GITEA_TOKEN_READ")
self.token_write = self.token_write or get_env("GITEA_TOKEN_WRITE")
self.org = self.org or get_env("GITEA_ORG")
@property
def is_configured(self) -> bool:
return bool(self.url and self.token_read)
@dataclass
class R2Config:
"""Configuración de Cloudflare R2."""
endpoint: str = ""
access_key: str = ""
secret_key: str = ""
buckets: list = field(default_factory=list)
def __post_init__(self):
self.endpoint = self.endpoint or get_env("R2_ENDPOINT")
self.access_key = self.access_key or get_env("R2_ACCESS_KEY")
self.secret_key = self.secret_key or get_env("R2_SECRET_KEY")
self.buckets = self.buckets or get_env_list("R2_BUCKETS")
@property
def is_configured(self) -> bool:
return bool(self.endpoint and self.access_key and self.secret_key)
class Config:
"""Gestiona la configuración del orquestador."""
def __init__(self, config_path: Optional[str] = None):
self.config_path = self._find_config(config_path)
self.base_dir = self.config_path.parent if self.config_path else Path.cwd()
self._raw = self._load_yaml() if self.config_path else {}
# Parsear configuración
self.settings = self._parse_settings()
self.servers = self._parse_servers()
self.agents = self._parse_agents()
self.tasks = self._raw.get("tasks", {})
# Servicios externos (desde .env)
self.git = GitConfig()
self.r2 = R2Config()
# Rutas
self.logs_dir = self.base_dir / "logs"
self.outputs_dir = self.base_dir / "outputs"
# Crear directorios
self.logs_dir.mkdir(exist_ok=True)
self.outputs_dir.mkdir(exist_ok=True)
def _find_config(self, config_path: Optional[str]) -> Optional[Path]:
"""Encuentra el archivo de configuración."""
if config_path:
path = Path(config_path)
if path.exists():
return path
raise FileNotFoundError(f"Config no encontrado: {config_path}")
search_paths = [
Path.cwd() / "config.yaml",
Path.cwd() / "config.yml",
Path(__file__).parent.parent / "config.yaml",
]
for path in search_paths:
if path.exists():
return path
return None # Sin config, usar defaults
def _load_yaml(self) -> dict:
"""Carga el archivo YAML."""
if not self.config_path:
return {}
try:
import yaml
with open(self.config_path) as f:
return yaml.safe_load(f) or {}
except ImportError:
print("AVISO: PyYAML no instalado. pip install pyyaml")
return {}
def _parse_settings(self) -> Settings:
"""Parsea la sección settings."""
raw = self._raw.get("settings", {})
return Settings(
default_provider=raw.get("default_provider", "claude"),
default_model=raw.get("default_model", "sonnet"),
timeout=float(raw.get("timeout", 300)),
working_dir=raw.get("working_dir", "."),
max_tool_iterations=int(raw.get("max_tool_iterations", 10)),
ssh_strict_host_checking=raw.get("ssh_strict_host_checking",
get_env_bool("SSH_KNOWN_HOSTS_CHECK", True)),
sandbox_paths=raw.get("sandbox_paths", True),
allowed_commands=raw.get("allowed_commands", []),
rate_limit_per_minute=int(raw.get("rate_limit_per_minute", 60)),
max_retries=int(raw.get("max_retries", 3)),
retry_delay=float(raw.get("retry_delay", 1.0)),
)
def _parse_servers(self) -> dict[str, ServerConfig]:
"""Parsea la sección servers."""
servers = {}
for name, data in self._raw.get("servers", {}).items():
if data:
servers[name] = ServerConfig(
name=name,
host=data.get("host", ""),
user=data.get("user", "root"),
key=data.get("key", get_env("SSH_KEY_PATH", "~/.ssh/id_rsa")),
description=data.get("description", ""),
)
return servers
def _parse_agents(self) -> dict[str, AgentConfig]:
"""Parsea la sección agents."""
agents = {}
for name, data in self._raw.get("agents", {}).items():
if data:
agents[name] = AgentConfig(
name=name,
role=data.get("role", ""),
provider=data.get("provider", self.settings.default_provider),
model=data.get("model", self.settings.default_model),
tools=data.get("tools", []),
servers=data.get("servers", []),
)
return agents
def get_agent(self, name: str) -> Optional[AgentConfig]:
return self.agents.get(name)
def get_server(self, name: str) -> Optional[ServerConfig]:
return self.servers.get(name)
def list_agents(self) -> list[str]:
return list(self.agents.keys())
def list_servers(self) -> list[str]:
return list(self.servers.keys())
# Instancia global
_config: Optional[Config] = None
def get_config(config_path: Optional[str] = None) -> Config:
"""Obtiene la configuración global."""
global _config
if _config is None:
_config = Config(config_path)
return _config
def reload_config(config_path: Optional[str] = None) -> Config:
"""Recarga la configuración."""
global _config
_config = Config(config_path)
return _config

224
orchestrator/main.py Normal file
View File

@@ -0,0 +1,224 @@
#!/usr/bin/env python3
# orchestrator/main.py
"""
LLM Orchestrator - Sistema de orquestación multi-agente seguro.
Uso:
python main.py # Modo interactivo
python main.py --status # Ver estado
python main.py --agents # Listar agentes
python main.py --agent X --prompt "Y" # Ejecutar prompt
"""
import asyncio
import argparse
import json
from datetime import datetime
from pathlib import Path
import sys
sys.path.insert(0, str(Path(__file__).parent))
from config import get_config, reload_config
from agents import Agent, AgentResult
class Orchestrator:
"""Orquestador principal."""
def __init__(self, config_path: str = None):
print("Iniciando Orchestrator...")
self.config = get_config(config_path)
self.agents: dict[str, Agent] = {}
for name, agent_config in self.config.agents.items():
try:
self.agents[name] = Agent(config_obj=agent_config)
except Exception as e:
print(f"⚠️ Error creando agente {name}: {e}")
if self.agents:
print(f"{len(self.agents)} agente(s): {', '.join(self.agents.keys())}")
else:
print("⚠️ Sin agentes. Edita config.yaml")
async def run_agent(self, agent_name: str, prompt: str) -> AgentResult:
if agent_name not in self.agents:
return AgentResult(
success=False, output="",
agent_name=agent_name,
error=f"Agente '{agent_name}' no existe"
)
return await self.agents[agent_name].run(prompt)
async def run_all(self, prompt: str) -> dict[str, AgentResult]:
"""Ejecuta en todos los agentes EN PARALELO."""
tasks = {
name: agent.run(prompt)
for name, agent in self.agents.items()
}
# Ejecutar en paralelo con gather
results_list = await asyncio.gather(*tasks.values(), return_exceptions=True)
results = {}
for name, result in zip(tasks.keys(), results_list):
if isinstance(result, Exception):
results[name] = AgentResult(
success=False, output="",
agent_name=name, error=str(result)
)
else:
results[name] = result
return results
def get_status(self) -> dict:
return {
"config_path": str(self.config.config_path) if self.config.config_path else None,
"settings": {
"timeout": self.config.settings.timeout,
"rate_limit": self.config.settings.rate_limit_per_minute,
"max_retries": self.config.settings.max_retries,
"sandbox_paths": self.config.settings.sandbox_paths,
},
"agents": {
name: {
"provider": agent.provider_name,
"model": agent.model,
"tools": agent.tools,
}
for name, agent in self.agents.items()
},
"servers": list(self.config.servers.keys()),
"gitea_configured": self.config.git.is_configured,
"r2_configured": self.config.r2.is_configured,
}
async def interactive(self):
print("\n" + "="*60)
print("LLM Orchestrator - Modo Interactivo")
print("="*60)
print("\nComandos:")
print(" /status - Ver estado")
print(" /agents - Listar agentes")
print(" /agent <n> - Cambiar agente activo")
print(" /logs <n> - Ver log de un agente")
print(" /reload - Recargar configuración")
print(" /all - Ejecutar en todos (paralelo)")
print(" /quit - Salir")
print("-"*60)
if not self.agents:
print("\n⚠️ No hay agentes. Edita config.yaml")
return
current_agent = list(self.agents.keys())[0]
while True:
try:
prompt = input(f"\n[{current_agent}] > ").strip()
if not prompt:
continue
if prompt == "/quit":
print("Saliendo...")
break
elif prompt == "/status":
print(json.dumps(self.get_status(), indent=2))
elif prompt == "/agents":
for name, agent in self.agents.items():
marker = "" if name == current_agent else " "
print(f" {marker} {name}: {agent.provider_name}/{agent.model}")
if agent.tools:
print(f" tools: {', '.join(agent.tools)}")
elif prompt.startswith("/agent "):
name = prompt[7:].strip()
if name in self.agents:
current_agent = name
print(f"Agente activo: {current_agent}")
else:
print(f"No existe '{name}'. Disponibles: {', '.join(self.agents.keys())}")
elif prompt.startswith("/logs "):
name = prompt[6:].strip()
if name in self.agents:
log = self.agents[name].read_log(last_n=5)
print(log if log else "(vacío)")
else:
print(f"No existe '{name}'")
elif prompt == "/reload":
self.config = reload_config()
self.agents = {}
for name, agent_config in self.config.agents.items():
try:
self.agents[name] = Agent(config_obj=agent_config)
except Exception as e:
print(f"⚠️ Error: {name}: {e}")
print(f"✅ Recargado: {len(self.agents)} agente(s)")
elif prompt == "/all":
user_prompt = input("Prompt: ").strip()
if user_prompt:
print("Ejecutando en paralelo...")
results = await self.run_all(user_prompt)
for name, result in results.items():
status = "" if result.success else ""
output = result.output[:200] + "..." if len(result.output) > 200 else result.output
print(f"\n{status} {name}:\n{output or result.error}")
else:
print("Ejecutando...")
result = await self.run_agent(current_agent, prompt)
if result.success:
print(f"\n{result.output}")
else:
print(f"\n{result.error}")
except KeyboardInterrupt:
print("\n\nUsa /quit para salir.")
except Exception as e:
print(f"\n❌ Error: {e}")
async def main():
parser = argparse.ArgumentParser(description="LLM Orchestrator")
parser.add_argument("--config", type=str, help="Ruta a config.yaml")
parser.add_argument("--status", action="store_true", help="Ver estado")
parser.add_argument("--agents", action="store_true", help="Listar agentes")
parser.add_argument("--agent", type=str, help="Agente a usar")
parser.add_argument("--prompt", type=str, help="Prompt a ejecutar")
args = parser.parse_args()
orchestrator = Orchestrator(args.config)
if args.status:
print(json.dumps(orchestrator.get_status(), indent=2))
elif args.agents:
for name, agent in orchestrator.agents.items():
print(f" {name}: {agent.provider_name}/{agent.model}")
elif args.agent and args.prompt:
result = await orchestrator.run_agent(args.agent, args.prompt)
if result.success:
print(result.output)
else:
print(f"Error: {result.error}", file=sys.stderr)
sys.exit(1)
else:
await orchestrator.interactive()
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,13 @@
# orchestrator/providers/__init__.py
"""Providers de modelos LLM."""
from .base import BaseProvider, ProviderResponse
from .claude_provider import ClaudeProvider
from .litellm_provider import LiteLLMProvider
__all__ = [
"BaseProvider",
"ProviderResponse",
"ClaudeProvider",
"LiteLLMProvider",
]

View File

@@ -0,0 +1,146 @@
# orchestrator/providers/base.py
"""Clase base abstracta para providers de modelos."""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional, Any
from datetime import datetime
import asyncio
import time
from collections import deque
@dataclass
class ProviderResponse:
"""Respuesta estándar de cualquier provider."""
success: bool
text: str
provider: str
model: str
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
error: Optional[str] = None
usage: Optional[dict] = None
raw_response: Optional[Any] = None
tool_calls: list = field(default_factory=list)
tool_results: list = field(default_factory=list)
retries: int = 0
@property
def ok(self) -> bool:
return self.success
class RateLimiter:
"""Rate limiter para llamadas a APIs."""
def __init__(self, max_calls: int = 60, period: float = 60.0):
self.max_calls = max_calls
self.period = period
self.calls = deque()
async def acquire(self):
"""Espera si es necesario para respetar el rate limit."""
now = time.time()
while self.calls and self.calls[0] < now - self.period:
self.calls.popleft()
if len(self.calls) >= self.max_calls:
wait_time = self.calls[0] + self.period - now
if wait_time > 0:
await asyncio.sleep(wait_time)
self.calls.append(time.time())
class BaseProvider(ABC):
"""
Clase base abstracta para todos los providers de modelos.
Incluye:
- Rate limiting
- Retry con backoff exponencial
- Manejo de errores consistente
"""
def __init__(
self,
model: str,
timeout: float = 300.0,
rate_limit_per_minute: int = 60,
max_retries: int = 3,
retry_delay: float = 1.0,
**kwargs
):
self.model = model
self.timeout = timeout
self.max_retries = max_retries
self.retry_delay = retry_delay
self.config = kwargs
# Rate limiting
self.rate_limiter = RateLimiter(rate_limit_per_minute)
@property
@abstractmethod
def name(self) -> str:
"""Nombre del provider."""
pass
@property
@abstractmethod
def available_models(self) -> list[str]:
"""Lista de modelos disponibles."""
pass
@property
def supports_native_tools(self) -> bool:
"""Indica si soporta function calling nativo."""
return False
async def _retry_with_backoff(self, func, *args, **kwargs) -> tuple[Any, int]:
"""Ejecuta con retry y backoff exponencial."""
last_error = None
for attempt in range(self.max_retries + 1):
try:
# Rate limiting
await self.rate_limiter.acquire()
result = await func(*args, **kwargs)
return result, attempt
except Exception as e:
last_error = e
if attempt < self.max_retries:
delay = min(self.retry_delay * (2 ** attempt), 30.0)
await asyncio.sleep(delay)
raise last_error
@abstractmethod
async def run(
self,
prompt: str,
system_prompt: Optional[str] = None,
**kwargs
) -> ProviderResponse:
"""Ejecuta un prompt."""
pass
@abstractmethod
async def run_with_tools(
self,
prompt: str,
tools: list[str],
system_prompt: Optional[str] = None,
**kwargs
) -> ProviderResponse:
"""Ejecuta un prompt con herramientas."""
pass
def validate_model(self, model: str) -> bool:
return model in self.available_models
def __repr__(self) -> str:
return f"{self.__class__.__name__}(model={self.model})"

View File

@@ -0,0 +1,179 @@
# orchestrator/providers/claude_provider.py
"""Provider para Claude usando el CLI de Claude Code."""
import asyncio
import json
import shutil
from typing import Optional
from pathlib import Path
from .base import BaseProvider, ProviderResponse
class ClaudeProvider(BaseProvider):
"""
Provider que usa el CLI de Claude Code.
Ejecuta claude via subprocess, parseando la salida JSON.
Funciona con suscripción Pro/Max o API key.
"""
MODELS = {
"haiku": "claude-3-5-haiku-latest",
"sonnet": "claude-sonnet-4-20250514",
"opus": "claude-opus-4-20250514",
"fast": "claude-3-5-haiku-latest",
"default": "claude-sonnet-4-20250514",
"powerful": "claude-opus-4-20250514",
}
AVAILABLE_TOOLS = [
"Read", "Write", "Edit", "Bash",
"Glob", "Grep", "WebFetch",
]
def __init__(
self,
model: str = "sonnet",
timeout: float = 300.0,
cli_path: str = "claude",
working_directory: Optional[str] = None,
**kwargs
):
super().__init__(model=model, timeout=timeout, **kwargs)
self.cli_path = cli_path
self.working_directory = working_directory
if not shutil.which(self.cli_path):
raise FileNotFoundError(
f"Claude CLI no encontrado en '{self.cli_path}'. "
"Asegúrate de que Claude Code está instalado."
)
@property
def name(self) -> str:
return "claude"
@property
def available_models(self) -> list[str]:
return list(self.MODELS.keys())
@property
def supports_native_tools(self) -> bool:
return True # Claude CLI tiene herramientas nativas
def _resolve_model(self, model: str) -> str:
return self.MODELS.get(model, model)
def _build_command(
self,
prompt: str,
tools: Optional[list[str]] = None,
system_prompt: Optional[str] = None,
max_turns: Optional[int] = None,
) -> list[str]:
cmd = [self.cli_path, "-p", prompt, "--output-format", "json"]
resolved_model = self._resolve_model(self.model)
cmd.extend(["--model", resolved_model])
if system_prompt:
cmd.extend(["--system-prompt", system_prompt])
if tools:
valid_tools = [t for t in tools if t in self.AVAILABLE_TOOLS]
if valid_tools:
cmd.extend(["--allowedTools", ",".join(valid_tools)])
if max_turns:
cmd.extend(["--max-turns", str(max_turns)])
return cmd
async def _execute(
self,
prompt: str,
tools: list[str] = None,
system_prompt: Optional[str] = None,
max_turns: Optional[int] = None,
) -> ProviderResponse:
"""Ejecución interna."""
cmd = self._build_command(prompt, tools, system_prompt, max_turns)
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=self.working_directory,
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=self.timeout
)
if process.returncode != 0:
error_msg = stderr.decode() if stderr else f"Exit code: {process.returncode}"
return ProviderResponse(
success=False, text="", provider=self.name,
model=self.model, error=error_msg
)
try:
response = json.loads(stdout.decode())
except json.JSONDecodeError as e:
return ProviderResponse(
success=False, text=stdout.decode(),
provider=self.name, model=self.model,
error=f"Error parseando JSON: {e}"
)
is_error = response.get("is_error", False)
return ProviderResponse(
success=not is_error,
text=response.get("result", ""),
provider=self.name,
model=self.model,
error=response.get("error") if is_error else None,
usage=response.get("usage"),
raw_response=response
)
async def run(
self,
prompt: str,
system_prompt: Optional[str] = None,
**kwargs
) -> ProviderResponse:
return await self.run_with_tools(prompt, [], system_prompt, **kwargs)
async def run_with_tools(
self,
prompt: str,
tools: list[str],
system_prompt: Optional[str] = None,
max_turns: Optional[int] = None,
**kwargs
) -> ProviderResponse:
try:
result, retries = await self._retry_with_backoff(
self._execute, prompt, tools, system_prompt, max_turns
)
result.retries = retries
return result
except asyncio.TimeoutError:
return ProviderResponse(
success=False, text="", provider=self.name,
model=self.model, error=f"Timeout después de {self.timeout}s"
)
except Exception as e:
return ProviderResponse(
success=False, text="", provider=self.name,
model=self.model, error=str(e)
)
async def health_check(self) -> bool:
response = await self.run("Responde solo 'OK'")
return response.success and "OK" in response.text

View File

@@ -0,0 +1,250 @@
# orchestrator/providers/litellm_provider.py
"""
Provider universal usando LiteLLM.
Soporta 100+ modelos con una interfaz unificada.
"""
import asyncio
from typing import Optional
from pathlib import Path
from .base import BaseProvider, ProviderResponse
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
class LiteLLMProvider(BaseProvider):
"""
Provider universal usando LiteLLM.
Soporta: OpenAI, Google, Anthropic, Mistral, Ollama, Groq, Together, etc.
"""
MODEL_ALIASES = {
# OpenAI
"gpt4": "gpt-4",
"gpt4o": "gpt-4o",
"gpt4-turbo": "gpt-4-turbo",
"o1": "o1-preview",
"o1-mini": "o1-mini",
# Google Gemini
"gemini-pro": "gemini/gemini-1.5-pro",
"gemini-flash": "gemini/gemini-1.5-flash",
# Anthropic (via API)
"claude-api": "claude-3-5-sonnet-20241022",
"claude-opus-api": "claude-3-opus-20240229",
# Mistral
"mistral": "mistral/mistral-large-latest",
"mixtral": "mistral/open-mixtral-8x22b",
# Ollama (local)
"llama3": "ollama/llama3",
"codellama": "ollama/codellama",
"mixtral-local": "ollama/mixtral",
# Groq
"groq-llama": "groq/llama3-70b-8192",
# Together
"together-llama": "together_ai/meta-llama/Llama-3-70b-chat-hf",
}
def __init__(
self,
model: str = "gpt-4o",
timeout: float = 300.0,
api_key: Optional[str] = None,
api_base: Optional[str] = None,
working_dir: Optional[str] = None,
max_tool_iterations: int = 10,
**kwargs
):
super().__init__(model=model, timeout=timeout, **kwargs)
self.api_key = api_key
self.api_base = api_base
self.working_dir = working_dir
self.max_tool_iterations = max_tool_iterations
self._litellm = None
self._tool_executor = None
def _get_litellm(self):
if self._litellm is None:
try:
import litellm
self._litellm = litellm
except ImportError:
raise ImportError("LiteLLM no instalado. Ejecuta: pip install litellm")
return self._litellm
def _get_tool_executor(self):
if self._tool_executor is None:
from tools import ToolExecutor
from config import get_config
config = get_config()
self._tool_executor = ToolExecutor(
working_dir=self.working_dir or str(config.base_dir),
timeout=self.timeout,
sandbox_paths=config.settings.sandbox_paths,
allowed_commands=config.settings.allowed_commands or None,
rate_limit_per_minute=config.settings.rate_limit_per_minute,
max_retries=config.settings.max_retries,
ssh_strict_host_checking=config.settings.ssh_strict_host_checking,
)
return self._tool_executor
@property
def name(self) -> str:
return "litellm"
@property
def available_models(self) -> list[str]:
return list(self.MODEL_ALIASES.keys())
@property
def supports_native_tools(self) -> bool:
resolved = self._resolve_model(self.model)
return any(x in resolved for x in ["gpt-4", "claude", "gemini"])
def _resolve_model(self, model: str) -> str:
return self.MODEL_ALIASES.get(model, model)
async def _execute(
self,
messages: list,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
):
"""Ejecución interna."""
litellm = self._get_litellm()
resolved_model = self._resolve_model(self.model)
completion_kwargs = {
"model": resolved_model,
"messages": messages,
"temperature": temperature,
"timeout": self.timeout,
}
if max_tokens:
completion_kwargs["max_tokens"] = max_tokens
if self.api_key:
completion_kwargs["api_key"] = self.api_key
if self.api_base:
completion_kwargs["api_base"] = self.api_base
return await litellm.acompletion(**completion_kwargs)
async def run(
self,
prompt: str,
system_prompt: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
**kwargs
) -> ProviderResponse:
return await self.run_with_tools(
prompt, [], system_prompt, temperature, max_tokens, **kwargs
)
async def run_with_tools(
self,
prompt: str,
tools: list[str],
system_prompt: Optional[str] = None,
temperature: float = 0.7,
max_tokens: Optional[int] = None,
**kwargs
) -> ProviderResponse:
try:
resolved_model = self._resolve_model(self.model)
# Construir system prompt con herramientas
full_system = system_prompt or ""
if tools:
from tools.definitions import get_tools_prompt
tools_prompt = get_tools_prompt(tools)
full_system = f"{full_system}\n\n{tools_prompt}"
messages = []
if full_system:
messages.append({"role": "system", "content": full_system})
messages.append({"role": "user", "content": prompt})
# Loop de herramientas
all_tool_results = []
total_retries = 0
for iteration in range(self.max_tool_iterations):
try:
response, retries = await self._retry_with_backoff(
self._execute, messages, temperature, max_tokens
)
total_retries += retries
except Exception as e:
return ProviderResponse(
success=False, text="", provider=self.name,
model=resolved_model, error=f"Error LiteLLM: {e}"
)
text = response.choices[0].message.content or ""
# Buscar herramientas
if tools:
executor = self._get_tool_executor()
tool_results = await executor.execute_from_text(text)
if tool_results:
all_tool_results.extend(tool_results)
results_text = "\n\n".join([
f"Resultado de {r.tool}: {'' if r.success else ''}\n{r.output if r.success else r.error}"
for r in tool_results
])
messages.append({"role": "assistant", "content": text})
messages.append({"role": "user", "content": f"Resultados:\n{results_text}\n\nContinúa."})
continue
# Sin herramientas, terminamos
break
# Extraer uso
usage = None
if hasattr(response, 'usage') and response.usage:
usage = {
"input_tokens": response.usage.prompt_tokens,
"output_tokens": response.usage.completion_tokens,
}
return ProviderResponse(
success=True,
text=text,
provider=self.name,
model=resolved_model,
usage=usage,
tool_results=[r.__dict__ for r in all_tool_results],
retries=total_retries,
)
except ImportError as e:
return ProviderResponse(
success=False, text="", provider=self.name,
model=self.model, error=str(e)
)
except Exception as e:
return ProviderResponse(
success=False, text="", provider=self.name,
model=self.model, error=f"Error: {e}"
)
async def health_check(self) -> bool:
response = await self.run("Responde solo 'OK'", max_tokens=10)
return response.success

View File

@@ -0,0 +1 @@
# Tasks predefinidas

View File

@@ -0,0 +1,12 @@
# orchestrator/tools/__init__.py
"""Sistema de herramientas universal para cualquier LLM."""
from .executor import ToolExecutor, ToolResult
from .definitions import TOOL_DEFINITIONS, get_tool_schema
__all__ = [
"ToolExecutor",
"ToolResult",
"TOOL_DEFINITIONS",
"get_tool_schema",
]

View File

@@ -0,0 +1,294 @@
# orchestrator/tools/definitions.py
"""
Definiciones de herramientas en formato estándar.
Formato compatible con OpenAI function calling y otros proveedores.
"""
from typing import Any
# Definiciones de herramientas disponibles
TOOL_DEFINITIONS = {
"bash": {
"name": "bash",
"description": "Ejecuta un comando bash en el sistema. Úsalo para ejecutar scripts, comandos del sistema, git, etc.",
"parameters": {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "El comando bash a ejecutar"
},
"working_dir": {
"type": "string",
"description": "Directorio de trabajo opcional"
}
},
"required": ["command"]
}
},
"read": {
"name": "read",
"description": "Lee el contenido de un archivo. Devuelve el texto del archivo.",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Ruta al archivo a leer"
},
"encoding": {
"type": "string",
"description": "Codificación del archivo (default: utf-8)"
}
},
"required": ["path"]
}
},
"write": {
"name": "write",
"description": "Escribe contenido a un archivo. Crea el archivo si no existe, sobrescribe si existe.",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Ruta al archivo a escribir"
},
"content": {
"type": "string",
"description": "Contenido a escribir en el archivo"
},
"append": {
"type": "boolean",
"description": "Si es true, añade al final en vez de sobrescribir"
}
},
"required": ["path", "content"]
}
},
"glob": {
"name": "glob",
"description": "Busca archivos usando patrones glob. Ej: '**/*.py' encuentra todos los Python.",
"parameters": {
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "Patrón glob para buscar archivos"
},
"base_dir": {
"type": "string",
"description": "Directorio base para la búsqueda"
}
},
"required": ["pattern"]
}
},
"grep": {
"name": "grep",
"description": "Busca texto en archivos usando expresiones regulares.",
"parameters": {
"type": "object",
"properties": {
"pattern": {
"type": "string",
"description": "Patrón regex a buscar"
},
"path": {
"type": "string",
"description": "Archivo o directorio donde buscar"
},
"recursive": {
"type": "boolean",
"description": "Buscar recursivamente en subdirectorios"
}
},
"required": ["pattern", "path"]
}
},
"http_request": {
"name": "http_request",
"description": "Hace una petición HTTP. Útil para APIs REST.",
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "URL a la que hacer la petición"
},
"method": {
"type": "string",
"enum": ["GET", "POST", "PUT", "DELETE", "PATCH"],
"description": "Método HTTP"
},
"headers": {
"type": "object",
"description": "Headers de la petición"
},
"body": {
"type": "string",
"description": "Cuerpo de la petición (para POST, PUT, PATCH)"
}
},
"required": ["url"]
}
},
"ssh": {
"name": "ssh",
"description": "Ejecuta un comando en un servidor remoto via SSH.",
"parameters": {
"type": "object",
"properties": {
"host": {
"type": "string",
"description": "Host o IP del servidor"
},
"command": {
"type": "string",
"description": "Comando a ejecutar en el servidor"
},
"user": {
"type": "string",
"description": "Usuario SSH (default: root)"
},
"key_path": {
"type": "string",
"description": "Ruta a la clave SSH"
}
},
"required": ["host", "command"]
}
},
"list_dir": {
"name": "list_dir",
"description": "Lista el contenido de un directorio.",
"parameters": {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Ruta al directorio"
},
"recursive": {
"type": "boolean",
"description": "Listar recursivamente"
},
"include_hidden": {
"type": "boolean",
"description": "Incluir archivos ocultos"
}
},
"required": ["path"]
}
},
}
def get_tool_schema(tool_names: list[str], format: str = "openai") -> list[dict]:
"""
Obtiene el schema de herramientas en el formato especificado.
Args:
tool_names: Lista de nombres de herramientas
format: Formato de salida (openai, anthropic, gemini)
Returns:
Lista de definiciones de herramientas
"""
tools = []
for name in tool_names:
if name not in TOOL_DEFINITIONS:
continue
tool_def = TOOL_DEFINITIONS[name]
if format == "openai":
tools.append({
"type": "function",
"function": {
"name": tool_def["name"],
"description": tool_def["description"],
"parameters": tool_def["parameters"]
}
})
elif format == "anthropic":
tools.append({
"name": tool_def["name"],
"description": tool_def["description"],
"input_schema": tool_def["parameters"]
})
elif format == "gemini":
tools.append({
"function_declarations": [{
"name": tool_def["name"],
"description": tool_def["description"],
"parameters": tool_def["parameters"]
}]
})
else:
# Formato genérico
tools.append(tool_def)
return tools
def get_tools_prompt(tool_names: list[str]) -> str:
"""
Genera un prompt describiendo las herramientas disponibles.
Para modelos que no soportan function calling nativo,
incluimos las herramientas en el prompt.
Args:
tool_names: Lista de nombres de herramientas
Returns:
String con descripción de herramientas para el prompt
"""
if not tool_names:
return ""
prompt = """
## Herramientas disponibles
Puedes usar las siguientes herramientas. Para usarlas, responde con un bloque JSON así:
```tool
{
"tool": "nombre_herramienta",
"params": {
"param1": "valor1",
...
}
}
```
Herramientas:
"""
for name in tool_names:
if name not in TOOL_DEFINITIONS:
continue
tool = TOOL_DEFINITIONS[name]
prompt += f"### {tool['name']}\n"
prompt += f"{tool['description']}\n"
prompt += f"Parámetros: {tool['parameters']['properties']}\n"
prompt += f"Requeridos: {tool['parameters'].get('required', [])}\n\n"
prompt += """
Después de usar una herramienta, recibirás el resultado y podrás continuar.
Puedes usar múltiples herramientas en secuencia para completar tareas complejas.
"""
return prompt

View File

@@ -0,0 +1,592 @@
# orchestrator/tools/executor.py
"""
Ejecutor de herramientas seguro.
Incluye:
- Validación de paths (sandbox)
- Sanitización de comandos
- Rate limiting
- Retry con backoff
- Logging de seguridad
"""
import asyncio
import subprocess
import re
import json
import time
import shlex
from pathlib import Path
from dataclasses import dataclass, field
from typing import Optional, Any, Callable
from datetime import datetime
from collections import deque
@dataclass
class ToolResult:
"""Resultado de la ejecución de una herramienta."""
tool: str
success: bool
output: str
error: Optional[str] = None
execution_time: float = 0.0
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
retries: int = 0
class RateLimiter:
"""Rate limiter simple basado en ventana deslizante."""
def __init__(self, max_calls: int, period: float = 60.0):
self.max_calls = max_calls
self.period = period
self.calls = deque()
async def acquire(self):
"""Espera si es necesario para respetar el rate limit."""
now = time.time()
# Limpiar llamadas antiguas
while self.calls and self.calls[0] < now - self.period:
self.calls.popleft()
# Si llegamos al límite, esperar
if len(self.calls) >= self.max_calls:
wait_time = self.calls[0] + self.period - now
if wait_time > 0:
await asyncio.sleep(wait_time)
self.calls.append(time.time())
class SecurityValidator:
"""Validador de seguridad para herramientas."""
# Comandos peligrosos que nunca deberían ejecutarse
DANGEROUS_COMMANDS = {
"rm -rf /", "rm -rf /*", ":(){ :|:& };:", # Fork bomb
"dd if=", "mkfs", "fdisk", "> /dev/sd",
"chmod -R 777 /", "chown -R",
}
# Patrones peligrosos
DANGEROUS_PATTERNS = [
r"rm\s+-rf\s+/(?!\w)", # rm -rf / (pero no /home)
r">\s*/dev/sd[a-z]", # Escribir a dispositivos
r"mkfs\.", # Formatear discos
r"dd\s+if=.+of=/dev", # dd a dispositivos
]
def __init__(self, working_dir: Path, sandbox: bool = True, allowed_commands: list = None):
self.working_dir = working_dir.resolve()
self.sandbox = sandbox
self.allowed_commands = allowed_commands or []
def validate_path(self, path: str) -> tuple[bool, str]:
"""
Valida que un path esté dentro del sandbox.
Returns:
(is_valid, resolved_path_or_error)
"""
if not self.sandbox:
return True, str(Path(path).expanduser())
try:
resolved = Path(path).expanduser()
if not resolved.is_absolute():
resolved = self.working_dir / resolved
resolved = resolved.resolve()
# Verificar que está dentro del working_dir
try:
resolved.relative_to(self.working_dir)
return True, str(resolved)
except ValueError:
return False, f"Path fuera del sandbox: {path}"
except Exception as e:
return False, f"Path inválido: {e}"
def validate_command(self, command: str) -> tuple[bool, str]:
"""
Valida que un comando sea seguro.
Returns:
(is_safe, error_if_unsafe)
"""
# Verificar comandos exactos peligrosos
for dangerous in self.DANGEROUS_COMMANDS:
if dangerous in command:
return False, f"Comando peligroso detectado: {dangerous}"
# Verificar patrones peligrosos
for pattern in self.DANGEROUS_PATTERNS:
if re.search(pattern, command):
return False, f"Patrón peligroso detectado: {pattern}"
# Si hay whitelist, verificar
if self.allowed_commands:
cmd_name = command.split()[0] if command.split() else ""
if cmd_name not in self.allowed_commands:
return False, f"Comando no permitido: {cmd_name}"
return True, ""
def sanitize_for_shell(self, value: str) -> str:
"""Escapa un valor para uso seguro en shell."""
return shlex.quote(value)
async def retry_with_backoff(
func: Callable,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
):
"""
Ejecuta una función con retry y backoff exponencial.
"""
last_error = None
for attempt in range(max_retries + 1):
try:
return await func(), attempt
except Exception as e:
last_error = e
if attempt < max_retries:
delay = min(base_delay * (2 ** attempt), max_delay)
await asyncio.sleep(delay)
raise last_error
class ToolExecutor:
"""
Ejecuta herramientas de forma segura.
Incluye:
- Sandbox de paths
- Validación de comandos
- Rate limiting
- Retry automático
"""
def __init__(
self,
working_dir: Optional[str] = None,
ssh_key_path: str = "~/.ssh/id_rsa",
ssh_strict_host_checking: bool = True,
timeout: float = 60.0,
allowed_tools: Optional[list[str]] = None,
sandbox_paths: bool = True,
allowed_commands: Optional[list[str]] = None,
rate_limit_per_minute: int = 60,
max_retries: int = 3,
retry_delay: float = 1.0,
):
self.working_dir = Path(working_dir).resolve() if working_dir else Path.cwd()
self.ssh_key_path = Path(ssh_key_path).expanduser()
self.ssh_strict_host_checking = ssh_strict_host_checking
self.timeout = timeout
self.allowed_tools = allowed_tools
self.max_retries = max_retries
self.retry_delay = retry_delay
# Seguridad
self.validator = SecurityValidator(
self.working_dir,
sandbox=sandbox_paths,
allowed_commands=allowed_commands
)
# Rate limiting
self.rate_limiter = RateLimiter(rate_limit_per_minute)
# Registro de herramientas
self._tools = {
"bash": self._exec_bash,
"read": self._exec_read,
"write": self._exec_write,
"glob": self._exec_glob,
"grep": self._exec_grep,
"http_request": self._exec_http,
"ssh": self._exec_ssh,
"list_dir": self._exec_list_dir,
}
async def execute(self, tool: str, params: dict) -> ToolResult:
"""Ejecuta una herramienta con rate limiting y retry."""
start_time = time.time()
# Verificar si la herramienta está permitida
if self.allowed_tools and tool not in self.allowed_tools:
return ToolResult(
tool=tool, success=False, output="",
error=f"Herramienta '{tool}' no permitida"
)
if tool not in self._tools:
return ToolResult(
tool=tool, success=False, output="",
error=f"Herramienta '{tool}' no existe"
)
# Rate limiting
await self.rate_limiter.acquire()
# Ejecutar con retry
async def do_execute():
return await self._tools[tool](params)
try:
result, retries = await retry_with_backoff(
do_execute,
max_retries=self.max_retries,
base_delay=self.retry_delay
)
result.retries = retries
result.execution_time = time.time() - start_time
return result
except Exception as e:
return ToolResult(
tool=tool, success=False, output="",
error=str(e),
execution_time=time.time() - start_time
)
def parse_tool_calls(self, text: str) -> list[dict]:
"""
Parsea llamadas a herramientas desde el texto.
Soporta múltiples formatos:
- ```tool { ... } ```
- ```json { "tool": ... } ```
- <tool>...</tool>
"""
calls = []
# Formato ```tool ... ```
pattern1 = r'```tool\s*\n?(.*?)\n?```'
for match in re.findall(pattern1, text, re.DOTALL):
try:
call = json.loads(match.strip())
if "tool" in call:
calls.append(call)
except json.JSONDecodeError:
continue
# Formato ```json ... ``` con tool
pattern2 = r'```json\s*\n?(.*?)\n?```'
for match in re.findall(pattern2, text, re.DOTALL):
try:
call = json.loads(match.strip())
if "tool" in call:
calls.append(call)
except json.JSONDecodeError:
continue
# Formato <tool name="...">...</tool>
pattern3 = r'<tool\s+name=["\'](\w+)["\']>(.*?)</tool>'
for name, params_str in re.findall(pattern3, text, re.DOTALL):
try:
params = json.loads(params_str.strip())
calls.append({"tool": name, "params": params})
except json.JSONDecodeError:
continue
return calls
async def execute_from_text(self, text: str) -> list[ToolResult]:
"""Ejecuta todas las herramientas encontradas en el texto."""
calls = self.parse_tool_calls(text)
results = []
for call in calls:
tool = call.get("tool")
params = call.get("params", {})
result = await self.execute(tool, params)
results.append(result)
return results
# ==================== IMPLEMENTACIÓN DE HERRAMIENTAS ====================
async def _exec_bash(self, params: dict) -> ToolResult:
"""Ejecuta un comando bash de forma segura."""
command = params.get("command", "")
if not command:
return ToolResult(tool="bash", success=False, output="", error="Comando vacío")
# Validar comando
is_safe, error = self.validator.validate_command(command)
if not is_safe:
return ToolResult(tool="bash", success=False, output="", error=error)
try:
# Usar subprocess.run con shell=False cuando sea posible
# Para comandos simples, parsear y ejecutar sin shell
process = await asyncio.create_subprocess_exec(
"/bin/bash", "-c", command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(self.working_dir)
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=self.timeout
)
return ToolResult(
tool="bash",
success=process.returncode == 0,
output=stdout.decode(),
error=stderr.decode() if process.returncode != 0 else None
)
except asyncio.TimeoutError:
return ToolResult(
tool="bash", success=False, output="",
error=f"Timeout después de {self.timeout}s"
)
async def _exec_read(self, params: dict) -> ToolResult:
"""Lee un archivo dentro del sandbox."""
path = params.get("path", "")
encoding = params.get("encoding", "utf-8")
if not path:
return ToolResult(tool="read", success=False, output="", error="Path vacío")
# Validar path
is_valid, result = self.validator.validate_path(path)
if not is_valid:
return ToolResult(tool="read", success=False, output="", error=result)
try:
content = Path(result).read_text(encoding=encoding)
return ToolResult(tool="read", success=True, output=content)
except FileNotFoundError:
return ToolResult(tool="read", success=False, output="", error=f"Archivo no encontrado: {path}")
except Exception as e:
return ToolResult(tool="read", success=False, output="", error=str(e))
async def _exec_write(self, params: dict) -> ToolResult:
"""Escribe un archivo dentro del sandbox."""
path = params.get("path", "")
content = params.get("content", "")
append = params.get("append", False)
if not path:
return ToolResult(tool="write", success=False, output="", error="Path vacío")
# Validar path
is_valid, result = self.validator.validate_path(path)
if not is_valid:
return ToolResult(tool="write", success=False, output="", error=result)
try:
file_path = Path(result)
file_path.parent.mkdir(parents=True, exist_ok=True)
mode = "a" if append else "w"
with open(file_path, mode) as f:
f.write(content)
return ToolResult(
tool="write", success=True,
output=f"Escrito {len(content)} bytes a {file_path.name}"
)
except Exception as e:
return ToolResult(tool="write", success=False, output="", error=str(e))
async def _exec_glob(self, params: dict) -> ToolResult:
"""Busca archivos con patrón glob dentro del sandbox."""
pattern = params.get("pattern", "")
if not pattern:
return ToolResult(tool="glob", success=False, output="", error="Patrón vacío")
try:
files = list(self.working_dir.glob(pattern))
# Filtrar solo archivos dentro del sandbox
safe_files = []
for f in files[:100]:
try:
f.relative_to(self.working_dir)
safe_files.append(str(f.relative_to(self.working_dir)))
except ValueError:
continue
output = "\n".join(safe_files)
return ToolResult(
tool="glob", success=True,
output=f"Encontrados {len(safe_files)} archivos:\n{output}"
)
except Exception as e:
return ToolResult(tool="glob", success=False, output="", error=str(e))
async def _exec_grep(self, params: dict) -> ToolResult:
"""Busca texto en archivos."""
pattern = params.get("pattern", "")
path = params.get("path", "")
recursive = params.get("recursive", False)
if not pattern or not path:
return ToolResult(tool="grep", success=False, output="", error="Pattern o path vacío")
# Validar path
is_valid, validated_path = self.validator.validate_path(path)
if not is_valid:
return ToolResult(tool="grep", success=False, output="", error=validated_path)
try:
cmd = ["grep", "-n", "--color=never"]
if recursive:
cmd.append("-r")
cmd.extend([pattern, validated_path])
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(self.working_dir)
)
stdout, _ = await asyncio.wait_for(
process.communicate(),
timeout=self.timeout
)
return ToolResult(tool="grep", success=True, output=stdout.decode())
except Exception as e:
return ToolResult(tool="grep", success=False, output="", error=str(e))
async def _exec_http(self, params: dict) -> ToolResult:
"""Hace una petición HTTP."""
url = params.get("url", "")
method = params.get("method", "GET").upper()
headers = params.get("headers", {})
body = params.get("body", "")
if not url:
return ToolResult(tool="http_request", success=False, output="", error="URL vacía")
try:
import urllib.request
import urllib.error
data = body.encode() if body else None
req = urllib.request.Request(url, data=data, method=method)
for key, value in headers.items():
req.add_header(key, value)
with urllib.request.urlopen(req, timeout=self.timeout) as response:
content = response.read().decode()
return ToolResult(tool="http_request", success=True, output=content)
except urllib.error.HTTPError as e:
return ToolResult(
tool="http_request", success=False,
output=e.read().decode() if e.fp else "",
error=f"HTTP {e.code}: {e.reason}"
)
except Exception as e:
return ToolResult(tool="http_request", success=False, output="", error=str(e))
async def _exec_ssh(self, params: dict) -> ToolResult:
"""Ejecuta comando via SSH con verificación de host."""
host = params.get("host", "")
command = params.get("command", "")
user = params.get("user", "root")
key_path = params.get("key_path", str(self.ssh_key_path))
if not host or not command:
return ToolResult(tool="ssh", success=False, output="", error="Host o command vacío")
# Validar comando remoto también
is_safe, error = self.validator.validate_command(command)
if not is_safe:
return ToolResult(tool="ssh", success=False, output="", error=f"Comando remoto inseguro: {error}")
try:
ssh_cmd = [
"ssh",
"-o", f"StrictHostKeyChecking={'yes' if self.ssh_strict_host_checking else 'no'}",
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=10",
"-i", key_path,
f"{user}@{host}",
command
]
process = await asyncio.create_subprocess_exec(
*ssh_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=self.timeout
)
return ToolResult(
tool="ssh",
success=process.returncode == 0,
output=stdout.decode(),
error=stderr.decode() if process.returncode != 0 else None
)
except asyncio.TimeoutError:
return ToolResult(
tool="ssh", success=False, output="",
error=f"SSH timeout después de {self.timeout}s"
)
except Exception as e:
return ToolResult(tool="ssh", success=False, output="", error=str(e))
async def _exec_list_dir(self, params: dict) -> ToolResult:
"""Lista contenido de directorio dentro del sandbox."""
path = params.get("path", ".")
recursive = params.get("recursive", False)
include_hidden = params.get("include_hidden", False)
# Validar path
is_valid, validated_path = self.validator.validate_path(path)
if not is_valid:
return ToolResult(tool="list_dir", success=False, output="", error=validated_path)
try:
dir_path = Path(validated_path)
if not dir_path.exists():
return ToolResult(
tool="list_dir", success=False, output="",
error=f"Directorio no existe: {path}"
)
entries = []
pattern = "**/*" if recursive else "*"
for entry in dir_path.glob(pattern):
if not include_hidden and entry.name.startswith("."):
continue
# Verificar que está en sandbox
try:
entry.relative_to(self.working_dir)
except ValueError:
continue
entry_type = "d" if entry.is_dir() else "f"
rel_path = entry.relative_to(dir_path)
entries.append(f"[{entry_type}] {rel_path}")
return ToolResult(
tool="list_dir", success=True,
output="\n".join(sorted(entries)[:200])
)
except Exception as e:
return ToolResult(tool="list_dir", success=False, output="", error=str(e))

0
outputs/.gitkeep Normal file
View File