Initial commit: THE FACTORY - Iterative Image Generation

Tasks:
- image_generate: Generate image from prompt
- image_variant: Generate variant of existing image
- image_upscale: Increase resolution

Models: SDXL, Flux, SDXL-Turbo
RunPod Serverless Handler
This commit is contained in:
ARCHITECT
2026-01-06 08:28:16 +00:00
commit 1cad39bc9e
7 changed files with 1181 additions and 0 deletions

19
Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
# THE FACTORY - Image Generation for RunPod Serverless
FROM runpod/pytorch:2.1.0-py3.10-cuda11.8.0-devel-ubuntu22.04
WORKDIR /app
# Copy requirements and install Python packages
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy all Python files
COPY *.py ./
# Set environment variables
ENV PYTHONUNBUFFERED=1
ENV HF_HOME=/runpod-volume/huggingface
ENV TRANSFORMERS_CACHE=/runpod-volume/huggingface
# Run the handler
CMD ["python", "-u", "/app/handler.py"]

138
config.py Normal file
View File

@@ -0,0 +1,138 @@
"""
=============================================================================
THE FACTORY - Configuración
=============================================================================
"""
import os
from enum import Enum
from dataclasses import dataclass, field
from typing import Dict, List, Optional
class JobState(Enum):
"""Estados posibles de un job."""
PENDING = "PENDING"
QUEUED = "QUEUED"
RUNNING = "RUNNING"
EVALUATING = "EVALUATING"
CONVERGED = "CONVERGED"
EXHAUSTED = "EXHAUSTED"
FAILED = "FAILED"
CANCELLED = "CANCELLED"
class FunctionType(Enum):
"""Tipos de función soportados."""
TEXT_GENERATION = "TEXT_GENERATION"
IMAGE_GENERATION = "IMAGE_GENERATION"
CODE_GENERATION = "CODE_GENERATION"
DOCUMENT_GENERATION = "DOCUMENT_GENERATION"
AUDIO_GENERATION = "AUDIO_GENERATION"
VIDEO_GENERATION = "VIDEO_GENERATION"
@dataclass
class ModelConfig:
"""Configuración de un modelo."""
name: str
provider: str
cost_per_1k_input: float
cost_per_1k_output: float
max_tokens: int = 4096
supports_images: bool = False
supports_streaming: bool = True
@dataclass
class FactoryConfig:
"""Configuración global de THE FACTORY."""
# API Keys
anthropic_api_key: str = field(default_factory=lambda: os.environ.get("ANTHROPIC_API_KEY", ""))
openai_api_key: str = field(default_factory=lambda: os.environ.get("OPENAI_API_KEY", ""))
replicate_api_key: str = field(default_factory=lambda: os.environ.get("REPLICATE_API_KEY", ""))
# Límites por defecto
default_max_cycles: int = 5
default_budget_usd: float = 1.0
default_timeout_ms: int = 120000
# Convergencia
convergence_threshold: float = 0.85
diminishing_returns_threshold: float = 0.02
diminishing_returns_min_confidence: float = 0.70
# Modelos por función
models: Dict[str, ModelConfig] = field(default_factory=lambda: {
# Text generation
"claude-sonnet": ModelConfig(
name="claude-sonnet-4-20250514",
provider="anthropic",
cost_per_1k_input=0.003,
cost_per_1k_output=0.015,
max_tokens=8192,
supports_images=True
),
"claude-haiku": ModelConfig(
name="claude-haiku-4-20250514",
provider="anthropic",
cost_per_1k_input=0.00025,
cost_per_1k_output=0.00125,
max_tokens=8192,
supports_images=True
),
"gpt-4o": ModelConfig(
name="gpt-4o",
provider="openai",
cost_per_1k_input=0.005,
cost_per_1k_output=0.015,
max_tokens=4096,
supports_images=True
),
"gpt-4o-mini": ModelConfig(
name="gpt-4o-mini",
provider="openai",
cost_per_1k_input=0.00015,
cost_per_1k_output=0.0006,
max_tokens=4096,
supports_images=True
),
# Image generation
"flux-pro": ModelConfig(
name="black-forest-labs/flux-1.1-pro",
provider="replicate",
cost_per_1k_input=0.05, # por imagen
cost_per_1k_output=0.0,
max_tokens=0
),
"flux-schnell": ModelConfig(
name="black-forest-labs/flux-schnell",
provider="replicate",
cost_per_1k_input=0.003, # por imagen
cost_per_1k_output=0.0,
max_tokens=0
),
})
# Modelo por defecto por función
default_models: Dict[FunctionType, str] = field(default_factory=lambda: {
FunctionType.TEXT_GENERATION: "claude-sonnet",
FunctionType.CODE_GENERATION: "claude-sonnet",
FunctionType.DOCUMENT_GENERATION: "claude-sonnet",
FunctionType.IMAGE_GENERATION: "flux-schnell",
FunctionType.AUDIO_GENERATION: "claude-sonnet", # placeholder
FunctionType.VIDEO_GENERATION: "claude-sonnet", # placeholder
})
# Modelo para evaluación
evaluator_model: str = "claude-haiku"
def get_model(self, name: str) -> Optional[ModelConfig]:
"""Obtiene configuración de modelo."""
return self.models.get(name)
def get_default_model(self, function: FunctionType) -> ModelConfig:
"""Obtiene modelo por defecto para una función."""
model_name = self.default_models.get(function, "claude-sonnet")
return self.models[model_name]

