Tasks: - image_generate: Generate image from prompt - image_variant: Generate variant of existing image - image_upscale: Increase resolution Models: SDXL, Flux, SDXL-Turbo RunPod Serverless Handler
301 lines
9.8 KiB
Python
301 lines
9.8 KiB
Python
"""
=============================================================================
THE FACTORY - Executor
=============================================================================
Generates artifacts using different AI models.
Supports: text, code, images, documents.
=============================================================================
"""
|
|
|
|
import os
|
|
import logging
|
|
import base64
|
|
from typing import Dict, Any, Optional
|
|
|
|
import anthropic
|
|
import openai
|
|
import httpx
|
|
|
|
from config import FactoryConfig, FunctionType, ModelConfig
|
|
|
|
logger = logging.getLogger("factory.executor")
|
|
|
|
|
|
class Executor:
    """Generate artifacts with AI models.

    Text-like functions are routed to Anthropic or OpenAI according to the
    selected model's ``provider``; image generation goes through the
    Replicate HTTP API.
    """

    def __init__(self, config: FactoryConfig):
        """Create a client for every provider whose API key is configured.

        A missing key is not fatal here: the corresponding client is left as
        ``None`` (or the key left falsy) and a warning is logged. The error is
        raised later, only if that provider is actually requested.

        Args:
            config: Factory configuration holding API keys and model settings.
        """
        self.config = config

        # Initialise provider clients.
        if config.anthropic_api_key:
            self.anthropic = anthropic.Anthropic(api_key=config.anthropic_api_key)
        else:
            self.anthropic = None
            logger.warning("ANTHROPIC_API_KEY no configurada")

        if config.openai_api_key:
            self.openai = openai.OpenAI(api_key=config.openai_api_key)
        else:
            self.openai = None
            logger.warning("OPENAI_API_KEY no configurada")

        self.replicate_key = config.replicate_api_key
        if not self.replicate_key:
            # Same early heads-up the other two providers already get.
            logger.warning("REPLICATE_API_KEY no configurada")

    def generate(
        self,
        function: FunctionType,
        context: Dict[str, Any],
        budget_remaining: float
    ) -> Dict[str, Any]:
        """Generate one artifact.

        Args:
            function: Kind of artifact to produce.
            context: Context prepared by the Director. The keys read here are
                ``"model"`` (optional model name) and ``"prompt"``.
            budget_remaining: Available budget. It is forwarded to the
                provider-specific helpers but is not enforced in this class.

        Returns:
            A dict with at least::

                {
                    "artifact": <result>,
                    "cost_usd": <cost>,
                    "model_used": <model>,
                    "tokens_used": <tokens>
                }

        Raises:
            KeyError: No model was given and ``function`` has no default model.
            ValueError: The model name is unknown to the configuration.
        """
        # Only consult the defaults when the context does not name a model;
        # otherwise a function without a default would fail needlessly.
        model_name = context.get("model")
        if model_name is None:
            model_name = self.config.default_models[function]

        model_config = self.config.get_model(model_name)
        if not model_config:
            raise ValueError(f"Modelo desconocido: {model_name}")

        prompt = context.get("prompt", "")

        logger.info("Generando con %s (%s)", model_name, model_config.provider)

        if function == FunctionType.IMAGE_GENERATION:
            return self._generate_image(prompt, model_config, budget_remaining)
        return self._generate_text(prompt, model_config, function, budget_remaining)

    def _generate_text(
        self,
        prompt: str,
        model: ModelConfig,
        function: FunctionType,
        budget_remaining: float
    ) -> Dict[str, Any]:
        """Generate text with Claude or GPT, chosen by ``model.provider``.

        Raises:
            ValueError: ``model.provider`` is neither ``"anthropic"`` nor
                ``"openai"``.
        """
        if model.provider == "anthropic":
            return self._generate_anthropic(prompt, model, function)
        if model.provider == "openai":
            return self._generate_openai(prompt, model, function)
        raise ValueError(f"Provider no soportado: {model.provider}")

    @staticmethod
    def _token_cost(model: ModelConfig, input_tokens: int, output_tokens: int) -> float:
        """Return the USD cost of a call from the model's per-1k-token rates."""
        return (
            (input_tokens / 1000) * model.cost_per_1k_input
            + (output_tokens / 1000) * model.cost_per_1k_output
        )

    @classmethod
    def _text_result(
        cls,
        artifact: str,
        model: ModelConfig,
        input_tokens: int,
        output_tokens: int
    ) -> Dict[str, Any]:
        """Build the result dict shared by all text providers."""
        return {
            "artifact": artifact,
            "cost_usd": cls._token_cost(model, input_tokens, output_tokens),
            "model_used": model.name,
            "tokens_used": input_tokens + output_tokens,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
        }

    def _generate_anthropic(
        self,
        prompt: str,
        model: ModelConfig,
        function: FunctionType
    ) -> Dict[str, Any]:
        """Generate text with Anthropic Claude.

        Raises:
            RuntimeError: The Anthropic client was not initialised.
            Exception: Any error from the API call is logged and re-raised.
        """
        if not self.anthropic:
            raise RuntimeError("Cliente Anthropic no inicializado")

        # System prompt depends on the function being served.
        system = self._get_system_prompt(function)

        try:
            response = self.anthropic.messages.create(
                model=model.name,
                max_tokens=model.max_tokens,
                system=system,
                messages=[{"role": "user", "content": prompt}]
            )

            # Concatenate only the blocks that carry text; other block kinds
            # have no ``text`` attribute and are skipped.
            artifact = "".join(
                block.text for block in response.content if hasattr(block, "text")
            )

            return self._text_result(
                artifact,
                model,
                response.usage.input_tokens,
                response.usage.output_tokens,
            )
        except Exception as e:
            logger.exception("Error Anthropic: %s", e)
            raise

    def _generate_openai(
        self,
        prompt: str,
        model: ModelConfig,
        function: FunctionType
    ) -> Dict[str, Any]:
        """Generate text with OpenAI GPT.

        Raises:
            RuntimeError: The OpenAI client was not initialised.
            Exception: Any error from the API call is logged and re-raised.
        """
        if not self.openai:
            raise RuntimeError("Cliente OpenAI no inicializado")

        system = self._get_system_prompt(function)

        try:
            response = self.openai.chat.completions.create(
                model=model.name,
                max_tokens=model.max_tokens,
                messages=[
                    {"role": "system", "content": system},
                    {"role": "user", "content": prompt}
                ]
            )

            # ``content`` is nullable in the API (e.g. refusals); keep the
            # artifact a string so it matches the Anthropic path.
            artifact = response.choices[0].message.content or ""

            # ``usage`` is optional in the response model; count it as zero
            # tokens instead of failing after a successful generation.
            usage = response.usage
            input_tokens = usage.prompt_tokens if usage else 0
            output_tokens = usage.completion_tokens if usage else 0

            return self._text_result(artifact, model, input_tokens, output_tokens)
        except Exception as e:
            logger.exception("Error OpenAI: %s", e)
            raise

    @staticmethod
    def _first_output_url(output: Any) -> Optional[str]:
        """Return the first URL of a Replicate ``output``, or ``None`` if empty.

        ``output`` may be a list of URLs or a single URL.
        """
        if isinstance(output, list):
            return output[0] if output else None
        return output or None

    def _generate_image(
        self,
        prompt: str,
        model: ModelConfig,
        budget_remaining: float
    ) -> Dict[str, Any]:
        """Generate an image with Replicate (Flux).

        Creates a prediction, polls until it finishes and returns the first
        output URL. ``budget_remaining`` is accepted but not used here.

        Raises:
            RuntimeError: The Replicate key is missing, the prediction failed
                or was canceled, or it finished without any output.
            TimeoutError: The prediction did not finish in time.
            Exception: HTTP/transport errors are logged and re-raised.
        """
        if not self.replicate_key:
            raise RuntimeError("REPLICATE_API_KEY no configurada")

        try:
            # Create the prediction.
            response = httpx.post(
                "https://api.replicate.com/v1/predictions",
                headers={
                    "Authorization": f"Token {self.replicate_key}",
                    "Content-Type": "application/json"
                },
                json={
                    "version": self._get_replicate_version(model.name),
                    "input": {
                        "prompt": prompt,
                        "aspect_ratio": "1:1",
                        "output_format": "webp",
                        "output_quality": 90
                    }
                },
                timeout=60.0
            )
            response.raise_for_status()
            prediction = response.json()

            # Wait for the result.
            prediction_id = prediction["id"]
            result = self._wait_for_replicate(prediction_id)

            url = self._first_output_url(result.get("output"))
            if url is None:
                # Fail loudly rather than hand back an artifact without a URL.
                raise RuntimeError(
                    f"Replicate returned no output for prediction {prediction_id}"
                )

            return {
                "artifact": {
                    "url": url,
                    "prompt": prompt
                },
                # Flat per-image cost, stored in the per-1k-input field.
                "cost_usd": model.cost_per_1k_input,
                "model_used": model.name,
                # Images consume no tokens; key kept so every result has the
                # shape documented by ``generate``.
                "tokens_used": 0,
                "prediction_id": prediction_id
            }
        except Exception as e:
            logger.exception("Error Replicate: %s", e)
            raise

    def _wait_for_replicate(self, prediction_id: str, max_wait: int = 120) -> Dict:
        """Poll Replicate until the prediction reaches a terminal status.

        Args:
            prediction_id: ID returned when the prediction was created.
            max_wait: Maximum number of polls. There is a one-second pause
                between polls, so this is roughly a lower bound in seconds.

        Returns:
            The prediction JSON once its status is ``"succeeded"``.

        Raises:
            RuntimeError: The prediction ended as ``failed`` or ``canceled``.
            TimeoutError: No terminal status after ``max_wait`` polls.
            httpx.HTTPStatusError: A poll returned a 4xx/5xx response.
        """
        import time

        for _ in range(max_wait):
            response = httpx.get(
                f"https://api.replicate.com/v1/predictions/{prediction_id}",
                headers={"Authorization": f"Token {self.replicate_key}"},
                timeout=10.0
            )
            # Surface HTTP errors directly instead of a KeyError on "status".
            response.raise_for_status()
            result = response.json()

            status = result.get("status")
            if status == "succeeded":
                return result
            # "canceled" is terminal too; without this the loop would spin
            # until the timeout.
            if status in ("failed", "canceled"):
                raise RuntimeError(f"Replicate {status}: {result.get('error')}")

            time.sleep(1)

        raise TimeoutError("Replicate prediction timeout")

    def _get_replicate_version(self, model_name: str) -> str:
        """Return the Replicate version ID for ``model_name``.

        Unknown models fall back to the flux-schnell version; a warning is
        logged because the caller still reports the requested model name.
        """
        # NOTE(review): Replicate version IDs are normally 64 hex characters;
        # these values are shorter and may be truncated -- confirm.
        versions = {
            "black-forest-labs/flux-1.1-pro": "80a09d66baa990429c004a8ff540ce96c1e9e0e9c381",
            "black-forest-labs/flux-schnell": "f2ab8a5bfe79f02f0789a146cf5e73d2a4ff2684a98c2b"
        }
        fallback = "black-forest-labs/flux-schnell"

        if model_name not in versions:
            logger.warning(
                "Modelo Replicate desconocido %r; usando %s", model_name, fallback
            )
            return versions[fallback]
        return versions[model_name]

    def _get_system_prompt(self, function: FunctionType) -> str:
        """Return the system prompt for ``function``.

        Functions without a dedicated prompt use the text-generation prompt.
        """
        prompts = {
            FunctionType.TEXT_GENERATION: (
                "Eres un generador de contenido experto.\n"
                "Produces textos de alta calidad, bien estructurados y profesionales.\n"
                "Sigues las instrucciones del usuario con precisión."
            ),
            FunctionType.CODE_GENERATION: (
                "Eres un programador experto.\n"
                "Generas código limpio, eficiente y bien documentado.\n"
                "Sigues mejores prácticas y patrones de diseño apropiados.\n"
                "Incluyes manejo de errores y comentarios útiles."
            ),
            FunctionType.DOCUMENT_GENERATION: (
                "Eres un experto en documentación profesional.\n"
                "Creas documentos claros, completos y bien formateados.\n"
                "Aseguras que toda la información necesaria esté presente.\n"
                "Usas un tono profesional y apropiado al contexto."
            ),
            FunctionType.AUDIO_GENERATION: (
                "Eres un experto en producción de audio.\n"
                "Generas scripts y descripciones para contenido de audio."
            ),
            FunctionType.VIDEO_GENERATION: (
                "Eres un experto en producción de video.\n"
                "Generas guiones y descripciones para contenido de video."
            ),
        }

        return prompts.get(function, prompts[FunctionType.TEXT_GENERATION])
|