commit 1cad39bc9ef7a6d294c686cc71ca18c2cf1d76f4
Author: ARCHITECT
Date:   Tue Jan 6 08:28:16 2026 +0000

    Initial commit: THE FACTORY - Iterative Image Generation

    Tasks:
    - image_generate: Generate image from prompt
    - image_variant: Generate variant of existing image
    - image_upscale: Increase resolution

    Models: SDXL, Flux, SDXL-Turbo
    RunPod Serverless Handler

diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..b1fabd3
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,19 @@
+# THE FACTORY - Image Generation for RunPod Serverless
+FROM runpod/pytorch:2.1.0-py3.10-cuda11.8.0-devel-ubuntu22.04
+
+WORKDIR /app
+
+# Copy requirements and install Python packages
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy all Python files
+COPY *.py ./
+
+# Set environment variables
+ENV PYTHONUNBUFFERED=1
+ENV HF_HOME=/runpod-volume/huggingface
+ENV TRANSFORMERS_CACHE=/runpod-volume/huggingface
+
+# Run the handler
+CMD ["python", "-u", "/app/handler.py"]
diff --git a/config.py b/config.py
new file mode 100644
index 0000000..e52d217
--- /dev/null
+++ b/config.py
@@ -0,0 +1,138 @@
+"""
+=============================================================================
+THE FACTORY - Configuration
+=============================================================================
+"""
+
+import os
+from enum import Enum
+from dataclasses import dataclass, field
+from typing import Dict, List, Optional
+
+
+class JobState(Enum):
+    """Possible states of a job."""
+    PENDING = "PENDING"
+    QUEUED = "QUEUED"
+    RUNNING = "RUNNING"
+    EVALUATING = "EVALUATING"
+    CONVERGED = "CONVERGED"
+    EXHAUSTED = "EXHAUSTED"
+    FAILED = "FAILED"
+    CANCELLED = "CANCELLED"
+
+
+class FunctionType(Enum):
+    """Supported function types."""
+    TEXT_GENERATION = "TEXT_GENERATION"
+    IMAGE_GENERATION = "IMAGE_GENERATION"
+    CODE_GENERATION = "CODE_GENERATION"
+    DOCUMENT_GENERATION = "DOCUMENT_GENERATION"
+    AUDIO_GENERATION = "AUDIO_GENERATION"
+    VIDEO_GENERATION = "VIDEO_GENERATION"
+
+
+@dataclass
+class ModelConfig:
+    """Configuration for a single model."""
+    name: str
+    provider: str
+    cost_per_1k_input: float
+    cost_per_1k_output: float
+    max_tokens: int = 4096
+    supports_images: bool = False
+    supports_streaming: bool = True
+
+
+@dataclass
+class FactoryConfig:
+    """Global configuration for THE FACTORY."""
+
+    # API keys
+    anthropic_api_key: str = field(default_factory=lambda: os.environ.get("ANTHROPIC_API_KEY", ""))
+    openai_api_key: str = field(default_factory=lambda: os.environ.get("OPENAI_API_KEY", ""))
+    replicate_api_key: str = field(default_factory=lambda: os.environ.get("REPLICATE_API_KEY", ""))
+
+    # Default limits
+    default_max_cycles: int = 5
+    default_budget_usd: float = 1.0
+    default_timeout_ms: int = 120000
+
+    # Convergence
+    convergence_threshold: float = 0.85
+    diminishing_returns_threshold: float = 0.02
+    diminishing_returns_min_confidence: float = 0.70
+
+    # Models per function
+    models: Dict[str, ModelConfig] = field(default_factory=lambda: {
+        # Text generation
+        "claude-sonnet": ModelConfig(
+            name="claude-sonnet-4-20250514",
+            provider="anthropic",
+            cost_per_1k_input=0.003,
+            cost_per_1k_output=0.015,
+            max_tokens=8192,
+            supports_images=True
+        ),
+        "claude-haiku": ModelConfig(
+            name="claude-haiku-4-20250514",
+            provider="anthropic",
+            cost_per_1k_input=0.00025,
+            cost_per_1k_output=0.00125,
+            max_tokens=8192,
+            supports_images=True
+        ),
+        "gpt-4o": ModelConfig(
+            name="gpt-4o",
+            provider="openai",
+            cost_per_1k_input=0.005,
+            cost_per_1k_output=0.015,
+            max_tokens=4096,
+            supports_images=True
+        ),
+        "gpt-4o-mini": ModelConfig(
+            name="gpt-4o-mini",
+            provider="openai",
+            cost_per_1k_input=0.00015,
+            cost_per_1k_output=0.0006,
+            max_tokens=4096,
+            supports_images=True
+        ),
+        # Image generation
+        "flux-pro": ModelConfig(
+            name="black-forest-labs/flux-1.1-pro",
+            provider="replicate",
+            cost_per_1k_input=0.05,  # flat cost per image
+            cost_per_1k_output=0.0,
+            max_tokens=0
+        ),
+        "flux-schnell": ModelConfig(
+            name="black-forest-labs/flux-schnell",
+            provider="replicate",
+            cost_per_1k_input=0.003,  # flat cost per image
+            cost_per_1k_output=0.0,
+            max_tokens=0
+        ),
+    })
+
+    # Default model per function
+    default_models: Dict[FunctionType, str] = field(default_factory=lambda: {
+        FunctionType.TEXT_GENERATION: "claude-sonnet",
+        FunctionType.CODE_GENERATION: "claude-sonnet",
+        FunctionType.DOCUMENT_GENERATION: "claude-sonnet",
+        FunctionType.IMAGE_GENERATION: "flux-schnell",
+        FunctionType.AUDIO_GENERATION: "claude-sonnet",  # placeholder
+        FunctionType.VIDEO_GENERATION: "claude-sonnet",  # placeholder
+    })
+
+    # Model used for evaluation
+    evaluator_model: str = "claude-haiku"
+
+    def get_model(self, name: str) -> Optional[ModelConfig]:
+        """Returns the configuration for a named model."""
+        return self.models.get(name)
+
+    def get_default_model(self, function: FunctionType) -> ModelConfig:
+        """Returns the default model for a function."""
+        model_name = self.default_models.get(function, "claude-sonnet")
+        return self.models[model_name]
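Note on pricing: for the Replicate image models the per-1k-token fields are overloaded — executor.py charges cost_per_1k_input once per image as a flat fee. A minimal sketch of how FactoryConfig resolves a default model and prices a text call (the token counts are illustrative, not from the source):

    from config import FactoryConfig, FunctionType

    cfg = FactoryConfig()
    model = cfg.get_default_model(FunctionType.CODE_GENERATION)  # -> the "claude-sonnet" entry

    # Hypothetical call: 1200 input tokens, 800 output tokens
    cost = (1200 / 1000) * model.cost_per_1k_input + (800 / 1000) * model.cost_per_1k_output
    # 1.2 * 0.003 + 0.8 * 0.015 = 0.0156 USD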
diff --git a/director.py b/director.py
new file mode 100644
index 0000000..faa0410
--- /dev/null
+++ b/director.py
@@ -0,0 +1,254 @@
+"""
+=============================================================================
+THE FACTORY - Director
+=============================================================================
+Coordinates the iterative generation process.
+- Prepares context for the Executor
+- Decides when to converge
+- Manages the budget
+=============================================================================
+"""
+
+import logging
+from typing import Dict, Any, Optional
+
+from config import FactoryConfig, FunctionType
+
+logger = logging.getLogger("factory.director")
+
+
+class Director:
+    """
+    The Director coordinates the iterative generation process.
+    """
+
+    def __init__(self, config: FactoryConfig):
+        self.config = config
+
+    def prepare_context(
+        self,
+        seed: str,
+        objective: str,
+        function: FunctionType,
+        previous_artifact: Optional[Any],
+        feedback: Optional[str],
+        iteration: int,
+        context: Dict[str, Any]
+    ) -> Dict[str, Any]:
+        """
+        Prepares the context for the Executor.
+
+        Args:
+            seed: Initial prompt
+            objective: Objective to reach
+            function: Function type
+            previous_artifact: Artifact from the previous iteration
+            feedback: Feedback from the Evaluator
+            iteration: Current iteration number
+            context: Additional user context
+
+        Returns:
+            Context prepared for the Executor
+        """
+        exec_context = {
+            "seed": seed,
+            "objective": objective,
+            "function": function,
+            "iteration": iteration,
+            "is_first_iteration": iteration == 1,
+            "user_context": context
+        }
+
+        if iteration == 1:
+            # First iteration: use the seed directly
+            exec_context["prompt"] = self._build_initial_prompt(seed, objective, function)
+        else:
+            # Later iterations: incorporate feedback
+            exec_context["previous_artifact"] = previous_artifact
+            exec_context["feedback"] = feedback
+            exec_context["prompt"] = self._build_refinement_prompt(
+                seed, objective, function, previous_artifact, feedback, iteration
+            )
+
+        # Select the appropriate model
+        exec_context["model"] = self._select_model(function, iteration, context)
+
+        return exec_context
+
+    def _build_initial_prompt(
+        self,
+        seed: str,
+        objective: str,
+        function: FunctionType
+    ) -> str:
+        """Builds the prompt for the first iteration."""
+
+        if function == FunctionType.TEXT_GENERATION:
+            return f"""Generate high-quality content based on the following:
+
+REQUEST:
+{seed}
+
+OBJECTIVE:
+{objective}
+
+Provide a complete, well-structured, professional response."""
+
+        elif function == FunctionType.CODE_GENERATION:
+            return f"""Generate high-quality code based on the following:
+
+REQUEST:
+{seed}
+
+OBJECTIVE:
+{objective}
+
+The code must:
+- Be clean and well documented
+- Follow best practices
+- Include appropriate error handling
+- Be efficient and maintainable"""
+
+        elif function == FunctionType.IMAGE_GENERATION:
+            return f"""{seed}
+
+Style: Professional, high quality, detailed
+Objective: {objective}"""
+
+        elif function == FunctionType.DOCUMENT_GENERATION:
+            return f"""Generate a professional document:
+
+DOCUMENT TYPE:
+{seed}
+
+OBJECTIVE:
+{objective}
+
+The document must be:
+- Professional and well formatted
+- Complete, with all necessary information
+- Clear and easy to read"""
+
+        else:
+            return f"{seed}\n\nObjective: {objective}"
+
+    def _build_refinement_prompt(
+        self,
+        seed: str,
+        objective: str,
+        function: FunctionType,
+        previous_artifact: Any,
+        feedback: str,
+        iteration: int
+    ) -> str:
+        """Builds the prompt for refinement iterations."""
+
+        # Truncate the previous artifact if it is too long
+        prev_str = str(previous_artifact)
+        if len(prev_str) > 2000:
+            prev_str = prev_str[:2000] + "\n[...truncated...]"
+
+        if function in [FunctionType.TEXT_GENERATION, FunctionType.DOCUMENT_GENERATION]:
+            return f"""Improve the following content based on the feedback:
+
+ORIGINAL REQUEST:
+{seed}
+
+OBJECTIVE:
+{objective}
+
+PREVIOUS CONTENT (iteration {iteration - 1}):
+{prev_str}
+
+FEEDBACK TO INCORPORATE:
+{feedback}
+
+Generate an improved version that addresses the feedback while keeping what already works."""
+
+        elif function == FunctionType.CODE_GENERATION:
+            return f"""Improve the following code based on the feedback:
+
+ORIGINAL REQUEST:
+{seed}
+
+OBJECTIVE:
+{objective}
+
+PREVIOUS CODE (iteration {iteration - 1}):
+```
+{prev_str}
+```
+
+FEEDBACK TO INCORPORATE:
+{feedback}
+
+Generate an improved version of the code."""
+
+        elif function == FunctionType.IMAGE_GENERATION:
+            return f"""{seed}
+
+Previous attempt feedback: {feedback}
+Iteration: {iteration}
+Objective: {objective}
+Improve based on feedback while maintaining the core concept."""
+
+        else:
+            return f"""Improve based on the feedback:
+
+Original: {seed}
+Objective: {objective}
+Previous: {prev_str}
+Feedback: {feedback}
+
+Generate an improved version."""
+
+    def _select_model(
+        self,
+        function: FunctionType,
+        iteration: int,
+        context: Dict[str, Any]
+    ) -> str:
+        """Selects the appropriate model."""
+
+        # If the user specified a model, use it
+        if "model" in context:
+            return context["model"]
+
+        # Otherwise, use the model configured for the function
+        return self.config.default_models.get(function, "claude-sonnet")
+
+    def should_converge(
+        self,
+        confidence: float,
+        iteration: int,
+        previous_confidence: float,
+        budget_used: float,
+        budget_total: float
+    ) -> bool:
+        """
+        Decides whether the job should converge.
+
+        Criteria:
+        1. Confidence threshold reached
+        2. Diminishing returns (improvement < threshold)
+        3. Budget nearly exhausted
+        """
+
+        # 1. Confidence threshold
+        if confidence >= self.config.convergence_threshold:
+            logger.info(f"Converging on threshold: {confidence:.2f} >= {self.config.convergence_threshold}")
+            return True
+
+        # 2. Diminishing returns
+        if iteration > 1 and confidence >= self.config.diminishing_returns_min_confidence:
+            improvement = confidence - previous_confidence
+            if improvement < self.config.diminishing_returns_threshold:
+                logger.info(f"Converging on diminishing returns: improvement {improvement:.3f} < {self.config.diminishing_returns_threshold}")
+                return True
+
+        # 3. Budget nearly exhausted (>90% used)
+        if budget_used >= budget_total * 0.9:
+            logger.info(f"Converging on budget: ${budget_used:.4f} >= 90% of ${budget_total:.4f}")
+            return True
+
+        return False
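A worked example of the three convergence rules in should_converge, using the default thresholds from config.py (0.85 confidence, 0.02 minimum improvement, 0.70 minimum confidence); the numbers are illustrative:

    from config import FactoryConfig
    from director import Director

    d = Director(FactoryConfig())

    # Rule 1: confidence threshold (0.90 >= 0.85)
    assert d.should_converge(0.90, iteration=2, previous_confidence=0.70,
                             budget_used=0.10, budget_total=1.0)

    # Rule 2: diminishing returns (0.79 - 0.78 = 0.01 < 0.02, with confidence >= 0.70)
    assert d.should_converge(0.79, iteration=3, previous_confidence=0.78,
                             budget_used=0.10, budget_total=1.0)

    # Rule 3: budget nearly exhausted (0.95 >= 90% of 1.0)
    assert d.should_converge(0.60, iteration=2, previous_confidence=0.50,
                             budget_used=0.95, budget_total=1.0)

    # Otherwise, keep iterating
    assert not d.should_converge(0.60, iteration=2, previous_confidence=0.50,
                                 budget_used=0.10, budget_total=1.0)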
diff --git a/evaluator.py b/evaluator.py
new file mode 100644
index 0000000..6af4b81
--- /dev/null
+++ b/evaluator.py
@@ -0,0 +1,233 @@
+"""
+=============================================================================
+THE FACTORY - Evaluator
+=============================================================================
+Evaluates generated artifacts against the objective.
+Provides:
+- confidence: 0.0 to 1.0
+- feedback: improvement suggestions
+=============================================================================
+"""
+
+import json
+import logging
+from typing import Dict, Any, Optional
+
+import anthropic
+
+from config import FactoryConfig, FunctionType
+
+logger = logging.getLogger("factory.evaluator")
+
+
+class Evaluator:
+    """
+    The Evaluator scores artifacts against the objective.
+    """
+
+    def __init__(self, config: FactoryConfig):
+        self.config = config
+
+        if config.anthropic_api_key:
+            self.anthropic = anthropic.Anthropic(api_key=config.anthropic_api_key)
+        else:
+            self.anthropic = None
+            logger.warning("ANTHROPIC_API_KEY not set for Evaluator")
+
+    def evaluate(
+        self,
+        artifact: Any,
+        objective: str,
+        function: FunctionType
+    ) -> Dict[str, Any]:
+        """
+        Evaluates an artifact.
+
+        Args:
+            artifact: The generated artifact
+            objective: The objective to meet
+            function: Function type
+
+        Returns:
+            {
+                "confidence": 0.0-1.0,
+                "feedback": "improvement suggestions",
+                "strengths": ["strength 1", ...],
+                "weaknesses": ["weakness 1", ...],
+                "cost_usd": evaluation cost
+            }
+        """
+
+        if not self.anthropic:
+            # Fallback: basic evaluation without an LLM
+            return self._evaluate_basic(artifact, objective, function)
+
+        return self._evaluate_with_llm(artifact, objective, function)
+
+    def _evaluate_with_llm(
+        self,
+        artifact: Any,
+        objective: str,
+        function: FunctionType
+    ) -> Dict[str, Any]:
+        """Evaluates using Claude Haiku."""
+
+        model = self.config.get_model(self.config.evaluator_model)
+
+        # Prepare the artifact for evaluation
+        artifact_str = self._prepare_artifact_for_eval(artifact, function)
+
+        prompt = f"""Evaluate the following artifact against the specified objective.
+
+OBJECTIVE:
+{objective}
+
+ARTIFACT TO EVALUATE:
+{artifact_str}
+
+Provide your evaluation as JSON with exactly this structure:
+{{
+    "confidence": <number between 0.0 and 1.0>,
+    "feedback": "<specific improvement suggestions>",
+    "strengths": ["<strength 1>", "<strength 2>"],
+    "weaknesses": ["<weakness 1>", "<weakness 2>"],
+    "meets_objective": <true or false>
+}}
+
+Scoring criteria:
+- 0.9-1.0: Excellent, fully meets the objective
+- 0.7-0.9: Good, mostly meets it with minor improvements possible
+- 0.5-0.7: Acceptable, partially meets it
+- 0.3-0.5: Poor, needs significant improvement
+- 0.0-0.3: Unacceptable, does not meet the objective
+
+Be specific in the feedback so the next attempt can improve."""
+
+        try:
+            response = self.anthropic.messages.create(
+                model=model.name,
+                max_tokens=1024,
+                messages=[{"role": "user", "content": prompt}]
+            )
+
+            # Extract text
+            response_text = ""
+            for block in response.content:
+                if hasattr(block, "text"):
+                    response_text += block.text
+
+            # Parse JSON
+            eval_result = self._parse_eval_response(response_text)
+
+            # Compute cost
+            input_tokens = response.usage.input_tokens
+            output_tokens = response.usage.output_tokens
+            cost = (
+                (input_tokens / 1000) * model.cost_per_1k_input +
+                (output_tokens / 1000) * model.cost_per_1k_output
+            )
+
+            eval_result["cost_usd"] = cost
+            eval_result["tokens_used"] = input_tokens + output_tokens
+
+            logger.info(f"Evaluation: confidence={eval_result['confidence']:.2f}")
+
+            return eval_result
+
+        except Exception as e:
+            logger.error(f"LLM evaluation error: {e}")
+            return self._evaluate_basic(artifact, objective, function)
+
+    def _parse_eval_response(self, response: str) -> Dict[str, Any]:
+        """Parses the evaluation response."""
+
+        # Try to extract JSON
+        try:
+            # Look for a JSON object in the response
+            start = response.find("{")
+            end = response.rfind("}") + 1
+
+            if start >= 0 and end > start:
+                json_str = response[start:end]
+                result = json.loads(json_str)
+
+                # Validate required fields
+                confidence = float(result.get("confidence", 0.5))
+                confidence = max(0.0, min(1.0, confidence))  # Clamp to 0-1
+
+                return {
+                    "confidence": confidence,
+                    "feedback": result.get("feedback", "No specific feedback"),
+                    "strengths": result.get("strengths", []),
+                    "weaknesses": result.get("weaknesses", []),
+                    "meets_objective": result.get("meets_objective", confidence >= 0.7)
+                }
+
+        except (json.JSONDecodeError, ValueError) as e:
+            logger.warning(f"Error parsing evaluation: {e}")
+
+        # Fallback
+        return {
+            "confidence": 0.5,
+            "feedback": "Could not evaluate correctly. Try again.",
+            "strengths": [],
+            "weaknesses": ["Evaluation failed"],
+            "meets_objective": False
+        }
+
+    def _evaluate_basic(
+        self,
+        artifact: Any,
+        objective: str,
+        function: FunctionType
+    ) -> Dict[str, Any]:
+        """Basic evaluation without an LLM."""
+
+        artifact_str = str(artifact) if artifact else ""
+
+        # Simple heuristics
+        confidence = 0.5
+
+        # Check minimum length
+        if len(artifact_str) < 50:
+            confidence -= 0.2
+        elif len(artifact_str) > 200:
+            confidence += 0.1
+
+        # Check that it contains words from the objective
+        objective_words = set(objective.lower().split())
+        artifact_words = set(artifact_str.lower().split())
+        overlap = len(objective_words & artifact_words)
+
+        if overlap >= len(objective_words) * 0.3:
+            confidence += 0.2
+
+        confidence = max(0.0, min(1.0, confidence))
+
+        return {
+            "confidence": confidence,
+            "feedback": "Basic evaluation applied. Consider reviewing manually.",
+            "strengths": ["Artifact generated"],
+            "weaknesses": ["Limited automatic evaluation"],
+            "meets_objective": confidence >= 0.7,
+            "cost_usd": 0.0
+        }
+
+    def _prepare_artifact_for_eval(self, artifact: Any, function: FunctionType) -> str:
+        """Prepares an artifact for evaluation."""
+
+        if artifact is None:
+            return "[No artifact generated]"
+
+        if function == FunctionType.IMAGE_GENERATION:
+            if isinstance(artifact, dict):
+                return f"[Image generated]\nURL: {artifact.get('url', 'N/A')}\nPrompt used: {artifact.get('prompt', 'N/A')}"
+            return "[Image generated]"
+
+        artifact_str = str(artifact)
+
+        # Truncate if too long
+        if len(artifact_str) > 4000:
+            return artifact_str[:4000] + "\n[...truncated for evaluation...]"
+
+        return artifact_str
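The evaluation contract in a round trip: the prompt above asks the model for a JSON object, and _parse_eval_response extracts the first {...} span, parses it, and clamps confidence to [0, 1]. A small sketch exercising only the parser (no API key needed; the sample response string is invented for illustration):

    from config import FactoryConfig
    from evaluator import Evaluator

    ev = Evaluator(FactoryConfig())  # logs a warning if ANTHROPIC_API_KEY is unset
    sample = ('Here is my assessment: {"confidence": 1.4, "feedback": "Tighten the intro.", '
              '"strengths": ["clear"], "weaknesses": ["long intro"], "meets_objective": true}')

    parsed = ev._parse_eval_response(sample)
    assert parsed["confidence"] == 1.0          # clamped from 1.4
    assert parsed["meets_objective"] is True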
diff --git a/executor.py b/executor.py
new file mode 100644
index 0000000..2052b34
--- /dev/null
+++ b/executor.py
@@ -0,0 +1,300 @@
+"""
+=============================================================================
+THE FACTORY - Executor
+=============================================================================
+Generates artifacts using different AI models.
+Supports: text, code, images, documents.
+=============================================================================
+"""
+
+import os
+import logging
+import base64
+from typing import Dict, Any, Optional
+
+import anthropic
+import openai
+import httpx
+
+from config import FactoryConfig, FunctionType, ModelConfig
+
+logger = logging.getLogger("factory.executor")
+
+
+class Executor:
+    """
+    The Executor generates artifacts using AI models.
+    """
+
+    def __init__(self, config: FactoryConfig):
+        self.config = config
+
+        # Initialize clients
+        if config.anthropic_api_key:
+            self.anthropic = anthropic.Anthropic(api_key=config.anthropic_api_key)
+        else:
+            self.anthropic = None
+            logger.warning("ANTHROPIC_API_KEY not set")
+
+        if config.openai_api_key:
+            self.openai = openai.OpenAI(api_key=config.openai_api_key)
+        else:
+            self.openai = None
+            logger.warning("OPENAI_API_KEY not set")
+
+        self.replicate_key = config.replicate_api_key
+
+    def generate(
+        self,
+        function: FunctionType,
+        context: Dict[str, Any],
+        budget_remaining: float
+    ) -> Dict[str, Any]:
+        """
+        Generates an artifact.
+
+        Args:
+            function: Function type
+            context: Context prepared by the Director
+            budget_remaining: Available budget
+
+        Returns:
+            {
+                "artifact": <generated artifact>,
+                "cost_usd": <cost in USD>,
+                "model_used": <model name>,
+                "tokens_used": <total tokens, when applicable>
+            }
+        """
+        model_name = context.get("model", self.config.default_models[function])
+        model_config = self.config.get_model(model_name)
+
+        if not model_config:
+            raise ValueError(f"Unknown model: {model_name}")
+
+        prompt = context.get("prompt", "")
+
+        logger.info(f"Generating with {model_name} ({model_config.provider})")
+
+        if function == FunctionType.IMAGE_GENERATION:
+            return self._generate_image(prompt, model_config, budget_remaining)
+        else:
+            return self._generate_text(prompt, model_config, function, budget_remaining)
+
+    def _generate_text(
+        self,
+        prompt: str,
+        model: ModelConfig,
+        function: FunctionType,
+        budget_remaining: float
+    ) -> Dict[str, Any]:
+        """Generates text using Claude or GPT."""
+
+        if model.provider == "anthropic":
+            return self._generate_anthropic(prompt, model, function)
+        elif model.provider == "openai":
+            return self._generate_openai(prompt, model, function)
+        else:
+            raise ValueError(f"Unsupported provider: {model.provider}")
+
+    def _generate_anthropic(
+        self,
+        prompt: str,
+        model: ModelConfig,
+        function: FunctionType
+    ) -> Dict[str, Any]:
+        """Generates with Anthropic Claude."""
+
+        if not self.anthropic:
+            raise RuntimeError("Anthropic client not initialized")
+
+        # System prompt per function
+        system = self._get_system_prompt(function)
+
+        try:
+            response = self.anthropic.messages.create(
+                model=model.name,
+                max_tokens=model.max_tokens,
+                system=system,
+                messages=[{"role": "user", "content": prompt}]
+            )
+
+            # Extract text
+            artifact = ""
+            for block in response.content:
+                if hasattr(block, "text"):
+                    artifact += block.text
+
+            # Compute cost
+            input_tokens = response.usage.input_tokens
+            output_tokens = response.usage.output_tokens
+            cost = (
+                (input_tokens / 1000) * model.cost_per_1k_input +
+                (output_tokens / 1000) * model.cost_per_1k_output
+            )
+
+            return {
+                "artifact": artifact,
+                "cost_usd": cost,
+                "model_used": model.name,
+                "tokens_used": input_tokens + output_tokens,
+                "input_tokens": input_tokens,
+                "output_tokens": output_tokens
+            }
+
+        except Exception as e:
+            logger.error(f"Anthropic error: {e}")
+            raise
+
+    def _generate_openai(
+        self,
+        prompt: str,
+        model: ModelConfig,
+        function: FunctionType
+    ) -> Dict[str, Any]:
+        """Generates with OpenAI GPT."""
+
+        if not self.openai:
+            raise RuntimeError("OpenAI client not initialized")
+
+        system = self._get_system_prompt(function)
+
+        try:
+            response = self.openai.chat.completions.create(
+                model=model.name,
+                max_tokens=model.max_tokens,
+                messages=[
+                    {"role": "system", "content": system},
+                    {"role": "user", "content": prompt}
+                ]
+            )
+
+            artifact = response.choices[0].message.content
+
+            # Compute cost
+            input_tokens = response.usage.prompt_tokens
+            output_tokens = response.usage.completion_tokens
+            cost = (
+                (input_tokens / 1000) * model.cost_per_1k_input +
+                (output_tokens / 1000) * model.cost_per_1k_output
+            )
+
+            return {
+                "artifact": artifact,
+                "cost_usd": cost,
+                "model_used": model.name,
+                "tokens_used": input_tokens + output_tokens,
+                "input_tokens": input_tokens,
+                "output_tokens": output_tokens
+            }
+
+        except Exception as e:
+            logger.error(f"OpenAI error: {e}")
+            raise
+    def _generate_image(
+        self,
+        prompt: str,
+        model: ModelConfig,
+        budget_remaining: float
+    ) -> Dict[str, Any]:
+        """Generates an image with Replicate (Flux)."""
+
+        if not self.replicate_key:
+            raise RuntimeError("REPLICATE_API_KEY not set")
+
+        try:
+            # Call the Replicate API
+            response = httpx.post(
+                "https://api.replicate.com/v1/predictions",
+                headers={
+                    "Authorization": f"Token {self.replicate_key}",
+                    "Content-Type": "application/json"
+                },
+                json={
+                    "version": self._get_replicate_version(model.name),
+                    "input": {
+                        "prompt": prompt,
+                        "aspect_ratio": "1:1",
+                        "output_format": "webp",
+                        "output_quality": 90
+                    }
+                },
+                timeout=60.0
+            )
+            response.raise_for_status()
+            prediction = response.json()
+
+            # Wait for the result
+            prediction_id = prediction["id"]
+            result = self._wait_for_replicate(prediction_id)
+
+            return {
+                "artifact": {
+                    "url": result["output"][0] if isinstance(result["output"], list) else result["output"],
+                    "prompt": prompt
+                },
+                "cost_usd": model.cost_per_1k_input,  # Flat cost per image
+                "model_used": model.name,
+                "prediction_id": prediction_id
+            }
+
+        except Exception as e:
+            logger.error(f"Replicate error: {e}")
+            raise
+
+    def _wait_for_replicate(self, prediction_id: str, max_wait: int = 120) -> Dict:
+        """Polls Replicate until the prediction finishes."""
+        import time
+
+        for _ in range(max_wait):
+            response = httpx.get(
+                f"https://api.replicate.com/v1/predictions/{prediction_id}",
+                headers={"Authorization": f"Token {self.replicate_key}"},
+                timeout=10.0
+            )
+            result = response.json()
+
+            if result["status"] == "succeeded":
+                return result
+            elif result["status"] == "failed":
+                raise RuntimeError(f"Replicate failed: {result.get('error')}")
+
+            time.sleep(1)
+
+        raise TimeoutError("Replicate prediction timeout")
+
+    def _get_replicate_version(self, model_name: str) -> str:
+        """Returns the Replicate version ID for a model."""
+        versions = {
+            "black-forest-labs/flux-1.1-pro": "80a09d66baa990429c004a8ff540ce96c1e9e0e9c381",
+            "black-forest-labs/flux-schnell": "f2ab8a5bfe79f02f0789a146cf5e73d2a4ff2684a98c2b"
+        }
+        return versions.get(model_name, versions["black-forest-labs/flux-schnell"])
+
+    def _get_system_prompt(self, function: FunctionType) -> str:
+        """Returns the system prompt for a function."""
+
+        prompts = {
+            FunctionType.TEXT_GENERATION: """You are an expert content generator.
+You produce high-quality, well-structured, professional texts.
+You follow the user's instructions precisely.""",
+
+            FunctionType.CODE_GENERATION: """You are an expert programmer.
+You generate clean, efficient, well-documented code.
+You follow best practices and appropriate design patterns.
+You include error handling and useful comments.""",
+
+            FunctionType.DOCUMENT_GENERATION: """You are an expert in professional documentation.
+You create clear, complete, well-formatted documents.
+You make sure all necessary information is present.
+You use a professional tone appropriate to the context.""",
+
+            FunctionType.AUDIO_GENERATION: """You are an expert in audio production.
+You generate scripts and descriptions for audio content.""",
+            FunctionType.VIDEO_GENERATION: """You are an expert in video production.
+You generate scripts and descriptions for video content."""
+        }
+
+        return prompts.get(function, prompts[FunctionType.TEXT_GENERATION])
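The commit ships no orchestration loop tying the three components together; a minimal sketch of what one iteration might look like (assumes valid API keys in the environment; the seed and objective strings are placeholders):

    from config import FactoryConfig, FunctionType
    from director import Director
    from executor import Executor
    from evaluator import Evaluator

    config = FactoryConfig()
    director, executor, evaluator = Director(config), Executor(config), Evaluator(config)

    objective = "A documented, error-handling CSV-to-JSON converter"
    ctx = director.prepare_context(
        seed="Write a Python CSV-to-JSON converter",
        objective=objective,
        function=FunctionType.CODE_GENERATION,
        previous_artifact=None, feedback=None,
        iteration=1, context={}
    )
    result = executor.generate(FunctionType.CODE_GENERATION, ctx,
                               budget_remaining=config.default_budget_usd)
    evaluation = evaluator.evaluate(result["artifact"], objective,
                                    FunctionType.CODE_GENERATION)
    # director.should_converge(...) then decides whether iteration 2 is needed.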
diff --git a/handler.py b/handler.py
new file mode 100644
index 0000000..be01db7
--- /dev/null
+++ b/handler.py
@@ -0,0 +1,228 @@
+"""
+THE FACTORY - Iterative Generative Work
+RunPod Serverless Handler
+
+Tasks:
+- image_generate: Generates an image from a prompt
+- image_variant: Generates a variant of an existing image
+- image_upscale: Increases resolution
+"""
+
+import runpod
+import base64
+import os
+from datetime import datetime
+from io import BytesIO
+
+# Force CUDA device
+os.environ["CUDA_VISIBLE_DEVICES"] = "0"
+
+# Available models
+MODELS = {
+    "sdxl": "stabilityai/stable-diffusion-xl-base-1.0",
+    "flux": "black-forest-labs/FLUX.1-schnell",
+    "sdxl-turbo": "stabilityai/sdxl-turbo"
+}
+
+# Lazy-loaded model cache
+_loaded_models = {}
+
+
+def get_model(model_name: str):
+    """Loads a model on demand."""
+    global _loaded_models
+
+    if model_name not in _loaded_models:
+        try:
+            import torch
+            from diffusers import AutoPipelineForText2Image
+
+            # Use CUDA when available
+            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+
+            model_id = MODELS.get(model_name, MODELS["sdxl-turbo"])
+
+            pipe = AutoPipelineForText2Image.from_pretrained(
+                model_id,
+                torch_dtype=torch.float16,
+                variant="fp16",
+                use_safetensors=True
+            )
+            pipe = pipe.to(device)
+
+            _loaded_models[model_name] = pipe
+        except Exception as e:
+            return None, str(e)
+
+    return _loaded_models[model_name], None
+
+
+def generate_image(prompt: str, model: str = "sdxl-turbo",
+                   width: int = 1024, height: int = 1024,
+                   steps: int = 4, guidance: float = 0.0) -> dict:
+    """Generates an image from a prompt."""
+    pipe, error = get_model(model)
+    if error:
+        return {"error": f"Model load failed: {error}"}
+
+    try:
+        image = pipe(
+            prompt=prompt,
+            width=width,
+            height=height,
+            num_inference_steps=steps,
+            guidance_scale=guidance
+        ).images[0]
+
+        # Convert to base64
+        buffer = BytesIO()
+        image.save(buffer, format="PNG")
+        img_base64 = base64.b64encode(buffer.getvalue()).decode()
+
+        return {
+            "image_base64": img_base64,
+            "width": width,
+            "height": height,
+            "model": model
+        }
+    except Exception as e:
+        return {"error": str(e)}
+
+
+def generate_variant(image_base64: str, prompt: str,
+                     strength: float = 0.5, model: str = "sdxl-turbo") -> dict:
+    """Generates a variant of an existing image."""
+    try:
+        import torch
+        from diffusers import AutoPipelineForImage2Image
+        from PIL import Image
+
+        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+
+        # Decode the input image
+        img_data = base64.b64decode(image_base64)
+        init_image = Image.open(BytesIO(img_data)).convert("RGB")
+
+        model_id = MODELS.get(model, MODELS["sdxl-turbo"])
+        pipe = AutoPipelineForImage2Image.from_pretrained(
+            model_id,
+            torch_dtype=torch.float16,
+            variant="fp16"
+        ).to(device)
+
+        image = pipe(
+            prompt=prompt,
+            image=init_image,
+            strength=strength,
+            num_inference_steps=4
+        ).images[0]
+
+        buffer = BytesIO()
+        image.save(buffer, format="PNG")
+        img_base64 = base64.b64encode(buffer.getvalue()).decode()
+
+        return {"image_base64": img_base64}
+    except Exception as e:
+        return {"error": str(e)}
+
+
+def upscale_image(image_base64: str, scale: int = 2) -> dict:
+    """Upscales an image using PIL's LANCZOS resampling."""
+    try:
+        from PIL import Image
+
+        img_data = base64.b64decode(image_base64)
+        image = Image.open(BytesIO(img_data))
+
+        new_size = (image.width * scale, image.height * scale)
+        upscaled = image.resize(new_size, Image.Resampling.LANCZOS)
+
+        buffer = BytesIO()
+        upscaled.save(buffer, format="PNG")
+        img_base64 = base64.b64encode(buffer.getvalue()).decode()
+
+        return {
+            "image_base64": img_base64,
+            "width": new_size[0],
+            "height": new_size[1],
+            "scale": scale
+        }
+    except Exception as e:
+        return {"error": str(e)}
+
+
+def handler(job):
+    """
+    Main handler for THE FACTORY.
+
+    Expected input:
+    {
+        "task": "image_generate",   # Task to run
+        "prompt": "...",            # Prompt for generation
+        "model": "sdxl-turbo",      # Model to use
+        "width": 1024,              # Width (optional)
+        "height": 1024,             # Height (optional)
+        "image_base64": "...",      # For variant/upscale
+        "strength": 0.5,            # For variant
+        "scale": 2                  # For upscale
+    }
+
+    Available tasks:
+    - image_generate: Generates an image from a prompt
+    - image_variant: Generates a variant
+    - image_upscale: Increases resolution
+    """
+    job_input = job.get("input", {})
+    trace_id = job_input.get("trace_id", str(datetime.utcnow().timestamp()))
+    task = job_input.get("task", "image_generate")
+
+    result = {"trace_id": trace_id, "task": task}
+
+    if task == "image_generate":
+        prompt = job_input.get("prompt")
+        if not prompt:
+            return {"error": "prompt is required for image_generate"}
+
+        gen_result = generate_image(
+            prompt=prompt,
+            model=job_input.get("model", "sdxl-turbo"),
+            width=job_input.get("width", 1024),
+            height=job_input.get("height", 1024),
+            steps=job_input.get("steps", 4),
+            guidance=job_input.get("guidance", 0.0)
+        )
+        result.update(gen_result)
+
+    elif task == "image_variant":
+        image_base64 = job_input.get("image_base64")
+        prompt = job_input.get("prompt", "")
+        if not image_base64:
+            return {"error": "image_base64 is required for image_variant"}
+
+        var_result = generate_variant(
+            image_base64=image_base64,
+            prompt=prompt,
+            strength=job_input.get("strength", 0.5),
+            model=job_input.get("model", "sdxl-turbo")
+        )
+        result.update(var_result)
+
+    elif task == "image_upscale":
+        image_base64 = job_input.get("image_base64")
+        if not image_base64:
+            return {"error": "image_base64 is required for image_upscale"}
+
+        up_result = upscale_image(
+            image_base64=image_base64,
+            scale=job_input.get("scale", 2)
+        )
+        result.update(up_result)
+
+    else:
+        return {"error": f"Unrecognized task '{task}'. Available: image_generate, image_variant, image_upscale"}
+
+    return result
+
+
+# RunPod serverless entry point
+runpod.serverless.start({"handler": handler})
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..2972e2a
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,9 @@
+# THE FACTORY - Dependencies
+runpod>=1.6.0
+requests>=2.31.0
+torch>=2.1.0
+diffusers>=0.25.0
+transformers>=4.36.0
+accelerate>=0.25.0
+safetensors>=0.4.0
+Pillow>=10.0.0
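Example invocation of the deployed handler. This is a sketch only: the endpoint ID and API key are placeholders, and the Endpoint/run_sync client interface is assumed from the runpod SDK pinned in requirements.txt:

    import runpod

    runpod.api_key = "YOUR_RUNPOD_API_KEY"          # placeholder
    endpoint = runpod.Endpoint("YOUR_ENDPOINT_ID")  # placeholder

    result = endpoint.run_sync({
        "input": {
            "task": "image_generate",
            "prompt": "a lighthouse at dusk, volumetric light",
            "model": "sdxl-turbo",
            "width": 768,
            "height": 768
        }
    }, timeout=120)

    # On success, result["image_base64"] holds the PNG, per handler() above;
    # on failure, result["error"] carries the message instead.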