254
director.py Normal file
View File

@@ -0,0 +1,254 @@
"""
=============================================================================
THE FACTORY - Director
=============================================================================
Coordina el proceso de generación iterativa.
- Prepara contexto para el Executor
- Decide cuándo converger
- Gestiona presupuesto
=============================================================================
"""
import logging
from typing import Dict, Any, Optional
from config import FactoryConfig, FunctionType
logger = logging.getLogger("factory.director")
class Director:
"""
El Director coordina el proceso de generación iterativa.
"""
def __init__(self, config: FactoryConfig):
self.config = config
def prepare_context(
self,
seed: str,
objective: str,
function: FunctionType,
previous_artifact: Optional[Any],
feedback: Optional[str],
iteration: int,
context: Dict[str, Any]
) -> Dict[str, Any]:
"""
Prepara el contexto para el Executor.
Args:
seed: Prompt inicial
objective: Objetivo a alcanzar
function: Tipo de función
previous_artifact: Artefacto de iteración anterior
feedback: Feedback del Evaluator
iteration: Número de iteración actual
context: Contexto adicional del usuario
Returns:
Contexto preparado para el Executor
"""
exec_context = {
"seed": seed,
"objective": objective,
"function": function,
"iteration": iteration,
"is_first_iteration": iteration == 1,
"user_context": context
}
if iteration == 1:
# Primera iteración: usar seed directamente
exec_context["prompt"] = self._build_initial_prompt(seed, objective, function)
else:
# Iteraciones posteriores: incorporar feedback
exec_context["previous_artifact"] = previous_artifact
exec_context["feedback"] = feedback
exec_context["prompt"] = self._build_refinement_prompt(
seed, objective, function, previous_artifact, feedback, iteration
)
# Seleccionar modelo apropiado
exec_context["model"] = self._select_model(function, iteration, context)
return exec_context
def _build_initial_prompt(
self,
seed: str,
objective: str,
function: FunctionType
) -> str:
"""Construye prompt para primera iteración."""
if function == FunctionType.TEXT_GENERATION:
return f"""Genera contenido de alta calidad basado en lo siguiente:
SOLICITUD:
{seed}
OBJETIVO:
{objective}
Proporciona una respuesta completa, bien estructurada y profesional."""
elif function == FunctionType.CODE_GENERATION:
return f"""Genera código de alta calidad basado en lo siguiente:
SOLICITUD:
{seed}
OBJETIVO:
{objective}
El código debe:
- Ser limpio y bien documentado
- Seguir mejores prácticas
- Incluir manejo de errores apropiado
- Ser eficiente y mantenible"""
elif function == FunctionType.IMAGE_GENERATION:
return f"""{seed}
Style: Professional, high quality, detailed
Objective: {objective}"""
elif function == FunctionType.DOCUMENT_GENERATION:
return f"""Genera un documento profesional:
TIPO DE DOCUMENTO:
{seed}
OBJETIVO:
{objective}
El documento debe ser:
- Profesional y bien formateado
- Completo con toda la información necesaria
- Claro y fácil de leer"""
else:
return f"{seed}\n\nObjective: {objective}"
def _build_refinement_prompt(
self,
seed: str,
objective: str,
function: FunctionType,
previous_artifact: Any,
feedback: str,
iteration: int
) -> str:
"""Construye prompt para iteraciones de refinamiento."""
# Truncar artefacto anterior si es muy largo
prev_str = str(previous_artifact)
if len(prev_str) > 2000:
prev_str = prev_str[:2000] + "\n[...truncado...]"
if function in [FunctionType.TEXT_GENERATION, FunctionType.DOCUMENT_GENERATION]:
return f"""Mejora el siguiente contenido basándote en el feedback:
SOLICITUD ORIGINAL:
{seed}
OBJETIVO:
{objective}
CONTENIDO ANTERIOR (iteración {iteration - 1}):
{prev_str}
FEEDBACK A INCORPORAR:
{feedback}
Genera una versión mejorada que aborde el feedback mientras mantiene los aspectos positivos."""
elif function == FunctionType.CODE_GENERATION:
return f"""Mejora el siguiente código basándote en el feedback:
SOLICITUD ORIGINAL:
{seed}
OBJETIVO:
{objective}
CÓDIGO ANTERIOR (iteración {iteration - 1}):
```
{prev_str}
```
FEEDBACK A INCORPORAR:
{feedback}
Genera una versión mejorada del código."""
elif function == FunctionType.IMAGE_GENERATION:
return f"""{seed}
Previous attempt feedback: {feedback}
Iteration: {iteration}
Objective: {objective}
Improve based on feedback while maintaining the core concept."""
else:
return f"""Mejora basándote en el feedback:
Original: {seed}
Objetivo: {objective}
Anterior: {prev_str}
Feedback: {feedback}
Genera versión mejorada."""
def _select_model(
self,
function: FunctionType,
iteration: int,
context: Dict[str, Any]
) -> str:
"""Selecciona el modelo apropiado."""
# Si el usuario especificó un modelo, usarlo
if "model" in context:
return context["model"]
# Por defecto, usar el modelo configurado para la función
return self.config.default_models.get(function, "claude-sonnet")
def should_converge(
self,
confidence: float,
iteration: int,
previous_confidence: float,
budget_used: float,
budget_total: float
) -> bool:
"""
Decide si el job debe converger.
Criterios:
1. Umbral de confianza alcanzado
2. Rendimientos decrecientes (mejora < threshold)
3. Presupuesto casi agotado
"""
# 1. Umbral de confianza
if confidence >= self.config.convergence_threshold:
logger.info(f"Convergencia por umbral: {confidence:.2f} >= {self.config.convergence_threshold}")
return True
# 2. Rendimientos decrecientes
if iteration > 1 and confidence >= self.config.diminishing_returns_min_confidence:
improvement = confidence - previous_confidence
if improvement < self.config.diminishing_returns_threshold:
logger.info(f"Convergencia por rendimientos decrecientes: mejora {improvement:.3f} < {self.config.diminishing_returns_threshold}")
return True
# 3. Presupuesto casi agotado (>90% usado)
if budget_used >= budget_total * 0.9:
logger.info(f"Convergencia por presupuesto: ${budget_used:.4f} >= 90% de ${budget_total:.4f}")
return True
return False

