""" ============================================================================= THE FACTORY - Executor ============================================================================= Genera artefactos usando diferentes modelos de IA. Soporta: texto, código, imágenes, documentos. ============================================================================= """ import os import logging import base64 from typing import Dict, Any, Optional import anthropic import openai import httpx from config import FactoryConfig, FunctionType, ModelConfig logger = logging.getLogger("factory.executor") class Executor: """ El Executor genera artefactos usando modelos de IA. """ def __init__(self, config: FactoryConfig): self.config = config # Inicializar clientes if config.anthropic_api_key: self.anthropic = anthropic.Anthropic(api_key=config.anthropic_api_key) else: self.anthropic = None logger.warning("ANTHROPIC_API_KEY no configurada") if config.openai_api_key: self.openai = openai.OpenAI(api_key=config.openai_api_key) else: self.openai = None logger.warning("OPENAI_API_KEY no configurada") self.replicate_key = config.replicate_api_key def generate( self, function: FunctionType, context: Dict[str, Any], budget_remaining: float ) -> Dict[str, Any]: """ Genera un artefacto. Args: function: Tipo de función context: Contexto preparado por el Director budget_remaining: Presupuesto disponible Returns: { "artifact": , "cost_usd": , "model_used": , "tokens_used": } """ model_name = context.get("model", self.config.default_models[function]) model_config = self.config.get_model(model_name) if not model_config: raise ValueError(f"Modelo desconocido: {model_name}") prompt = context.get("prompt", "") logger.info(f"Generando con {model_name} ({model_config.provider})") if function == FunctionType.IMAGE_GENERATION: return self._generate_image(prompt, model_config, budget_remaining) else: return self._generate_text(prompt, model_config, function, budget_remaining) def _generate_text( self, prompt: str, model: ModelConfig, function: FunctionType, budget_remaining: float ) -> Dict[str, Any]: """Genera texto usando Claude o GPT.""" if model.provider == "anthropic": return self._generate_anthropic(prompt, model, function) elif model.provider == "openai": return self._generate_openai(prompt, model, function) else: raise ValueError(f"Provider no soportado: {model.provider}") def _generate_anthropic( self, prompt: str, model: ModelConfig, function: FunctionType ) -> Dict[str, Any]: """Genera con Anthropic Claude.""" if not self.anthropic: raise RuntimeError("Cliente Anthropic no inicializado") # System prompt según función system = self._get_system_prompt(function) try: response = self.anthropic.messages.create( model=model.name, max_tokens=model.max_tokens, system=system, messages=[{"role": "user", "content": prompt}] ) # Extraer texto artifact = "" for block in response.content: if hasattr(block, "text"): artifact += block.text # Calcular coste input_tokens = response.usage.input_tokens output_tokens = response.usage.output_tokens cost = ( (input_tokens / 1000) * model.cost_per_1k_input + (output_tokens / 1000) * model.cost_per_1k_output ) return { "artifact": artifact, "cost_usd": cost, "model_used": model.name, "tokens_used": input_tokens + output_tokens, "input_tokens": input_tokens, "output_tokens": output_tokens } except Exception as e: logger.error(f"Error Anthropic: {e}") raise def _generate_openai( self, prompt: str, model: ModelConfig, function: FunctionType ) -> Dict[str, Any]: """Genera con OpenAI GPT.""" if not self.openai: raise RuntimeError("Cliente OpenAI no inicializado") system = self._get_system_prompt(function) try: response = self.openai.chat.completions.create( model=model.name, max_tokens=model.max_tokens, messages=[ {"role": "system", "content": system}, {"role": "user", "content": prompt} ] ) artifact = response.choices[0].message.content # Calcular coste input_tokens = response.usage.prompt_tokens output_tokens = response.usage.completion_tokens cost = ( (input_tokens / 1000) * model.cost_per_1k_input + (output_tokens / 1000) * model.cost_per_1k_output ) return { "artifact": artifact, "cost_usd": cost, "model_used": model.name, "tokens_used": input_tokens + output_tokens, "input_tokens": input_tokens, "output_tokens": output_tokens } except Exception as e: logger.error(f"Error OpenAI: {e}") raise def _generate_image( self, prompt: str, model: ModelConfig, budget_remaining: float ) -> Dict[str, Any]: """Genera imagen con Replicate (Flux).""" if not self.replicate_key: raise RuntimeError("REPLICATE_API_KEY no configurada") try: # Llamar a Replicate API response = httpx.post( "https://api.replicate.com/v1/predictions", headers={ "Authorization": f"Token {self.replicate_key}", "Content-Type": "application/json" }, json={ "version": self._get_replicate_version(model.name), "input": { "prompt": prompt, "aspect_ratio": "1:1", "output_format": "webp", "output_quality": 90 } }, timeout=60.0 ) response.raise_for_status() prediction = response.json() # Esperar resultado prediction_id = prediction["id"] result = self._wait_for_replicate(prediction_id) return { "artifact": { "url": result["output"][0] if isinstance(result["output"], list) else result["output"], "prompt": prompt }, "cost_usd": model.cost_per_1k_input, # Coste fijo por imagen "model_used": model.name, "prediction_id": prediction_id } except Exception as e: logger.error(f"Error Replicate: {e}") raise def _wait_for_replicate(self, prediction_id: str, max_wait: int = 120) -> Dict: """Espera resultado de Replicate.""" import time for _ in range(max_wait): response = httpx.get( f"https://api.replicate.com/v1/predictions/{prediction_id}", headers={"Authorization": f"Token {self.replicate_key}"}, timeout=10.0 ) result = response.json() if result["status"] == "succeeded": return result elif result["status"] == "failed": raise RuntimeError(f"Replicate failed: {result.get('error')}") time.sleep(1) raise TimeoutError("Replicate prediction timeout") def _get_replicate_version(self, model_name: str) -> str: """Obtiene version ID de Replicate.""" versions = { "black-forest-labs/flux-1.1-pro": "80a09d66baa990429c004a8ff540ce96c1e9e0e9c381", "black-forest-labs/flux-schnell": "f2ab8a5bfe79f02f0789a146cf5e73d2a4ff2684a98c2b" } return versions.get(model_name, versions["black-forest-labs/flux-schnell"]) def _get_system_prompt(self, function: FunctionType) -> str: """Obtiene system prompt según función.""" prompts = { FunctionType.TEXT_GENERATION: """Eres un generador de contenido experto. Produces textos de alta calidad, bien estructurados y profesionales. Sigues las instrucciones del usuario con precisión.""", FunctionType.CODE_GENERATION: """Eres un programador experto. Generas código limpio, eficiente y bien documentado. Sigues mejores prácticas y patrones de diseño apropiados. Incluyes manejo de errores y comentarios útiles.""", FunctionType.DOCUMENT_GENERATION: """Eres un experto en documentación profesional. Creas documentos claros, completos y bien formateados. Aseguras que toda la información necesaria esté presente. Usas un tono profesional y apropiado al contexto.""", FunctionType.AUDIO_GENERATION: """Eres un experto en producción de audio. Generas scripts y descripciones para contenido de audio.""", FunctionType.VIDEO_GENERATION: """Eres un experto en producción de video. Generas guiones y descripciones para contenido de video.""" } return prompts.get(function, prompts[FunctionType.TEXT_GENERATION])