233
evaluator.py Normal file
View File

@@ -0,0 +1,233 @@
"""
=============================================================================
THE FACTORY - Evaluator
=============================================================================
Evalúa artefactos generados vs el objetivo.
Proporciona:
- confidence: 0.0 a 1.0
- feedback: sugerencias de mejora
=============================================================================
"""
import json
import logging
from typing import Dict, Any, Optional
import anthropic
from config import FactoryConfig, FunctionType
logger = logging.getLogger("factory.evaluator")
class Evaluator:
"""
El Evaluator evalúa artefactos contra el objetivo.
"""
def __init__(self, config: FactoryConfig):
self.config = config
if config.anthropic_api_key:
self.anthropic = anthropic.Anthropic(api_key=config.anthropic_api_key)
else:
self.anthropic = None
logger.warning("ANTHROPIC_API_KEY no configurada para Evaluator")
def evaluate(
self,
artifact: Any,
objective: str,
function: FunctionType
) -> Dict[str, Any]:
"""
Evalúa un artefacto.
Args:
artifact: El artefacto generado
objective: El objetivo a cumplir
function: Tipo de función
Returns:
{
"confidence": 0.0-1.0,
"feedback": "sugerencias de mejora",
"strengths": ["punto fuerte 1", ...],
"weaknesses": ["punto débil 1", ...],
"cost_usd": coste de evaluación
}
"""
if not self.anthropic:
# Fallback: evaluación básica sin LLM
return self._evaluate_basic(artifact, objective, function)
return self._evaluate_with_llm(artifact, objective, function)
def _evaluate_with_llm(
self,
artifact: Any,
objective: str,
function: FunctionType
) -> Dict[str, Any]:
"""Evalúa usando Claude Haiku."""
model = self.config.get_model(self.config.evaluator_model)
# Preparar artefacto para evaluación
artifact_str = self._prepare_artifact_for_eval(artifact, function)
prompt = f"""Evalúa el siguiente artefacto contra el objetivo especificado.
OBJETIVO:
{objective}
ARTEFACTO A EVALUAR:
{artifact_str}
Proporciona tu evaluación en formato JSON con esta estructura exacta:
{{
"confidence": <número entre 0.0 y 1.0>,
"feedback": "<sugerencias específicas de mejora>",
"strengths": ["<punto fuerte 1>", "<punto fuerte 2>"],
"weaknesses": ["<punto débil 1>", "<punto débil 2>"],
"meets_objective": <true/false>
}}
Criterios de evaluación:
- 0.9-1.0: Excelente, cumple completamente el objetivo
- 0.7-0.9: Bueno, cumple mayormente con mejoras menores posibles
- 0.5-0.7: Aceptable, cumple parcialmente
- 0.3-0.5: Deficiente, necesita mejoras significativas
- 0.0-0.3: Inaceptable, no cumple el objetivo
Sé específico en el feedback para que el siguiente intento pueda mejorar."""
try:
response = self.anthropic.messages.create(
model=model.name,
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
)
# Extraer texto
response_text = ""
for block in response.content:
if hasattr(block, "text"):
response_text += block.text
# Parsear JSON
eval_result = self._parse_eval_response(response_text)
# Calcular coste
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
cost = (
(input_tokens / 1000) * model.cost_per_1k_input +
(output_tokens / 1000) * model.cost_per_1k_output
)
eval_result["cost_usd"] = cost
eval_result["tokens_used"] = input_tokens + output_tokens
logger.info(f"Evaluación: confidence={eval_result['confidence']:.2f}")
return eval_result
except Exception as e:
logger.error(f"Error en evaluación LLM: {e}")
return self._evaluate_basic(artifact, objective, function)
def _parse_eval_response(self, response: str) -> Dict[str, Any]:
"""Parsea respuesta de evaluación."""
# Intentar extraer JSON
try:
# Buscar JSON en la respuesta
start = response.find("{")
end = response.rfind("}") + 1
if start >= 0 and end > start:
json_str = response[start:end]
result = json.loads(json_str)
# Validar campos requeridos
confidence = float(result.get("confidence", 0.5))
confidence = max(0.0, min(1.0, confidence)) # Clamp 0-1
return {
"confidence": confidence,
"feedback": result.get("feedback", "Sin feedback específico"),
"strengths": result.get("strengths", []),
"weaknesses": result.get("weaknesses", []),
"meets_objective": result.get("meets_objective", confidence >= 0.7)
}
except (json.JSONDecodeError, ValueError) as e:
logger.warning(f"Error parseando evaluación: {e}")
# Fallback
return {
"confidence": 0.5,
"feedback": "No se pudo evaluar correctamente. Intenta de nuevo.",
"strengths": [],
"weaknesses": ["Evaluación fallida"],
"meets_objective": False
}
def _evaluate_basic(
self,
artifact: Any,
objective: str,
function: FunctionType
) -> Dict[str, Any]:
"""Evaluación básica sin LLM."""
artifact_str = str(artifact) if artifact else ""
# Heurísticas simples
confidence = 0.5
# Verificar longitud mínima
if len(artifact_str) < 50:
confidence -= 0.2
elif len(artifact_str) > 200:
confidence += 0.1
# Verificar que contiene palabras del objetivo
objective_words = set(objective.lower().split())
artifact_words = set(artifact_str.lower().split())
overlap = len(objective_words & artifact_words)
if overlap >= len(objective_words) * 0.3:
confidence += 0.2
confidence = max(0.0, min(1.0, confidence))
return {
"confidence": confidence,
"feedback": "Evaluación básica aplicada. Considera revisar manualmente.",
"strengths": ["Artefacto generado"],
"weaknesses": ["Evaluación automática limitada"],
"meets_objective": confidence >= 0.7,
"cost_usd": 0.0
}
def _prepare_artifact_for_eval(self, artifact: Any, function: FunctionType) -> str:
"""Prepara artefacto para evaluación."""
if artifact is None:
return "[No artifact generated]"
if function == FunctionType.IMAGE_GENERATION:
if isinstance(artifact, dict):
return f"[Image generated]\nURL: {artifact.get('url', 'N/A')}\nPrompt used: {artifact.get('prompt', 'N/A')}"
return "[Image generated]"
artifact_str = str(artifact)
# Truncar si es muy largo
if len(artifact_str) > 4000:
return artifact_str[:4000] + "\n[...truncado para evaluación...]"
return artifact_str

300
executor.py Normal file
View File

@@ -0,0 +1,300 @@
"""
=============================================================================
THE FACTORY - Executor
=============================================================================
Genera artefactos usando diferentes modelos de IA.
Soporta: texto, código, imágenes, documentos.
=============================================================================
"""
import os
import logging
import base64
from typing import Dict, Any, Optional
import anthropic
import openai
import httpx
from config import FactoryConfig, FunctionType, ModelConfig
logger = logging.getLogger("factory.executor")
class Executor:
"""
El Executor genera artefactos usando modelos de IA.
"""
def __init__(self, config: FactoryConfig):
self.config = config
# Inicializar clientes
if config.anthropic_api_key:
self.anthropic = anthropic.Anthropic(api_key=config.anthropic_api_key)
else:
self.anthropic = None
logger.warning("ANTHROPIC_API_KEY no configurada")
if config.openai_api_key:
self.openai = openai.OpenAI(api_key=config.openai_api_key)
else:
self.openai = None
logger.warning("OPENAI_API_KEY no configurada")
self.replicate_key = config.replicate_api_key
def generate(
self,
function: FunctionType,
context: Dict[str, Any],
budget_remaining: float
) -> Dict[str, Any]:
"""
Genera un artefacto.
Args:
function: Tipo de función
context: Contexto preparado por el Director
budget_remaining: Presupuesto disponible
Returns:
{
"artifact": <resultado>,
"cost_usd": <coste>,
"model_used": <modelo>,
"tokens_used": <tokens>
}
"""
model_name = context.get("model", self.config.default_models[function])
model_config = self.config.get_model(model_name)
if not model_config:
raise ValueError(f"Modelo desconocido: {model_name}")
prompt = context.get("prompt", "")
logger.info(f"Generando con {model_name} ({model_config.provider})")
if function == FunctionType.IMAGE_GENERATION:
return self._generate_image(prompt, model_config, budget_remaining)
else:
return self._generate_text(prompt, model_config, function, budget_remaining)
def _generate_text(
self,
prompt: str,
model: ModelConfig,
function: FunctionType,
budget_remaining: float
) -> Dict[str, Any]:
"""Genera texto usando Claude o GPT."""
if model.provider == "anthropic":
return self._generate_anthropic(prompt, model, function)
elif model.provider == "openai":
return self._generate_openai(prompt, model, function)
else:
raise ValueError(f"Provider no soportado: {model.provider}")
def _generate_anthropic(
self,
prompt: str,
model: ModelConfig,
function: FunctionType
) -> Dict[str, Any]:
"""Genera con Anthropic Claude."""
if not self.anthropic:
raise RuntimeError("Cliente Anthropic no inicializado")
# System prompt según función
system = self._get_system_prompt(function)
try:
response = self.anthropic.messages.create(
model=model.name,
max_tokens=model.max_tokens,
system=system,
messages=[{"role": "user", "content": prompt}]
)
# Extraer texto
artifact = ""
for block in response.content:
if hasattr(block, "text"):
artifact += block.text
# Calcular coste
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
cost = (
(input_tokens / 1000) * model.cost_per_1k_input +
(output_tokens / 1000) * model.cost_per_1k_output
)
return {
"artifact": artifact,
"cost_usd": cost,
"model_used": model.name,
"tokens_used": input_tokens + output_tokens,
"input_tokens": input_tokens,
"output_tokens": output_tokens
}
except Exception as e:
logger.error(f"Error Anthropic: {e}")
raise
def _generate_openai(
self,
prompt: str,
model: ModelConfig,
function: FunctionType
) -> Dict[str, Any]:
"""Genera con OpenAI GPT."""
if not self.openai:
raise RuntimeError("Cliente OpenAI no inicializado")
system = self._get_system_prompt(function)
try:
response = self.openai.chat.completions.create(
model=model.name,
max_tokens=model.max_tokens,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": prompt}
]
)
artifact = response.choices[0].message.content
# Calcular coste
input_tokens = response.usage.prompt_tokens
output_tokens = response.usage.completion_tokens
cost = (
(input_tokens / 1000) * model.cost_per_1k_input +
(output_tokens / 1000) * model.cost_per_1k_output
)
return {
"artifact": artifact,
"cost_usd": cost,
"model_used": model.name,
"tokens_used": input_tokens + output_tokens,
"input_tokens": input_tokens,
"output_tokens": output_tokens
}
except Exception as e:
logger.error(f"Error OpenAI: {e}")
raise
def _generate_image(
self,
prompt: str,
model: ModelConfig,
budget_remaining: float
) -> Dict[str, Any]:
"""Genera imagen con Replicate (Flux)."""
if not self.replicate_key:
raise RuntimeError("REPLICATE_API_KEY no configurada")
try:
# Llamar a Replicate API
response = httpx.post(
"https://api.replicate.com/v1/predictions",
headers={
"Authorization": f"Token {self.replicate_key}",
"Content-Type": "application/json"
},
json={
"version": self._get_replicate_version(model.name),
"input": {
"prompt": prompt,
"aspect_ratio": "1:1",
"output_format": "webp",
"output_quality": 90
}
},
timeout=60.0
)
response.raise_for_status()
prediction = response.json()
# Esperar resultado
prediction_id = prediction["id"]
result = self._wait_for_replicate(prediction_id)
return {
"artifact": {
"url": result["output"][0] if isinstance(result["output"], list) else result["output"],
"prompt": prompt
},
"cost_usd": model.cost_per_1k_input, # Coste fijo por imagen
"model_used": model.name,
"prediction_id": prediction_id
}
except Exception as e:
logger.error(f"Error Replicate: {e}")
raise
def _wait_for_replicate(self, prediction_id: str, max_wait: int = 120) -> Dict:
"""Espera resultado de Replicate."""
import time
for _ in range(max_wait):
response = httpx.get(
f"https://api.replicate.com/v1/predictions/{prediction_id}",
headers={"Authorization": f"Token {self.replicate_key}"},
timeout=10.0
)
result = response.json()
if result["status"] == "succeeded":
return result
elif result["status"] == "failed":
raise RuntimeError(f"Replicate failed: {result.get('error')}")
time.sleep(1)
raise TimeoutError("Replicate prediction timeout")
def _get_replicate_version(self, model_name: str) -> str:
"""Obtiene version ID de Replicate."""
versions = {
"black-forest-labs/flux-1.1-pro": "80a09d66baa990429c004a8ff540ce96c1e9e0e9c381",
"black-forest-labs/flux-schnell": "f2ab8a5bfe79f02f0789a146cf5e73d2a4ff2684a98c2b"
}
return versions.get(model_name, versions["black-forest-labs/flux-schnell"])
def _get_system_prompt(self, function: FunctionType) -> str:
"""Obtiene system prompt según función."""
prompts = {
FunctionType.TEXT_GENERATION: """Eres un generador de contenido experto.
Produces textos de alta calidad, bien estructurados y profesionales.
Sigues las instrucciones del usuario con precisión.""",
FunctionType.CODE_GENERATION: """Eres un programador experto.
Generas código limpio, eficiente y bien documentado.
Sigues mejores prácticas y patrones de diseño apropiados.
Incluyes manejo de errores y comentarios útiles.""",
FunctionType.DOCUMENT_GENERATION: """Eres un experto en documentación profesional.
Creas documentos claros, completos y bien formateados.
Aseguras que toda la información necesaria esté presente.
Usas un tono profesional y apropiado al contexto.""",
FunctionType.AUDIO_GENERATION: """Eres un experto en producción de audio.
Generas scripts y descripciones para contenido de audio.""",
FunctionType.VIDEO_GENERATION: """Eres un experto en producción de video.
Generas guiones y descripciones para contenido de video."""
}
return prompts.get(function, prompts[FunctionType.TEXT_GENERATION])

228
handler.py Normal file
View File

@@ -0,0 +1,228 @@
"""
THE FACTORY - Trabajo Iterativo Generativo
RunPod Serverless Handler
Tareas:
- image_generate: Genera imagen desde prompt
- image_variant: Genera variante de imagen existente
- image_upscale: Aumenta resolución
"""
import runpod
import base64
import os
from datetime import datetime
from io import BytesIO
# Force CUDA device
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# Modelos disponibles
MODELS = {
"sdxl": "stabilityai/stable-diffusion-xl-base-1.0",
"flux": "black-forest-labs/FLUX.1-schnell",
"sdxl-turbo": "stabilityai/sdxl-turbo"
}
# Lazy loading de modelos
_loaded_models = {}
def get_model(model_name: str):
"""Carga modelo bajo demanda."""
global _loaded_models
if model_name not in _loaded_models:
try:
import torch
from diffusers import AutoPipelineForText2Image
# Force CUDA
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_id = MODELS.get(model_name, MODELS["sdxl-turbo"])
pipe = AutoPipelineForText2Image.from_pretrained(
model_id,
torch_dtype=torch.float16,
variant="fp16",
use_safetensors=True
)
pipe = pipe.to(device)
_loaded_models[model_name] = pipe
except Exception as e:
return None, str(e)
return _loaded_models[model_name], None
def generate_image(prompt: str, model: str = "sdxl-turbo",
width: int = 1024, height: int = 1024,
steps: int = 4, guidance: float = 0.0) -> dict:
"""Genera imagen desde prompt."""
pipe, error = get_model(model)
if error:
return {"error": f"Model load failed: {error}"}
try:
image = pipe(
prompt=prompt,
width=width,
height=height,
num_inference_steps=steps,
guidance_scale=guidance
).images[0]
# Convertir a base64
buffer = BytesIO()
image.save(buffer, format="PNG")
img_base64 = base64.b64encode(buffer.getvalue()).decode()
return {
"image_base64": img_base64,
"width": width,
"height": height,
"model": model
}
except Exception as e:
return {"error": str(e)}
def generate_variant(image_base64: str, prompt: str,
strength: float = 0.5, model: str = "sdxl-turbo") -> dict:
"""Genera variante de imagen existente."""
try:
import torch
from diffusers import AutoPipelineForImage2Image
from PIL import Image
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Decodificar imagen
img_data = base64.b64decode(image_base64)
init_image = Image.open(BytesIO(img_data)).convert("RGB")
model_id = MODELS.get(model, MODELS["sdxl-turbo"])
pipe = AutoPipelineForImage2Image.from_pretrained(
model_id,
torch_dtype=torch.float16,
variant="fp16"
).to(device)
image = pipe(
prompt=prompt,
image=init_image,
strength=strength,
num_inference_steps=4
).images[0]
buffer = BytesIO()
image.save(buffer, format="PNG")
img_base64 = base64.b64encode(buffer.getvalue()).decode()
return {"image_base64": img_base64}
except Exception as e:
return {"error": str(e)}
def upscale_image(image_base64: str, scale: int = 2) -> dict:
"""Upscale imagen usando PIL LANCZOS."""
try:
from PIL import Image
img_data = base64.b64decode(image_base64)
image = Image.open(BytesIO(img_data))
new_size = (image.width * scale, image.height * scale)
upscaled = image.resize(new_size, Image.LANCZOS)
buffer = BytesIO()
upscaled.save(buffer, format="PNG")
img_base64 = base64.b64encode(buffer.getvalue()).decode()
return {
"image_base64": img_base64,
"width": new_size[0],
"height": new_size[1],
"scale": scale
}
except Exception as e:
return {"error": str(e)}
def handler(job):
"""
Handler principal de THE FACTORY.
Input esperado:
{
"task": "image_generate", # Tarea a ejecutar
"prompt": "...", # Prompt para generación
"model": "sdxl-turbo", # Modelo a usar
"width": 1024, # Ancho (opcional)
"height": 1024, # Alto (opcional)
"image_base64": "...", # Para variant/upscale
"strength": 0.5, # Para variant
"scale": 2 # Para upscale
}
Tasks disponibles:
- image_generate: Genera imagen desde prompt
- image_variant: Genera variante
- image_upscale: Aumenta resolución
"""
job_input = job.get("input", {})
trace_id = job_input.get("trace_id", str(datetime.utcnow().timestamp()))
task = job_input.get("task", "image_generate")
result = {"trace_id": trace_id, "task": task}
if task == "image_generate":
prompt = job_input.get("prompt")
if not prompt:
return {"error": "prompt es requerido para image_generate"}
gen_result = generate_image(
prompt=prompt,
model=job_input.get("model", "sdxl-turbo"),
width=job_input.get("width", 1024),
height=job_input.get("height", 1024),
steps=job_input.get("steps", 4),
guidance=job_input.get("guidance", 0.0)
)
result.update(gen_result)
elif task == "image_variant":
image_base64 = job_input.get("image_base64")
prompt = job_input.get("prompt", "")
if not image_base64:
return {"error": "image_base64 es requerido para image_variant"}
var_result = generate_variant(
image_base64=image_base64,
prompt=prompt,
strength=job_input.get("strength", 0.5),
model=job_input.get("model", "sdxl-turbo")
)
result.update(var_result)
elif task == "image_upscale":
image_base64 = job_input.get("image_base64")
if not image_base64:
return {"error": "image_base64 es requerido para image_upscale"}
up_result = upscale_image(
image_base64=image_base64,
scale=job_input.get("scale", 2)
)
result.update(up_result)
else:
return {"error": f"Task '{task}' no reconocida. Disponibles: image_generate, image_variant, image_upscale"}
return result
# RunPod serverless
runpod.serverless.start({"handler": handler})

9
requirements.txt Normal file
View File

@@ -0,0 +1,9 @@
# THE FACTORY - Dependencias
runpod>=1.6.0
requests>=2.31.0
torch>=2.1.0
diffusers>=0.25.0
transformers>=4.36.0
accelerate>=0.25.0
safetensors>=0.4.0
Pillow>=10.0.0