Initial commit: Context Manager v1.0.0

Local context-management system for AI:
- Immutable log (blockchain-style)
- Versioned, improvable algorithms
- Model-agnostic (Anthropic, OpenAI, Ollama)
- Metrics and A/B testing system

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
ARCHITECT
2025-12-29 18:55:27 +00:00
commit 6ab93d3485
19 changed files with 4253 additions and 0 deletions

src/__init__.py Normal file

@@ -0,0 +1,25 @@
"""
Context Manager - Sistema local de gestión de contexto para IA
Características:
- Log inmutable (tabla de referencia no editable)
- Gestor de contexto mejorable
- Agnóstico al modelo de IA
- Sistema de métricas para mejora continua
"""
__version__ = "1.0.0"
__author__ = "TZZR System"
from .database import Database
from .context_selector import ContextSelector
from .models import Session, Message, ContextBlock, Algorithm
__all__ = [
"Database",
"ContextSelector",
"Session",
"Message",
"ContextBlock",
"Algorithm",
]
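As a quick orientation for the package above, a minimal usage sketch, assuming a local PostgreSQL with the schemas applied (e.g. via `context-manager init`); the connection value and demo message are illustrative, the names mirror the exports above:

# Minimal sketch, not part of the commit; assumes default connection settings.
from src import Database, ContextSelector

db = Database(database="context_manager")  # defaults: localhost:5432, user postgres
session = db.create_session(user_id="demo")
selector = ContextSelector(db)
context = selector.select_context(session, "How do I deploy the API?")  # illustrative message
print(context.total_tokens, len(context.items))
db.close()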

src/algorithm_improver.py Normal file

@@ -0,0 +1,608 @@
"""
Sistema de mejora continua de algoritmos de contexto.
Permite:
- Evaluar rendimiento de algoritmos
- A/B testing entre versiones
- Sugerir mejoras basadas en métricas
- Auto-ajuste de parámetros
"""
import uuid
import statistics
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any, Tuple
from dataclasses import dataclass
from .models import Algorithm, AlgorithmMetric, AlgorithmStatus
from .database import Database
@dataclass
class AlgorithmAnalysis:
"""Análisis de rendimiento de un algoritmo"""
algorithm_id: uuid.UUID
algorithm_code: str
total_uses: int
sample_size: int
# Core metrics
avg_token_efficiency: float
avg_relevance: Optional[float]
avg_quality: Optional[float]
avg_satisfaction: Optional[float]
avg_latency_ms: Optional[float]
# Distribution statistics
quality_stddev: Optional[float]
quality_p25: Optional[float]
quality_p50: Optional[float]
quality_p75: Optional[float]
# Average composition
avg_composition: Dict[str, Any]
# Trend (last 7 days vs the period before)
quality_trend: Optional[str] # "improving", "stable", "declining"
# Recommendations
suggestions: List[str]
@dataclass
class ExperimentResult:
"""Resultado de un experimento A/B"""
experiment_id: uuid.UUID
control_algorithm: str
treatment_algorithm: str
control_samples: int
treatment_samples: int
control_avg_quality: float
treatment_avg_quality: float
improvement_pct: float
statistical_significance: float
winner: Optional[str]
recommendation: str
class AlgorithmImprover:
"""
Motor de mejora continua de algoritmos.
Analiza métricas históricas y sugiere/aplica mejoras.
"""
def __init__(self, db: Database):
self.db = db
def analyze_algorithm(
self,
algorithm_id: Optional[uuid.UUID] = None,
days: int = 30
) -> AlgorithmAnalysis:
"""
Analiza el rendimiento de un algoritmo.
Args:
algorithm_id: ID del algoritmo (o el activo por defecto)
days: Días de histórico a analizar
Returns:
AlgorithmAnalysis con métricas y sugerencias
"""
# Obtener algoritmo
if algorithm_id:
algorithm = self.db.get_algorithm(algorithm_id)
else:
algorithm = self.db.get_active_algorithm()
if not algorithm:
raise ValueError("Algorithm not found")
# Fetch metrics
with self.db.get_cursor() as cur:
cur.execute(
"""
SELECT
COUNT(*) as total,
AVG(token_efficiency) as avg_efficiency,
AVG(relevance_score) as avg_relevance,
AVG(response_quality) as avg_quality,
AVG(user_satisfaction) as avg_satisfaction,
AVG(latency_ms) as avg_latency,
STDDEV(response_quality) as quality_stddev,
PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY response_quality) as p25,
PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY response_quality) as p50,
PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY response_quality) as p75
FROM algorithm_metrics
WHERE algorithm_id = %s
AND recorded_at > NOW() - INTERVAL '%s days'
""",
(str(algorithm.id), days)
)
stats = cur.fetchone()
# Fetch average composition
cur.execute(
"""
SELECT context_composition
FROM algorithm_metrics
WHERE algorithm_id = %s
AND context_composition IS NOT NULL
ORDER BY recorded_at DESC
LIMIT 100
""",
(str(algorithm.id),)
)
compositions = [row["context_composition"] for row in cur.fetchall()]
# Recent vs. earlier trend
cur.execute(
"""
SELECT
CASE
WHEN recorded_at > NOW() - INTERVAL '7 days' THEN 'recent'
ELSE 'older'
END as period,
AVG(response_quality) as avg_quality
FROM algorithm_metrics
WHERE algorithm_id = %s
AND recorded_at > NOW() - INTERVAL '%s days'
AND response_quality IS NOT NULL
GROUP BY period
""",
(str(algorithm.id), days)
)
trends = {row["period"]: row["avg_quality"] for row in cur.fetchall()}
# Compute average composition
avg_composition = self._average_composition(compositions)
# Determine the trend
trend = None
if "recent" in trends and "older" in trends:
diff = trends["recent"] - trends["older"]
if diff > 0.05:
trend = "improving"
elif diff < -0.05:
trend = "declining"
else:
trend = "stable"
# Generate suggestions
suggestions = self._generate_suggestions(
stats, avg_composition, algorithm.config, trend
)
return AlgorithmAnalysis(
algorithm_id=algorithm.id,
algorithm_code=algorithm.code,
total_uses=algorithm.times_used,
sample_size=stats["total"] if stats["total"] else 0,
avg_token_efficiency=float(stats["avg_efficiency"]) if stats["avg_efficiency"] else 0,
avg_relevance=float(stats["avg_relevance"]) if stats["avg_relevance"] else None,
avg_quality=float(stats["avg_quality"]) if stats["avg_quality"] else None,
avg_satisfaction=float(stats["avg_satisfaction"]) if stats["avg_satisfaction"] else None,
avg_latency_ms=float(stats["avg_latency"]) if stats["avg_latency"] else None,
quality_stddev=float(stats["quality_stddev"]) if stats["quality_stddev"] else None,
quality_p25=float(stats["p25"]) if stats["p25"] else None,
quality_p50=float(stats["p50"]) if stats["p50"] else None,
quality_p75=float(stats["p75"]) if stats["p75"] else None,
avg_composition=avg_composition,
quality_trend=trend,
suggestions=suggestions
)
def _average_composition(self, compositions: List[Dict]) -> Dict[str, Any]:
"""Calcula la composición promedio del contexto"""
if not compositions:
return {}
totals = {}
for comp in compositions:
for source, data in comp.items():
if source not in totals:
totals[source] = {"count": [], "tokens": []}
if isinstance(data, dict):
totals[source]["count"].append(data.get("count", 0))
totals[source]["tokens"].append(data.get("tokens", 0))
return {
source: {
"avg_count": statistics.mean(data["count"]) if data["count"] else 0,
"avg_tokens": statistics.mean(data["tokens"]) if data["tokens"] else 0
}
for source, data in totals.items()
}
def _generate_suggestions(
self,
stats: Dict,
composition: Dict,
config: Dict,
trend: Optional[str]
) -> List[str]:
"""Genera sugerencias de mejora basadas en métricas"""
suggestions = []
# Token efficiency
if stats.get("avg_efficiency") and stats["avg_efficiency"] > 0.95:
suggestions.append(
"High token efficiency (>95%). Consider raising max_tokens "
"to include more relevant context."
)
elif stats.get("avg_efficiency") and stats["avg_efficiency"] < 0.5:
suggestions.append(
"Low token efficiency (<50%). The context is very small. "
"Review the available data sources."
)
# Response quality
if stats.get("avg_quality"):
if stats["avg_quality"] < 0.6:
suggestions.append(
"Low average quality (<0.6). Consider:\n"
" - Including more memory\n"
" - Improving the context blocks\n"
" - Reviewing the knowledge filtering"
)
elif stats.get("quality_stddev") and stats["quality_stddev"] > 0.2:
suggestions.append(
"High quality variability (stddev > 0.2). "
"The context is not consistent. Review the activation rules."
)
# Trend
if trend == "declining":
suggestions.append(
"Quality is declining. Consider:\n"
" - Updating the knowledge base\n"
" - Reviewing stale memories\n"
" - Forking the algorithm to experiment"
)
# Context composition
if composition:
history = composition.get("history", {})
if history.get("avg_tokens", 0) > config.get("max_tokens", 4000) * 0.7:
suggestions.append(
"History takes up >70% of the context. Consider:\n"
" - Lowering max_messages in history_config\n"
" - Triggering summarize_after earlier\n"
" - Raising the total token budget"
)
memory = composition.get("memory", {})
if memory.get("avg_count", 0) < 3:
suggestions.append(
"Little memory included (<3 items). Consider:\n"
" - Lowering min_importance in memory_config\n"
" - Raising max_items for memory"
)
if not suggestions:
suggestions.append("The algorithm is performing well. No immediate suggestions.")
return suggestions
def create_experiment(
self,
control_id: uuid.UUID,
treatment_id: uuid.UUID,
name: str,
traffic_split: float = 0.5,
min_samples: int = 100
) -> uuid.UUID:
"""
Crea un experimento A/B entre dos algoritmos.
Args:
control_id: Algoritmo de control (actual)
treatment_id: Algoritmo de tratamiento (nuevo)
name: Nombre del experimento
traffic_split: % de tráfico para treatment (0.0-1.0)
min_samples: Muestras mínimas para concluir
Returns:
ID del experimento
"""
with self.db.get_cursor() as cur:
cur.execute(
"""
INSERT INTO algorithm_experiments
(name, control_algorithm_id, treatment_algorithm_id, traffic_split, min_samples, status)
VALUES (%s, %s, %s, %s, %s, 'running')
RETURNING id
""",
(name, str(control_id), str(treatment_id), traffic_split, min_samples)
)
return cur.fetchone()["id"]
def get_experiment_algorithm(
self,
experiment_id: uuid.UUID
) -> Tuple[uuid.UUID, str]:
"""
Obtiene qué algoritmo usar basado en el experimento activo.
Returns:
Tuple de (algorithm_id, group) donde group es 'control' o 'treatment'
"""
import random
with self.db.get_cursor() as cur:
cur.execute(
"""
SELECT * FROM algorithm_experiments
WHERE id = %s AND status = 'running'
""",
(str(experiment_id),)
)
exp = cur.fetchone()
if not exp:
raise ValueError("Experimento no encontrado o no activo")
# Asignar grupo basado en traffic_split
if random.random() < exp["traffic_split"]:
return exp["treatment_algorithm_id"], "treatment"
else:
return exp["control_algorithm_id"], "control"
def evaluate_experiment(self, experiment_id: uuid.UUID) -> ExperimentResult:
"""
Evalúa los resultados de un experimento.
Returns:
ExperimentResult con análisis estadístico
"""
with self.db.get_cursor() as cur:
cur.execute(
"""
SELECT
e.*,
c.code as control_code,
t.code as treatment_code
FROM algorithm_experiments e
JOIN context_algorithms c ON e.control_algorithm_id = c.id
JOIN context_algorithms t ON e.treatment_algorithm_id = t.id
WHERE e.id = %s
""",
(str(experiment_id),)
)
exp = cur.fetchone()
if not exp:
raise ValueError("Experimento no encontrado")
# Obtener métricas de cada grupo
cur.execute(
"""
SELECT
algorithm_id,
COUNT(*) as samples,
AVG(response_quality) as avg_quality,
STDDEV(response_quality) as stddev_quality
FROM algorithm_metrics
WHERE algorithm_id IN (%s, %s)
AND response_quality IS NOT NULL
AND recorded_at > (
SELECT created_at FROM algorithm_experiments WHERE id = %s
)
GROUP BY algorithm_id
""",
(str(exp["control_algorithm_id"]), str(exp["treatment_algorithm_id"]),
str(experiment_id))
)
results = {str(row["algorithm_id"]): row for row in cur.fetchall()}
control_data = results.get(str(exp["control_algorithm_id"]), {})
treatment_data = results.get(str(exp["treatment_algorithm_id"]), {})
control_quality = float(control_data.get("avg_quality", 0)) if control_data else 0
treatment_quality = float(treatment_data.get("avg_quality", 0)) if treatment_data else 0
control_samples = control_data.get("samples", 0) if control_data else 0
treatment_samples = treatment_data.get("samples", 0) if treatment_data else 0
# Compute improvement
if control_quality > 0:
improvement = ((treatment_quality - control_quality) / control_quality) * 100
else:
improvement = 0
# Simple statistical significance (approximate z-test)
significance = self._calculate_significance(
control_quality, control_data.get("stddev_quality", 0.1), control_samples,
treatment_quality, treatment_data.get("stddev_quality", 0.1), treatment_samples
)
# Pick a winner
winner = None
recommendation = ""
if control_samples >= exp["min_samples"] and treatment_samples >= exp["min_samples"]:
if significance > 0.95:
if treatment_quality > control_quality:
winner = exp["treatment_code"]
recommendation = f"Activar {winner} como algoritmo principal."
else:
winner = exp["control_code"]
recommendation = f"Mantener {winner}. El tratamiento no mejoró."
else:
recommendation = "No hay diferencia significativa. Continuar experimentando."
else:
recommendation = f"Faltan muestras. Control: {control_samples}/{exp['min_samples']}, Treatment: {treatment_samples}/{exp['min_samples']}"
return ExperimentResult(
experiment_id=experiment_id,
control_algorithm=exp["control_code"],
treatment_algorithm=exp["treatment_code"],
control_samples=control_samples,
treatment_samples=treatment_samples,
control_avg_quality=control_quality,
treatment_avg_quality=treatment_quality,
improvement_pct=improvement,
statistical_significance=significance,
winner=winner,
recommendation=recommendation
)
def _calculate_significance(
self,
mean1: float, std1: float, n1: int,
mean2: float, std2: float, n2: int
) -> float:
"""Calcula significancia estadística (z-test aproximado)"""
if n1 < 2 or n2 < 2:
return 0.0
import math
std1 = std1 or 0.1
std2 = std2 or 0.1
se = math.sqrt((std1**2 / n1) + (std2**2 / n2))
if se == 0:
return 0.0
z = abs(mean1 - mean2) / se
# Approximation of the normal CDF
# P(Z <= z) via the Zelen-Severo approximation
if z > 6:
return 0.999
elif z < -6:
return 0.001
t = 1 / (1 + 0.2316419 * abs(z))
d = 0.3989423 * math.exp(-z * z / 2)
p = d * t * (0.3193815 + t * (-0.3565638 + t * (1.781478 + t * (-1.821256 + t * 1.330274))))
if z > 0:
p = 1 - p
# Convert to two-tailed confidence
return 1 - 2 * min(p, 1 - p)
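# Sanity check for _calculate_significance (illustrative): z = 1.96 gives
# p = P(Z <= 1.96) ~ 0.975, so the returned two-tailed confidence is
# 1 - 2 * 0.025 ~ 0.95, exactly the threshold evaluate_experiment uses.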
def suggest_improvements(self, algorithm_id: Optional[uuid.UUID] = None) -> Dict[str, Any]:
"""
Suggests concrete improvements for the algorithm.
Returns:
Dict with configuration suggestions
"""
analysis = self.analyze_algorithm(algorithm_id)
algorithm = self.db.get_algorithm(analysis.algorithm_id) if analysis.algorithm_id else None
if not algorithm:
return {"error": "No se encontró el algoritmo"}
current_config = algorithm.config.copy()
suggested_config = current_config.copy()
changes = []
# Tune based on the analysis
if analysis.avg_token_efficiency and analysis.avg_token_efficiency > 0.95:
current_max = current_config.get("max_tokens", 4000)
suggested_config["max_tokens"] = int(current_max * 1.25)
changes.append(f"Aumentar max_tokens de {current_max} a {suggested_config['max_tokens']}")
if analysis.avg_quality and analysis.avg_quality < 0.6:
# More memory
memory_config = suggested_config.get("memory_config", {})
memory_config["max_items"] = min(memory_config.get("max_items", 15) + 5, 30)
memory_config["min_importance"] = max(memory_config.get("min_importance", 30) - 10, 10)
suggested_config["memory_config"] = memory_config
changes.append("Include more memory")
# More knowledge
knowledge_config = suggested_config.get("knowledge_config", {})
knowledge_config["max_items"] = min(knowledge_config.get("max_items", 5) + 3, 15)
suggested_config["knowledge_config"] = knowledge_config
changes.append("Include more knowledge")
if analysis.avg_composition:
history = analysis.avg_composition.get("history", {})
total_tokens = sum(
data.get("avg_tokens", 0)
for data in analysis.avg_composition.values()
)
if total_tokens > 0 and history.get("avg_tokens", 0) / total_tokens > 0.7:
history_config = suggested_config.get("history_config", {})
history_config["max_messages"] = max(history_config.get("max_messages", 20) - 5, 5)
history_config["summarize_after"] = max(history_config.get("summarize_after", 10) - 3, 3)
suggested_config["history_config"] = history_config
changes.append("Reducir historial para dar espacio a otro contexto")
return {
"algorithm_code": algorithm.code,
"current_config": current_config,
"suggested_config": suggested_config,
"changes": changes,
"analysis": {
"avg_quality": analysis.avg_quality,
"avg_efficiency": analysis.avg_token_efficiency,
"trend": analysis.quality_trend,
"suggestions": analysis.suggestions
}
}
def apply_improvements(
self,
algorithm_id: uuid.UUID,
new_config: Dict[str, Any],
create_fork: bool = True,
fork_name: Optional[str] = None
) -> uuid.UUID:
"""
Applies improvements to an algorithm.
Args:
algorithm_id: Algorithm to improve
new_config: New configuration
create_fork: If True, creates a fork instead of modifying in place
fork_name: Name for the fork (when create_fork=True)
Returns:
Algorithm ID (new or existing)
"""
if create_fork:
algorithm = self.db.get_algorithm(algorithm_id)
if not algorithm:
raise ValueError("Algoritmo no encontrado")
new_code = f"{algorithm.code}_v{algorithm.version.replace('.', '_')}_improved"
new_name = fork_name or f"{algorithm.name} (improved)"
new_id = self.db.fork_algorithm(
source_id=algorithm_id,
new_code=new_code,
new_name=new_name,
reason="Mejora automática basada en métricas"
)
# Update the fork's config
with self.db.get_cursor() as cur:
from psycopg2.extras import Json
cur.execute(
"""
UPDATE context_algorithms
SET config = %s, status = 'testing'
WHERE id = %s
""",
(Json(new_config), str(new_id))
)
return new_id
else:
# Modify in place
with self.db.get_cursor() as cur:
from psycopg2.extras import Json
cur.execute(
"""
UPDATE context_algorithms
SET config = %s, updated_at = NOW()
WHERE id = %s
""",
(Json(new_config), str(algorithm_id))
)
return algorithm_id
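To make the intended workflow concrete, a hedged sketch of the A/B loop built from the methods above (the fork code/name and experiment name are illustrative placeholders; `db` is assumed to be a connected Database):

# Hypothetical A/B workflow; all string literals below are placeholders.
improver = AlgorithmImprover(db)
control = db.get_active_algorithm()
treatment_id = db.fork_algorithm(
    control.id, "STD_more_memory", "Standard (more memory)",
    reason="Manual experiment"
)
exp_id = improver.create_experiment(
    control.id, treatment_id, name="more-memory-test",
    traffic_split=0.5, min_samples=100
)
# Per request: pick the group, run selection with that algorithm,
# and record metrics under the returned algorithm_id.
algorithm_id, group = improver.get_experiment_algorithm(exp_id)
# Once both groups reach min_samples, evaluate and act on the result.
result = improver.evaluate_experiment(exp_id)
if result.winner:
    print(result.recommendation)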

src/cli.py Normal file

@@ -0,0 +1,403 @@
#!/usr/bin/env python3
"""
CLI para Context Manager
Uso:
context-manager init # Inicializa la base de datos
context-manager chat [--provider X] # Inicia chat interactivo
context-manager analyze # Analiza algoritmo activo
context-manager experiment create # Crea experimento A/B
context-manager block add # Añade bloque de contexto
context-manager memory list # Lista memorias
context-manager verify SESSION_ID # Verifica integridad
"""
import argparse
import os
import sys
import uuid
from pathlib import Path
try:
from .database import Database
from .context_selector import ContextManager
from .algorithm_improver import AlgorithmImprover
from .models import ContextBlock, Memory
except ImportError:
# For direct execution
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.database import Database
from src.context_selector import ContextManager
from src.algorithm_improver import AlgorithmImprover
from src.models import ContextBlock, Memory
def get_db_from_args(args) -> Database:
"""Obtiene conexión a BD desde argumentos"""
return Database(
host=args.host or os.getenv("PGHOST", "localhost"),
port=args.port or int(os.getenv("PGPORT", "5432")),
database=args.database or os.getenv("PGDATABASE", "context_manager"),
user=args.user or os.getenv("PGUSER", "postgres"),
password=args.password or os.getenv("PGPASSWORD", "")
)
def cmd_init(args):
"""Initializes the database"""
import psycopg2
# Connect without a specific database
conn = psycopg2.connect(
host=args.host or os.getenv("PGHOST", "localhost"),
port=args.port or int(os.getenv("PGPORT", "5432")),
user=args.user or os.getenv("PGUSER", "postgres"),
password=args.password or os.getenv("PGPASSWORD", ""),
database="postgres"
)
conn.autocommit = True
db_name = args.database or os.getenv("PGDATABASE", "context_manager")
with conn.cursor() as cur:
# Check whether it exists
cur.execute(
"SELECT 1 FROM pg_database WHERE datname = %s",
(db_name,)
)
if not cur.fetchone():
print(f"Creating database '{db_name}'...")
cur.execute(f'CREATE DATABASE "{db_name}"')
else:
print(f"Database '{db_name}' already exists")
conn.close()
# Apply schemas
db = get_db_from_args(args)
schema_dir = Path(__file__).parent.parent / "schemas"
print("Aplicando schemas...")
for schema_file in sorted(schema_dir.glob("*.sql")):
print(f" - {schema_file.name}")
with open(schema_file) as f:
sql = f.read()
with db.get_cursor(dict_cursor=False) as cur:
cur.execute(sql)
print("Base de datos inicializada correctamente")
db.close()
def cmd_chat(args):
"""Inicia chat interactivo"""
db = get_db_from_args(args)
manager = ContextManager(db=db)
# Configure the provider
provider = None
if args.provider == "anthropic":
from .providers import AnthropicProvider
provider = AnthropicProvider(model=args.model or "claude-sonnet-4-20250514")
elif args.provider == "openai":
from .providers import OpenAIProvider
provider = OpenAIProvider(model=args.model or "gpt-4")
elif args.provider == "ollama":
from .providers import OllamaProvider
provider = OllamaProvider(model=args.model or "llama3")
if not provider:
print("No provider configured. Running in demo mode (no AI)")
# Start a session
session = manager.start_session(
user_id=args.user or "cli_user",
model_provider=args.provider,
model_name=args.model
)
print(f"Sesión iniciada: {session.id}")
print("Escribe 'exit' para salir, 'verify' para verificar integridad")
print("-" * 50)
while True:
try:
user_input = input("\nYou: ").strip()
except (KeyboardInterrupt, EOFError):
break
if not user_input:
continue
if user_input.lower() == "exit":
break
if user_input.lower() == "verify":
result = manager.verify_session_integrity()
if result["is_valid"]:
print("Integridad verificada correctamente")
else:
print(f"ERROR: Integridad comprometida en secuencia {result['broken_at_sequence']}")
continue
# Build the context
context = manager.get_context_for_message(user_input, max_tokens=args.max_tokens)
print(f"[Contexto: {context.total_tokens} tokens, {len(context.items)} items]")
# Log the user message
user_log_id = manager.log_user_message(user_input, context)
# Generate a response
if provider:
response = provider.send_message(user_input, context)
print(f"\nAsistente: {response.content}")
# Registrar respuesta
assistant_log_id = manager.log_assistant_message(
content=response.content,
tokens_input=response.tokens_input,
tokens_output=response.tokens_output,
latency_ms=response.latency_ms
)
# Record the metric
manager.record_metric(
context=context,
log_entry_id=assistant_log_id,
tokens_budget=args.max_tokens,
latency_ms=response.latency_ms,
model_tokens_input=response.tokens_input,
model_tokens_output=response.tokens_output
)
else:
print("\n[Demo mode - no AI provider configured]")
print("Selected context:")
for item in context.items[:5]:
print(f" - [{item.source.value}] {item.content[:100]}...")
print("\nSession ended")
manager.close()
def cmd_analyze(args):
"""Analiza rendimiento del algoritmo"""
db = get_db_from_args(args)
improver = AlgorithmImprover(db)
algorithm_id = uuid.UUID(args.algorithm) if args.algorithm else None
analysis = improver.analyze_algorithm(algorithm_id, days=args.days)
print(f"\n{'='*60}")
print(f"ANÁLISIS DE ALGORITMO: {analysis.algorithm_code}")
print(f"{'='*60}")
print(f"\nMétricas generales:")
print(f" - Usos totales: {analysis.total_uses}")
print(f" - Muestras analizadas: {analysis.sample_size}")
print(f" - Eficiencia de tokens: {analysis.avg_token_efficiency:.2%}" if analysis.avg_token_efficiency else " - Eficiencia de tokens: N/A")
print(f" - Calidad promedio: {analysis.avg_quality:.2f}" if analysis.avg_quality else " - Calidad promedio: N/A")
print(f" - Satisfacción: {analysis.avg_satisfaction:.2f}" if analysis.avg_satisfaction else " - Satisfacción: N/A")
print(f" - Latencia promedio: {analysis.avg_latency_ms:.0f}ms" if analysis.avg_latency_ms else " - Latencia promedio: N/A")
if analysis.quality_trend:
trend_emoji = {"improving": "📈", "stable": "➡️", "declining": "📉"}.get(analysis.quality_trend, "")
print(f"\nTendencia: {trend_emoji} {analysis.quality_trend}")
if analysis.avg_composition:
print(f"\nComposición promedio del contexto:")
for source, data in analysis.avg_composition.items():
print(f" - {source}: {data.get('avg_count', 0):.1f} items, {data.get('avg_tokens', 0):.0f} tokens")
print(f"\nSugerencias:")
for i, suggestion in enumerate(analysis.suggestions, 1):
print(f" {i}. {suggestion}")
db.close()
def cmd_suggest(args):
"""Sugiere mejoras para el algoritmo"""
db = get_db_from_args(args)
improver = AlgorithmImprover(db)
algorithm_id = uuid.UUID(args.algorithm) if args.algorithm else None
suggestions = improver.suggest_improvements(algorithm_id)
print(f"\n{'='*60}")
print(f"SUGERENCIAS PARA: {suggestions.get('algorithm_code', 'N/A')}")
print(f"{'='*60}")
if suggestions.get("changes"):
print("\nCambios sugeridos:")
for i, change in enumerate(suggestions["changes"], 1):
print(f" {i}. {change}")
if args.apply:
print("\nAplicando cambios...")
new_id = improver.apply_improvements(
algorithm_id or db.get_active_algorithm().id,
suggestions["suggested_config"],
create_fork=True
)
print(f"Nuevo algoritmo creado: {new_id}")
else:
print("\nNo hay cambios sugeridos")
db.close()
def cmd_block_add(args):
"""Añade un bloque de contexto"""
db = get_db_from_args(args)
block = ContextBlock(
code=args.code,
name=args.name,
description=args.description,
content=args.content or sys.stdin.read(),
category=args.category,
priority=args.priority
)
block_id = db.create_context_block(block)
print(f"Bloque creado: {block_id}")
db.close()
def cmd_block_list(args):
"""Lista bloques de contexto"""
db = get_db_from_args(args)
blocks = db.get_active_context_blocks(category=args.category)
print(f"\n{'Code':<20} {'Name':<30} {'Category':<15} {'Priority':<10} {'Tokens':<10}")
print("-" * 85)
for block in blocks:
print(f"{block.code:<20} {block.name[:28]:<30} {block.category:<15} {block.priority:<10} {block.tokens_estimated:<10}")
db.close()
def cmd_memory_list(args):
"""Lista memorias"""
db = get_db_from_args(args)
memories = db.get_memories(type=args.type, limit=args.limit)
print(f"\n{'Type':<15} {'Importance':<12} {'Uses':<8} {'Content':<60}")
print("-" * 95)
for mem in memories:
content_preview = mem.content[:57] + "..." if len(mem.content) > 60 else mem.content
print(f"{mem.type:<15} {mem.importance:<12} {mem.uses:<8} {content_preview:<60}")
db.close()
def cmd_verify(args):
"""Verifica integridad de una sesión"""
db = get_db_from_args(args)
session_id = uuid.UUID(args.session_id)
result = db.verify_chain_integrity(session_id)
if result["is_valid"]:
print(f"Session {session_id}: INTEGRITY OK")
else:
print(f"Session {session_id}: INTEGRITY BROKEN")
print(f" - Sequence: {result['broken_at_sequence']}")
print(f" - Expected hash: {result['expected_hash']}")
print(f" - Actual hash: {result['actual_hash']}")
db.close()
def main():
parser = argparse.ArgumentParser(
description="Context Manager - Sistema de gestión de contexto para IA"
)
# Global DB arguments
parser.add_argument("--host", help="PostgreSQL host")
parser.add_argument("--port", type=int, help="PostgreSQL port")
parser.add_argument("--database", help="Database name")
parser.add_argument("--user", help="PostgreSQL user")
parser.add_argument("--password", help="PostgreSQL password")
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# init
init_parser = subparsers.add_parser("init", help="Initializes the database")
# chat
chat_parser = subparsers.add_parser("chat", help="Starts an interactive chat")
chat_parser.add_argument("--provider", choices=["anthropic", "openai", "ollama"],
help="AI provider")
chat_parser.add_argument("--model", help="Model to use")
chat_parser.add_argument("--user", help="User ID")
chat_parser.add_argument("--max-tokens", type=int, default=4000,
help="Maximum context tokens")
# analyze
analyze_parser = subparsers.add_parser("analyze", help="Analyzes algorithm performance")
analyze_parser.add_argument("--algorithm", help="Algorithm ID (defaults to the active one)")
analyze_parser.add_argument("--days", type=int, default=30, help="Days of history")
# suggest
suggest_parser = subparsers.add_parser("suggest", help="Suggests improvements")
suggest_parser.add_argument("--algorithm", help="Algorithm ID")
suggest_parser.add_argument("--apply", action="store_true", help="Apply the suggestions")
# block
block_parser = subparsers.add_parser("block", help="Manage context blocks")
block_sub = block_parser.add_subparsers(dest="block_command")
block_add = block_sub.add_parser("add", help="Adds a block")
block_add.add_argument("code", help="Unique block code")
block_add.add_argument("name", help="Block name")
block_add.add_argument("--description", help="Description")
block_add.add_argument("--content", help="Content (or stdin)")
block_add.add_argument("--category", default="general", help="Category")
block_add.add_argument("--priority", type=int, default=50, help="Priority (0-100)")
block_list = block_sub.add_parser("list", help="Lists blocks")
block_list.add_argument("--category", help="Filter by category")
# memory
memory_parser = subparsers.add_parser("memory", help="Manage memories")
memory_sub = memory_parser.add_subparsers(dest="memory_command")
memory_list = memory_sub.add_parser("list", help="Lists memories")
memory_list.add_argument("--type", help="Filter by type")
memory_list.add_argument("--limit", type=int, default=20, help="Maximum results")
# verify
verify_parser = subparsers.add_parser("verify", help="Verifies session integrity")
verify_parser.add_argument("session_id", help="Session ID")
args = parser.parse_args()
if args.command == "init":
cmd_init(args)
elif args.command == "chat":
cmd_chat(args)
elif args.command == "analyze":
cmd_analyze(args)
elif args.command == "suggest":
cmd_suggest(args)
elif args.command == "block":
if args.block_command == "add":
cmd_block_add(args)
elif args.block_command == "list":
cmd_block_list(args)
else:
block_parser.print_help()
elif args.command == "memory":
if args.memory_command == "list":
cmd_memory_list(args)
else:
memory_parser.print_help()
elif args.command == "verify":
cmd_verify(args)
else:
parser.print_help()
if __name__ == "__main__":
main()

src/context_selector.py Normal file

@@ -0,0 +1,508 @@
"""
Selector de Contexto - Motor principal
Selecciona el contexto óptimo para enviar al modelo de IA
basándose en el algoritmo activo y las fuentes disponibles.
"""
import re
import uuid
from datetime import datetime
from typing import Optional, List, Dict, Any, Callable
from .models import (
Session, Message, MessageRole, ContextBlock, Memory,
Knowledge, Algorithm, AmbientContext, ContextItem,
SelectedContext, ContextSource
)
from .database import Database
class ContextSelector:
"""
Motor de selección de contexto.
Características:
- Agnóstico al modelo de IA
- Basado en algoritmo configurable
- Métricas de rendimiento
- Soporte para múltiples fuentes
"""
def __init__(self, db: Database):
self.db = db
self._custom_selectors: Dict[str, Callable] = {}
def register_selector(self, name: str, selector_fn: Callable):
"""Registra un selector de contexto personalizado"""
self._custom_selectors[name] = selector_fn
def select_context(
self,
session: Session,
user_message: str,
algorithm: Optional[Algorithm] = None,
max_tokens: Optional[int] = None
) -> SelectedContext:
"""
Selecciona el contexto óptimo para la sesión actual.
Args:
session: Sesión activa
user_message: Mensaje del usuario
algorithm: Algoritmo a usar (o el activo por defecto)
max_tokens: Límite de tokens (sobreescribe config del algoritmo)
Returns:
SelectedContext con los items seleccionados
"""
# Obtener algoritmo
if algorithm is None:
algorithm = self.db.get_active_algorithm()
if algorithm is None:
# Default algorithm if none is active
algorithm = Algorithm(
code="DEFAULT",
name="Default",
config={
"max_tokens": 4000,
"sources": {
"system_prompts": True,
"context_blocks": True,
"memory": True,
"knowledge": True,
"history": True,
"ambient": True
},
"weights": {"priority": 0.4, "relevance": 0.3, "recency": 0.2, "frequency": 0.1},
"history_config": {"max_messages": 20, "summarize_after": 10, "include_system": False},
"memory_config": {"max_items": 15, "min_importance": 30},
"knowledge_config": {"max_items": 5, "require_keyword_match": True}
}
)
config = algorithm.config
token_budget = max_tokens or config.get("max_tokens", 4000)
sources = config.get("sources", {})
# Check for a custom selector
if algorithm.selector_code and algorithm.code in self._custom_selectors:
return self._custom_selectors[algorithm.code](
session, user_message, config, self.db
)
# Standard selection
context = SelectedContext(algorithm_id=algorithm.id)
composition = {}
# 1. System prompts and context blocks
if sources.get("context_blocks", True):
blocks = self._select_context_blocks(user_message, config)
for block in blocks:
if context.total_tokens + block.tokens_estimated <= token_budget:
context.items.append(ContextItem(
source=ContextSource.DATASET,
content=block.content,
tokens=block.tokens_estimated,
priority=block.priority,
metadata={"block_code": block.code, "category": block.category}
))
context.total_tokens += block.tokens_estimated
composition["context_blocks"] = {
"count": len([i for i in context.items if i.source == ContextSource.DATASET]),
"tokens": sum(i.tokens for i in context.items if i.source == ContextSource.DATASET)
}
# 2. Long-term memory
if sources.get("memory", True):
memory_config = config.get("memory_config", {})
memories = self._select_memories(
user_message,
min_importance=memory_config.get("min_importance", 30),
max_items=memory_config.get("max_items", 15)
)
memory_tokens = 0
memory_count = 0
for mem in memories:
tokens = len(mem.content) // 4
if context.total_tokens + tokens <= token_budget:
context.items.append(ContextItem(
source=ContextSource.MEMORY,
content=f"[Memoria - {mem.type}]: {mem.content}",
tokens=tokens,
priority=mem.importance,
metadata={"memory_type": mem.type, "importance": mem.importance}
))
context.total_tokens += tokens
memory_tokens += tokens
memory_count += 1
composition["memory"] = {"count": memory_count, "tokens": memory_tokens}
# 3. Knowledge base
if sources.get("knowledge", True):
knowledge_config = config.get("knowledge_config", {})
keywords = self._extract_keywords(user_message)
if keywords or not knowledge_config.get("require_keyword_match", True):
knowledge_items = self.db.search_knowledge(
keywords=keywords if knowledge_config.get("require_keyword_match", True) else None,
limit=knowledge_config.get("max_items", 5)
)
knowledge_tokens = 0
knowledge_count = 0
for item in knowledge_items:
if context.total_tokens + item.tokens_estimated <= token_budget:
context.items.append(ContextItem(
source=ContextSource.KNOWLEDGE,
content=f"[Conocimiento - {item.title}]: {item.content}",
tokens=item.tokens_estimated,
priority=item.priority,
metadata={"title": item.title, "category": item.category}
))
context.total_tokens += item.tokens_estimated
knowledge_tokens += item.tokens_estimated
knowledge_count += 1
composition["knowledge"] = {"count": knowledge_count, "tokens": knowledge_tokens}
# 4. Ambient context
if sources.get("ambient", True):
ambient = self.db.get_latest_ambient_context()
if ambient:
ambient_content = self._format_ambient_context(ambient)
tokens = len(ambient_content) // 4
if context.total_tokens + tokens <= token_budget:
context.items.append(ContextItem(
source=ContextSource.AMBIENT,
content=ambient_content,
tokens=tokens,
priority=30,
metadata={"captured_at": ambient.captured_at.isoformat()}
))
context.total_tokens += tokens
composition["ambient"] = {"count": 1, "tokens": tokens}
# 5. Conversation history (last, to fill the remaining budget)
if sources.get("history", True):
history_config = config.get("history_config", {})
history = self.db.get_session_history(
session.id,
limit=history_config.get("max_messages", 20),
include_system=history_config.get("include_system", False)
)
history_tokens = 0
history_count = 0
for msg in history:
tokens = len(msg.content) // 4
if context.total_tokens + tokens <= token_budget:
context.items.append(ContextItem(
source=ContextSource.HISTORY,
content=msg.content,
tokens=tokens,
priority=10,
metadata={"role": msg.role.value, "sequence": msg.sequence_num}
))
context.total_tokens += tokens
history_tokens += tokens
history_count += 1
composition["history"] = {"count": history_count, "tokens": history_tokens}
context.composition = composition
return context
def _select_context_blocks(
self,
user_message: str,
config: Dict[str, Any]
) -> List[ContextBlock]:
"""Selecciona bloques de contexto relevantes"""
blocks = self.db.get_active_context_blocks()
relevant_blocks = []
keywords = self._extract_keywords(user_message)
for block in blocks:
rules = block.activation_rules
# Always include
if rules.get("always", False):
relevant_blocks.append(block)
continue
# Check keywords
block_keywords = rules.get("keywords", [])
if block_keywords:
if any(kw.lower() in user_message.lower() for kw in block_keywords):
relevant_blocks.append(block)
continue
# The "system" category is always included
if block.category == "system":
relevant_blocks.append(block)
# Sort by priority
relevant_blocks.sort(key=lambda b: b.priority, reverse=True)
return relevant_blocks
def _select_memories(
self,
user_message: str,
min_importance: int = 30,
max_items: int = 15
) -> List[Memory]:
"""Selecciona memorias relevantes"""
memories = self.db.get_memories(
min_importance=min_importance,
limit=max_items * 2 # Fetch extra so we can filter
)
# Filter by relevance to the message
keywords = self._extract_keywords(user_message)
if keywords:
scored_memories = []
for mem in memories:
score = sum(1 for kw in keywords if kw.lower() in mem.content.lower())
scored_memories.append((mem, score + mem.importance / 100))
scored_memories.sort(key=lambda x: x[1], reverse=True)
return [m[0] for m in scored_memories[:max_items]]
return memories[:max_items]
def _extract_keywords(self, text: str) -> List[str]:
"""Extrae keywords de un texto"""
# Palabras comunes a ignorar
stopwords = {
"el", "la", "los", "las", "un", "una", "unos", "unas",
"de", "del", "al", "a", "en", "con", "por", "para",
"que", "qué", "como", "cómo", "donde", "dónde", "cuando", "cuándo",
"es", "son", "está", "están", "ser", "estar", "tener", "hacer",
"y", "o", "pero", "si", "no", "me", "te", "se", "nos",
"the", "a", "an", "is", "are", "was", "were", "be", "been",
"have", "has", "had", "do", "does", "did", "will", "would",
"can", "could", "should", "may", "might", "must",
"and", "or", "but", "if", "then", "else", "when", "where",
"what", "which", "who", "whom", "this", "that", "these", "those",
"i", "you", "he", "she", "it", "we", "they", "my", "your", "his", "her"
}
# Extract words
words = re.findall(r'\b\w+\b', text.lower())
# Filter
keywords = [
w for w in words
if len(w) > 2 and w not in stopwords
]
return list(set(keywords))
def _format_ambient_context(self, ambient: AmbientContext) -> str:
"""Formats the ambient context as text"""
lines = ["[System context]"]
env = ambient.environment
if env:
if env.get("timezone"):
lines.append(f"- Timezone: {env['timezone']}")
if env.get("working_directory"):
lines.append(f"- Directory: {env['working_directory']}")
if env.get("git_branch"):
lines.append(f"- Git branch: {env['git_branch']}")
if env.get("active_project"):
lines.append(f"- Active project: {env['active_project']}")
state = ambient.system_state
if state:
if state.get("servers"):
servers = state["servers"]
online = [k for k, v in servers.items() if v == "online"]
if online:
lines.append(f"- Servers online: {', '.join(online)}")
if state.get("alerts"):
alerts = state["alerts"]
if alerts:
lines.append(f"- Active alerts: {len(alerts)}")
return "\n".join(lines)
class ContextManager:
"""
Gestor completo de contexto.
Combina el selector con el logging y métricas.
"""
def __init__(
self,
db: Optional[Database] = None,
host: Optional[str] = None,
port: Optional[int] = None,
database: Optional[str] = None,
user: Optional[str] = None,
password: Optional[str] = None
):
if db:
self.db = db
else:
self.db = Database(
host=host,
port=port,
database=database,
user=user,
password=password
)
self.selector = ContextSelector(self.db)
self._current_session: Optional[Session] = None
def start_session(
self,
user_id: Optional[str] = None,
instance_id: Optional[str] = None,
model_provider: Optional[str] = None,
model_name: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
) -> Session:
"""Starts a new session"""
algorithm = self.db.get_active_algorithm()
self._current_session = self.db.create_session(
user_id=user_id,
instance_id=instance_id,
model_provider=model_provider,
model_name=model_name,
algorithm_id=algorithm.id if algorithm else None,
metadata=metadata
)
return self._current_session
def get_context_for_message(
self,
message: str,
max_tokens: Optional[int] = None,
session: Optional[Session] = None
) -> SelectedContext:
"""Builds the context for a message"""
session = session or self._current_session
if not session:
raise ValueError("No active session. Call start_session() first.")
return self.selector.select_context(
session=session,
user_message=message,
max_tokens=max_tokens
)
def log_user_message(
self,
content: str,
context: Optional[SelectedContext] = None,
session: Optional[Session] = None
) -> uuid.UUID:
"""Logs a user message in the immutable log"""
session = session or self._current_session
if not session:
raise ValueError("No active session.")
return self.db.insert_log_entry(
session_id=session.id,
role=MessageRole.USER,
content=content,
model_provider=session.model_provider,
model_name=session.model_name,
context_snapshot=context.to_dict() if context else None,
context_algorithm_id=context.algorithm_id if context else None,
context_tokens_used=context.total_tokens if context else None
)
def log_assistant_message(
self,
content: str,
tokens_input: Optional[int] = None,
tokens_output: Optional[int] = None,
latency_ms: Optional[int] = None,
model_provider: Optional[str] = None,
model_name: Optional[str] = None,
model_params: Optional[Dict[str, Any]] = None,
session: Optional[Session] = None
) -> uuid.UUID:
"""Logs an assistant response in the immutable log"""
session = session or self._current_session
if not session:
raise ValueError("No active session.")
return self.db.insert_log_entry(
session_id=session.id,
role=MessageRole.ASSISTANT,
content=content,
model_provider=model_provider or session.model_provider,
model_name=model_name or session.model_name,
model_params=model_params,
tokens_input=tokens_input,
tokens_output=tokens_output,
latency_ms=latency_ms
)
def record_metric(
self,
context: SelectedContext,
log_entry_id: uuid.UUID,
tokens_budget: int,
latency_ms: Optional[int] = None,
model_tokens_input: Optional[int] = None,
model_tokens_output: Optional[int] = None,
session: Optional[Session] = None
) -> Optional[uuid.UUID]:
"""Records a usage metric for the algorithm"""
session = session or self._current_session
if not session or not context.algorithm_id:
return None
return self.db.record_metric(
algorithm_id=context.algorithm_id,
session_id=session.id,
log_entry_id=log_entry_id,
tokens_budget=tokens_budget,
tokens_used=context.total_tokens,
context_composition=context.composition,
latency_ms=latency_ms,
model_tokens_input=model_tokens_input,
model_tokens_output=model_tokens_output
)
def rate_response(
self,
metric_id: uuid.UUID,
relevance: Optional[float] = None,
quality: Optional[float] = None,
satisfaction: Optional[float] = None
):
"""Rates a response (manual feedback)"""
self.db.update_metric_evaluation(
metric_id=metric_id,
relevance=relevance,
quality=quality,
satisfaction=satisfaction,
method="user_feedback"
)
def verify_session_integrity(self, session: Optional[Session] = None) -> Dict[str, Any]:
"""Verifies the session's integrity"""
session = session or self._current_session
if not session:
raise ValueError("No active session.")
return self.db.verify_chain_integrity(session.id)
def close(self):
"""Cierra las conexiones"""
self.db.close()
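Putting the pieces together, a hedged end-to-end sketch of the ContextManager lifecycle (the provider call is elided; the messages, token counts, and latency are illustrative placeholders):

# Illustrative lifecycle sketch; assumes the default connection settings.
manager = ContextManager(database="context_manager")
session = manager.start_session(user_id="demo", model_provider="ollama", model_name="llama3")
context = manager.get_context_for_message("What changed since yesterday?")
manager.log_user_message("What changed since yesterday?", context)
# ... call a provider with `context` here; the response values below are made up ...
log_id = manager.log_assistant_message("Two services were redeployed.",
    tokens_input=850, tokens_output=12, latency_ms=420)
metric_id = manager.record_metric(context, log_id, tokens_budget=4000, latency_ms=420)
manager.rate_response(metric_id, quality=0.9)
assert manager.verify_session_integrity()["is_valid"]
manager.close()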

src/database.py Normal file

@@ -0,0 +1,621 @@
"""
Conexión a base de datos PostgreSQL
"""
import os
import uuid
import json
from datetime import datetime
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
try:
import psycopg2
from psycopg2.extras import RealDictCursor, Json
from psycopg2 import pool
HAS_PSYCOPG2 = True
except ImportError:
HAS_PSYCOPG2 = False
from .models import (
Session, Message, MessageRole, ContextBlock, Memory,
Knowledge, Algorithm, AlgorithmStatus, AlgorithmMetric,
AmbientContext
)
class Database:
"""Gestión de conexión a PostgreSQL"""
def __init__(
self,
host: Optional[str] = None,
port: Optional[int] = None,
database: Optional[str] = None,
user: Optional[str] = None,
password: Optional[str] = None,
min_connections: int = 1,
max_connections: int = 10
):
if not HAS_PSYCOPG2:
raise ImportError("psycopg2 is not installed. Run: pip install psycopg2-binary")
self.host = host or os.getenv("PGHOST", "localhost")
self.port = port or int(os.getenv("PGPORT", "5432"))
self.database = database or os.getenv("PGDATABASE", "context_manager")
self.user = user or os.getenv("PGUSER", "postgres")
self.password = password or os.getenv("PGPASSWORD", "")
self._pool = pool.ThreadedConnectionPool(
min_connections,
max_connections,
host=self.host,
port=self.port,
database=self.database,
user=self.user,
password=self.password
)
@contextmanager
def get_connection(self):
"""Obtiene una conexión del pool"""
conn = self._pool.getconn()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
self._pool.putconn(conn)
@contextmanager
def get_cursor(self, dict_cursor: bool = True):
"""Obtiene un cursor"""
with self.get_connection() as conn:
cursor_factory = RealDictCursor if dict_cursor else None
with conn.cursor(cursor_factory=cursor_factory) as cur:
yield cur
def close(self):
"""Cierra el pool de conexiones"""
self._pool.closeall()
# ==========================================
# SESSIONS
# ==========================================
def create_session(
self,
user_id: Optional[str] = None,
instance_id: Optional[str] = None,
model_provider: Optional[str] = None,
model_name: Optional[str] = None,
algorithm_id: Optional[uuid.UUID] = None,
metadata: Optional[Dict[str, Any]] = None
) -> Session:
"""Creates a new session"""
with self.get_cursor() as cur:
cur.execute(
"""
SELECT create_session(%s, %s, %s, %s, %s, %s) as id
""",
(user_id, instance_id, model_provider, model_name,
str(algorithm_id) if algorithm_id else None,
Json(metadata or {}))
)
session_id = cur.fetchone()["id"]
return Session(
id=session_id,
user_id=user_id,
instance_id=instance_id,
model_provider=model_provider,
model_name=model_name,
algorithm_id=algorithm_id,
metadata=metadata or {}
)
def get_session(self, session_id: uuid.UUID) -> Optional[Session]:
"""Obtiene una sesión por ID"""
with self.get_cursor() as cur:
cur.execute(
"SELECT * FROM sessions WHERE id = %s",
(str(session_id),)
)
row = cur.fetchone()
if row:
return Session(
id=row["id"],
user_id=row["user_id"],
instance_id=row["instance_id"],
model_provider=row["initial_model_provider"],
model_name=row["initial_model_name"],
algorithm_id=row["initial_context_algorithm_id"],
metadata=row["metadata"] or {},
started_at=row["started_at"],
ended_at=row["ended_at"],
total_messages=row["total_messages"],
total_tokens_input=row["total_tokens_input"],
total_tokens_output=row["total_tokens_output"]
)
return None
# ==========================================
# IMMUTABLE LOG
# ==========================================
def insert_log_entry(
self,
session_id: uuid.UUID,
role: MessageRole,
content: str,
model_provider: Optional[str] = None,
model_name: Optional[str] = None,
model_params: Optional[Dict[str, Any]] = None,
context_snapshot: Optional[Dict[str, Any]] = None,
context_algorithm_id: Optional[uuid.UUID] = None,
context_tokens_used: Optional[int] = None,
tokens_input: Optional[int] = None,
tokens_output: Optional[int] = None,
latency_ms: Optional[int] = None,
source_ip: Optional[str] = None,
user_agent: Optional[str] = None
) -> uuid.UUID:
"""Inserts an entry into the immutable log"""
with self.get_cursor() as cur:
cur.execute(
"""
SELECT insert_log_entry(
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
) as id
""",
(
str(session_id),
role.value,
content,
model_provider,
model_name,
Json(model_params or {}),
Json(context_snapshot) if context_snapshot else None,
str(context_algorithm_id) if context_algorithm_id else None,
context_tokens_used,
tokens_input,
tokens_output,
latency_ms,
source_ip,
user_agent
)
)
return cur.fetchone()["id"]
def get_session_history(
self,
session_id: uuid.UUID,
limit: Optional[int] = None,
include_system: bool = False
) -> List[Message]:
"""Fetches a session's history"""
with self.get_cursor() as cur:
query = """
SELECT * FROM immutable_log
WHERE session_id = %s
"""
params = [str(session_id)]
if not include_system:
query += " AND role != 'system'"
query += " ORDER BY sequence_num DESC"
if limit:
query += " LIMIT %s"
params.append(limit)
cur.execute(query, params)
rows = cur.fetchall()
messages = []
for row in reversed(rows):
messages.append(Message(
id=row["id"],
session_id=row["session_id"],
sequence_num=row["sequence_num"],
role=MessageRole(row["role"]),
content=row["content"],
hash=row["hash"],
hash_anterior=row["hash_anterior"],
model_provider=row["model_provider"],
model_name=row["model_name"],
model_params=row["model_params"] or {},
context_snapshot=row["context_snapshot"],
context_algorithm_id=row["context_algorithm_id"],
context_tokens_used=row["context_tokens_used"],
tokens_input=row["tokens_input"],
tokens_output=row["tokens_output"],
latency_ms=row["latency_ms"],
created_at=row["created_at"]
))
return messages
def verify_chain_integrity(self, session_id: uuid.UUID) -> Dict[str, Any]:
"""Verifica la integridad de la cadena de hashes"""
with self.get_cursor() as cur:
cur.execute(
"SELECT * FROM verify_chain_integrity(%s)",
(str(session_id),)
)
row = cur.fetchone()
return {
"is_valid": row["is_valid"],
"broken_at_sequence": row["broken_at_sequence"],
"expected_hash": row["expected_hash"],
"actual_hash": row["actual_hash"]
}
# ==========================================
# CONTEXT BLOCKS
# ==========================================
def create_context_block(self, block: ContextBlock) -> uuid.UUID:
"""Crea un bloque de contexto"""
with self.get_cursor() as cur:
cur.execute(
"""
INSERT INTO context_blocks
(code, name, description, content, category, priority, scope, project_id, activation_rules, active)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
block.code, block.name, block.description, block.content,
block.category, block.priority, block.scope,
str(block.project_id) if block.project_id else None,
Json(block.activation_rules), block.active
)
)
return cur.fetchone()["id"]
def get_active_context_blocks(
self,
category: Optional[str] = None,
scope: Optional[str] = None,
project_id: Optional[uuid.UUID] = None
) -> List[ContextBlock]:
"""Fetches active context blocks"""
with self.get_cursor() as cur:
query = "SELECT * FROM context_blocks WHERE active = true"
params = []
if category:
query += " AND category = %s"
params.append(category)
if scope:
query += " AND scope = %s"
params.append(scope)
if project_id:
query += " AND (project_id = %s OR project_id IS NULL)"
params.append(str(project_id))
query += " ORDER BY priority DESC"
cur.execute(query, params)
return [
ContextBlock(
id=row["id"],
code=row["code"],
name=row["name"],
description=row["description"],
content=row["content"],
content_hash=row["content_hash"],
category=row["category"],
priority=row["priority"],
tokens_estimated=row["tokens_estimated"],
scope=row["scope"],
project_id=row["project_id"],
activation_rules=row["activation_rules"] or {},
active=row["active"],
version=row["version"]
)
for row in cur.fetchall()
]
# ==========================================
# MEMORY
# ==========================================
def get_memories(
self,
type: Optional[str] = None,
min_importance: int = 0,
limit: int = 20
) -> List[Memory]:
"""Fetches active memories"""
with self.get_cursor() as cur:
query = """
SELECT * FROM memory
WHERE active = true
AND importance >= %s
AND (expires_at IS NULL OR expires_at > NOW())
"""
params = [min_importance]
if type:
query += " AND type = %s"
params.append(type)
query += " ORDER BY importance DESC, last_used_at DESC NULLS LAST LIMIT %s"
params.append(limit)
cur.execute(query, params)
return [
Memory(
id=row["id"],
type=row["type"],
category=row["category"],
content=row["content"],
summary=row["summary"],
importance=row["importance"],
confidence=float(row["confidence"]) if row["confidence"] else 1.0,
uses=row["uses"],
last_used_at=row["last_used_at"],
verified=row["verified"]
)
for row in cur.fetchall()
]
def save_memory(self, memory: Memory) -> uuid.UUID:
"""Guarda una memoria"""
with self.get_cursor() as cur:
cur.execute(
"""
INSERT INTO memory
(type, category, content, summary, extracted_from_session, importance, confidence, expires_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
memory.type, memory.category, memory.content, memory.summary,
str(memory.extracted_from_session) if memory.extracted_from_session else None,
memory.importance, memory.confidence, memory.expires_at
)
)
return cur.fetchone()["id"]
# ==========================================
# KNOWLEDGE
# ==========================================
def search_knowledge(
self,
keywords: Optional[List[str]] = None,
category: Optional[str] = None,
tags: Optional[List[str]] = None,
limit: int = 5
) -> List[Knowledge]:
"""Searches the knowledge base"""
with self.get_cursor() as cur:
query = "SELECT * FROM knowledge_base WHERE active = true"
params = []
if category:
query += " AND category = %s"
params.append(category)
if tags:
query += " AND tags && %s"
params.append(tags)
if keywords:
# Simple content search
keyword_conditions = []
for kw in keywords:
keyword_conditions.append("content ILIKE %s")
params.append(f"%{kw}%")
query += f" AND ({' OR '.join(keyword_conditions)})"
query += " ORDER BY priority DESC, access_count DESC LIMIT %s"
params.append(limit)
cur.execute(query, params)
return [
Knowledge(
id=row["id"],
title=row["title"],
category=row["category"],
tags=row["tags"] or [],
content=row["content"],
tokens_estimated=row["tokens_estimated"],
priority=row["priority"],
access_count=row["access_count"]
)
for row in cur.fetchall()
]
# ==========================================
# AMBIENT CONTEXT
# ==========================================
def get_latest_ambient_context(self) -> Optional[AmbientContext]:
"""Obtiene el contexto ambiental más reciente"""
with self.get_cursor() as cur:
cur.execute(
"""
SELECT * FROM ambient_context
WHERE expires_at > NOW()
ORDER BY captured_at DESC
LIMIT 1
"""
)
row = cur.fetchone()
if row:
return AmbientContext(
id=row["id"],
captured_at=row["captured_at"],
expires_at=row["expires_at"],
environment=row["environment"] or {},
system_state=row["system_state"] or {},
active_resources=row["active_resources"] or []
)
return None
def save_ambient_context(self, context: AmbientContext) -> int:
"""Guarda un snapshot de contexto ambiental"""
with self.get_cursor() as cur:
cur.execute(
"""
INSERT INTO ambient_context
(environment, system_state, active_resources, expires_at)
VALUES (%s, %s, %s, %s)
RETURNING id
""",
(
Json(context.environment),
Json(context.system_state),
Json(context.active_resources),
context.expires_at
)
)
return cur.fetchone()["id"]
# ==========================================
# ALGORITHMS
# ==========================================
def get_active_algorithm(self) -> Optional[Algorithm]:
"""Obtiene el algoritmo activo"""
with self.get_cursor() as cur:
cur.execute(
"""
SELECT * FROM context_algorithms
WHERE status = 'active'
ORDER BY activated_at DESC
LIMIT 1
"""
)
row = cur.fetchone()
if row:
return Algorithm(
id=row["id"],
code=row["code"],
name=row["name"],
description=row["description"],
version=row["version"],
status=AlgorithmStatus(row["status"]),
config=row["config"] or {},
selector_code=row["selector_code"],
times_used=row["times_used"],
avg_tokens_used=float(row["avg_tokens_used"]) if row["avg_tokens_used"] else None,
avg_relevance_score=float(row["avg_relevance_score"]) if row["avg_relevance_score"] else None,
avg_response_quality=float(row["avg_response_quality"]) if row["avg_response_quality"] else None,
parent_algorithm_id=row["parent_algorithm_id"],
activated_at=row["activated_at"]
)
return None
def get_algorithm(self, algorithm_id: uuid.UUID) -> Optional[Algorithm]:
"""Obtiene un algoritmo por ID"""
with self.get_cursor() as cur:
cur.execute(
"SELECT * FROM context_algorithms WHERE id = %s",
(str(algorithm_id),)
)
row = cur.fetchone()
if row:
return Algorithm(
id=row["id"],
code=row["code"],
name=row["name"],
description=row["description"],
version=row["version"],
status=AlgorithmStatus(row["status"]),
config=row["config"] or {},
selector_code=row["selector_code"],
times_used=row["times_used"]
)
return None
def fork_algorithm(
self,
source_id: uuid.UUID,
new_code: str,
new_name: str,
reason: Optional[str] = None
) -> uuid.UUID:
"""Clones an algorithm for experimentation"""
with self.get_cursor() as cur:
cur.execute(
"SELECT fork_algorithm(%s, %s, %s, %s) as id",
(str(source_id), new_code, new_name, reason)
)
return cur.fetchone()["id"]
def activate_algorithm(self, algorithm_id: uuid.UUID) -> bool:
"""Activa un algoritmo"""
with self.get_cursor() as cur:
cur.execute(
"SELECT activate_algorithm(%s) as success",
(str(algorithm_id),)
)
return cur.fetchone()["success"]
# ==========================================
# METRICS
# ==========================================
def record_metric(
self,
algorithm_id: uuid.UUID,
session_id: uuid.UUID,
log_entry_id: uuid.UUID,
tokens_budget: int,
tokens_used: int,
context_composition: Dict[str, Any],
latency_ms: int = None,
model_tokens_input: int = None,
model_tokens_output: int = None
) -> uuid.UUID:
"""Registra una métrica de uso"""
with self.get_cursor() as cur:
cur.execute(
"""
SELECT record_algorithm_metric(
%s, %s, %s, %s, %s, %s, %s, %s, %s
) as id
""",
(
str(algorithm_id), str(session_id), str(log_entry_id),
tokens_budget, tokens_used, Json(context_composition),
latency_ms, model_tokens_input, model_tokens_output
)
)
return cur.fetchone()["id"]
def update_metric_evaluation(
self,
metric_id: uuid.UUID,
relevance: float = None,
quality: float = None,
satisfaction: float = None,
method: str = "manual"
) -> bool:
"""Actualiza la evaluación de una métrica"""
with self.get_cursor() as cur:
cur.execute(
"SELECT update_metric_evaluation(%s, %s, %s, %s, %s) as success",
(str(metric_id), relevance, quality, satisfaction, method)
)
return cur.fetchone()["success"]
def get_algorithm_performance(self, algorithm_id: uuid.UUID = None) -> List[Dict[str, Any]]:
"""Obtiene estadísticas de rendimiento de algoritmos"""
with self.get_cursor() as cur:
query = "SELECT * FROM algorithm_performance"
params = []
if algorithm_id:
query += " WHERE id = %s"
params.append(str(algorithm_id))
cur.execute(query, params)
return [dict(row) for row in cur.fetchall()]
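# Usage sketch (illustrative, commented out): the metrics loop these methods
# support — record a raw metric, attach a manual evaluation, then inspect
# aggregate performance. Assumes a reachable PostgreSQL instance and that
# Database() reads its connection settings from the environment; `session`
# and `message` below are hypothetical existing objects.
#
#     db = Database()
#     algo = db.get_active_algorithm()
#     metric_id = db.record_metric(
#         algorithm_id=algo.id,
#         session_id=session.id,
#         log_entry_id=message.id,
#         tokens_budget=4000,
#         tokens_used=3200,
#         context_composition={"memory": 5, "knowledge": 2, "history": 10},
#         latency_ms=850,
#     )
#     db.update_metric_evaluation(metric_id, relevance=0.8, quality=0.9,
#                                 method="manual")
#     for row in db.get_algorithm_performance(algo.id):
#         print(row)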

309
src/models.py Normal file
View File

@@ -0,0 +1,309 @@
"""
Data models for Context Manager
"""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict, Any
from enum import Enum
import uuid
import hashlib
import json
class MessageRole(Enum):
USER = "user"
ASSISTANT = "assistant"
SYSTEM = "system"
TOOL = "tool"
class ContextSource(Enum):
MEMORY = "memory"
KNOWLEDGE = "knowledge"
HISTORY = "history"
AMBIENT = "ambient"
DATASET = "dataset"
class AlgorithmStatus(Enum):
DRAFT = "draft"
TESTING = "testing"
ACTIVE = "active"
DEPRECATED = "deprecated"
@dataclass
class Session:
"""Sesión de conversación"""
id: uuid.UUID = field(default_factory=uuid.uuid4)
user_id: Optional[str] = None
instance_id: Optional[str] = None
model_provider: Optional[str] = None
model_name: Optional[str] = None
algorithm_id: Optional[uuid.UUID] = None
metadata: Dict[str, Any] = field(default_factory=dict)
started_at: datetime = field(default_factory=datetime.now)
ended_at: Optional[datetime] = None
total_messages: int = 0
total_tokens_input: int = 0
total_tokens_output: int = 0
@property
def hash(self) -> str:
content = f"{self.id}{self.started_at.isoformat()}"
return hashlib.sha256(content.encode()).hexdigest()
@dataclass
class Message:
"""Mensaje en el log inmutable"""
id: uuid.UUID = field(default_factory=uuid.uuid4)
session_id: Optional[uuid.UUID] = None
sequence_num: int = 0
role: MessageRole = MessageRole.USER
content: str = ""
hash: str = ""
hash_anterior: Optional[str] = None
# Model
model_provider: Optional[str] = None
model_name: Optional[str] = None
model_params: Dict[str, Any] = field(default_factory=dict)
# Context
context_snapshot: Optional[Dict[str, Any]] = None
context_algorithm_id: Optional[uuid.UUID] = None
context_tokens_used: Optional[int] = None
# Response
tokens_input: Optional[int] = None
tokens_output: Optional[int] = None
latency_ms: Optional[int] = None
# Metadata
created_at: datetime = field(default_factory=datetime.now)
source_ip: Optional[str] = None
user_agent: Optional[str] = None
def compute_hash(self) -> str:
"""Calcula el hash del mensaje (blockchain-style)"""
content = (
(self.hash_anterior or "") +
str(self.session_id) +
str(self.sequence_num) +
self.role.value +
self.content
)
return hashlib.sha256(content.encode()).hexdigest()
@dataclass
class ContextBlock:
"""Bloque de contexto reutilizable"""
id: uuid.UUID = field(default_factory=uuid.uuid4)
code: str = ""
name: str = ""
description: Optional[str] = None
content: str = ""
content_hash: Optional[str] = None
category: str = "general"
priority: int = 50
tokens_estimated: int = 0
scope: str = "global"
project_id: Optional[uuid.UUID] = None
activation_rules: Dict[str, Any] = field(default_factory=dict)
active: bool = True
version: int = 1
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
def __post_init__(self):
# Derived fields: always recomputed from content, overwriting any passed values
self.content_hash = hashlib.sha256(self.content.encode()).hexdigest()
self.tokens_estimated = len(self.content) // 4
@dataclass
class Memory:
"""Memoria a largo plazo"""
id: uuid.UUID = field(default_factory=uuid.uuid4)
type: str = "fact"
category: Optional[str] = None
content: str = ""
summary: Optional[str] = None
content_hash: Optional[str] = None
extracted_from_session: Optional[uuid.UUID] = None
importance: int = 50
confidence: float = 1.0
uses: int = 0
last_used_at: Optional[datetime] = None
expires_at: Optional[datetime] = None
active: bool = True
verified: bool = False
created_at: datetime = field(default_factory=datetime.now)
@dataclass
class Knowledge:
"""Base de conocimiento"""
id: uuid.UUID = field(default_factory=uuid.uuid4)
title: str = ""
category: str = ""
tags: List[str] = field(default_factory=list)
content: str = ""
content_hash: Optional[str] = None
tokens_estimated: int = 0
source_type: Optional[str] = None
source_ref: Optional[str] = None
priority: int = 50
access_count: int = 0
active: bool = True
created_at: datetime = field(default_factory=datetime.now)
@dataclass
class Algorithm:
"""Algoritmo de selección de contexto"""
id: uuid.UUID = field(default_factory=uuid.uuid4)
code: str = ""
name: str = ""
description: Optional[str] = None
version: str = "1.0.0"
status: AlgorithmStatus = AlgorithmStatus.DRAFT
config: Dict[str, Any] = field(default_factory=lambda: {
"max_tokens": 4000,
"sources": {
"system_prompts": True,
"context_blocks": True,
"memory": True,
"knowledge": True,
"history": True,
"ambient": True
},
"weights": {
"priority": 0.4,
"relevance": 0.3,
"recency": 0.2,
"frequency": 0.1
},
"history_config": {
"max_messages": 20,
"summarize_after": 10,
"include_system": False
},
"memory_config": {
"max_items": 15,
"min_importance": 30
},
"knowledge_config": {
"max_items": 5,
"require_keyword_match": True
}
})
selector_code: Optional[str] = None
times_used: int = 0
avg_tokens_used: Optional[float] = None
avg_relevance_score: Optional[float] = None
avg_response_quality: Optional[float] = None
parent_algorithm_id: Optional[uuid.UUID] = None
fork_reason: Optional[str] = None
created_at: datetime = field(default_factory=datetime.now)
activated_at: Optional[datetime] = None
deprecated_at: Optional[datetime] = None
@dataclass
class AlgorithmMetric:
"""Métrica de rendimiento de algoritmo"""
id: uuid.UUID = field(default_factory=uuid.uuid4)
algorithm_id: Optional[uuid.UUID] = None
session_id: Optional[uuid.UUID] = None
log_entry_id: Optional[uuid.UUID] = None
tokens_budget: int = 0
tokens_used: int = 0
token_efficiency: float = 0.0
context_composition: Dict[str, Any] = field(default_factory=dict)
latency_ms: Optional[int] = None
model_tokens_input: Optional[int] = None
model_tokens_output: Optional[int] = None
relevance_score: Optional[float] = None
response_quality: Optional[float] = None
user_satisfaction: Optional[float] = None
auto_evaluated: bool = False
evaluation_method: Optional[str] = None
recorded_at: datetime = field(default_factory=datetime.now)
@dataclass
class AmbientContext:
"""Contexto ambiental del sistema"""
id: int = 0
captured_at: datetime = field(default_factory=datetime.now)
expires_at: Optional[datetime] = None
environment: Dict[str, Any] = field(default_factory=dict)
system_state: Dict[str, Any] = field(default_factory=dict)
active_resources: List[Dict[str, Any]] = field(default_factory=list)
@dataclass
class ContextItem:
"""Item individual de contexto seleccionado"""
source: ContextSource
content: str
tokens: int
priority: int
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class SelectedContext:
"""Contexto seleccionado para enviar al modelo"""
items: List[ContextItem] = field(default_factory=list)
total_tokens: int = 0
algorithm_id: Optional[uuid.UUID] = None
composition: Dict[str, Any] = field(default_factory=dict)
def to_messages(self) -> List[Dict[str, str]]:
"""Convierte el contexto a formato de mensajes para la API"""
messages = []
# System context first
system_content = []
for item in self.items:
if item.source in [ContextSource.MEMORY, ContextSource.KNOWLEDGE, ContextSource.AMBIENT]:
system_content.append(item.content)
if system_content:
messages.append({
"role": "system",
"content": "\n\n".join(system_content)
})
# History messages
for item in self.items:
if item.source == ContextSource.HISTORY:
role = item.metadata.get("role", "user")
messages.append({
"role": role,
"content": item.content
})
return messages
def to_dict(self) -> Dict[str, Any]:
"""Serializa el contexto para snapshot"""
return {
"items": [
{
"source": item.source.value,
"content": item.content[:500] + "..." if len(item.content) > 500 else item.content,
"tokens": item.tokens,
"priority": item.priority,
"metadata": item.metadata
}
for item in self.items
],
"total_tokens": self.total_tokens,
"algorithm_id": str(self.algorithm_id) if self.algorithm_id else None,
"composition": self.composition
}
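if __name__ == "__main__":
    # Self-check sketch (illustrative, not part of the public API). It shows the
    # blockchain-style chaining that compute_hash() implements: each hash folds
    # in hash_anterior, so altering an earlier message invalidates later hashes.
    s = Session()
    m1 = Message(session_id=s.id, sequence_num=1, role=MessageRole.USER,
                 content="hello")
    m1.hash = m1.compute_hash()
    m2 = Message(session_id=s.id, sequence_num=2, role=MessageRole.ASSISTANT,
                 content="hi there", hash_anterior=m1.hash)
    m2.hash = m2.compute_hash()
    # Tamper with the first message and re-derive the chain: the second hash
    # no longer matches what was stored.
    m1.content = "tampered"
    relinked = Message(session_id=s.id, sequence_num=2, role=MessageRole.ASSISTANT,
                       content="hi there", hash_anterior=m1.compute_hash())
    assert relinked.compute_hash() != m2.hash
    print("hash chain verified:", m2.hash[:16], "...")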

18
src/providers/__init__.py Normal file
View File

@@ -0,0 +1,18 @@
"""
Adapters for AI providers
These let the Context Manager be used with any AI model.
"""
from .base import BaseProvider, ProviderResponse
from .anthropic import AnthropicProvider
from .openai import OpenAIProvider
from .ollama import OllamaProvider
__all__ = [
"BaseProvider",
"ProviderResponse",
"AnthropicProvider",
"OpenAIProvider",
"OllamaProvider",
]
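# Usage sketch (illustrative, commented out): every adapter implements the
# BaseProvider interface, so backends can be swapped without changing caller
# code. The PROVIDERS mapping below is an assumption of the example, not part
# of this package.
#
#     PROVIDERS = {
#         "anthropic": AnthropicProvider,
#         "openai": OpenAIProvider,
#         "ollama": OllamaProvider,
#     }
#     provider = PROVIDERS["ollama"](model="llama3")
#     reply = provider.send_message("ping")
#     print(reply.content, f"{reply.latency_ms} ms")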

110
src/providers/anthropic.py Normal file
View File

@@ -0,0 +1,110 @@
"""
Adapter for Anthropic (Claude)
"""
import os
from typing import List, Dict, Any, Optional, Tuple
from .base import BaseProvider, ProviderResponse
from ..models import SelectedContext, ContextSource
try:
import anthropic
HAS_ANTHROPIC = True
except ImportError:
HAS_ANTHROPIC = False
class AnthropicProvider(BaseProvider):
"""Proveedor para modelos de Anthropic (Claude)"""
provider_name = "anthropic"
def __init__(
self,
api_key: str = None,
model: str = "claude-sonnet-4-20250514",
max_tokens: int = 4096,
**kwargs
):
super().__init__(api_key=api_key, model=model, **kwargs)
if not HAS_ANTHROPIC:
raise ImportError("anthropic no está instalado. Ejecuta: pip install anthropic")
self.api_key = api_key or os.getenv("ANTHROPIC_API_KEY")
self.model = model
self.max_tokens = max_tokens
self.client = anthropic.Anthropic(api_key=self.api_key)
def format_context(self, context: SelectedContext) -> Tuple[Optional[str], List[Dict[str, str]]]:
"""
Formats the context for the Anthropic API, which takes the system prompt
separately from the message list.
Returns:
Tuple of (system_prompt, messages)
"""
system_parts = []
messages = []
for item in context.items:
if item.source in [ContextSource.MEMORY, ContextSource.KNOWLEDGE,
ContextSource.AMBIENT, ContextSource.DATASET]:
system_parts.append(item.content)
elif item.source == ContextSource.HISTORY:
role = item.metadata.get("role", "user")
messages.append({
"role": role,
"content": item.content
})
system_prompt = "\n\n".join(system_parts) if system_parts else None
return system_prompt, messages
def send_message(
self,
message: str,
context: SelectedContext = None,
system_prompt: str = None,
temperature: float = 1.0,
**kwargs
) -> ProviderResponse:
"""Envía mensaje a Claude"""
# Formatear contexto
context_system, context_messages = self.format_context(context) if context else (None, [])
# Merge system prompts
final_system = ""
if system_prompt:
final_system = system_prompt
if context_system:
final_system = f"{final_system}\n\n{context_system}" if final_system else context_system
# Build the message list
messages = context_messages.copy()
messages.append({"role": "user", "content": message})
# Call the API
response, latency_ms = self._measure_latency(
self.client.messages.create,
model=self.model,
max_tokens=kwargs.get("max_tokens", self.max_tokens),
system=final_system if final_system else anthropic.NOT_GIVEN,
messages=messages,
temperature=temperature
)
return ProviderResponse(
content=response.content[0].text,
model=response.model,
tokens_input=response.usage.input_tokens,
tokens_output=response.usage.output_tokens,
latency_ms=latency_ms,
finish_reason=response.stop_reason,
raw_response={
"id": response.id,
"type": response.type,
"role": response.role
}
)
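# Usage sketch (illustrative, commented out). Assumes ANTHROPIC_API_KEY is set
# and that `selected_context` is a SelectedContext produced elsewhere (e.g. by
# ContextSelector); both names are assumptions of the example. Note that
# anthropic.NOT_GIVEN keeps the `system` parameter out of the request when no
# system text exists.
#
#     provider = AnthropicProvider()          # key read from the environment
#     resp = provider.send_message(
#         "Summarize the last session",
#         context=selected_context,
#         system_prompt="Answer in one paragraph.",
#         temperature=0.3,
#     )
#     print(resp.content, resp.tokens_input, resp.tokens_output)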

85
src/providers/base.py Normal file
View File

@@ -0,0 +1,85 @@
"""
Base class for AI providers
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
import time
from ..models import SelectedContext
@dataclass
class ProviderResponse:
"""Respuesta de un proveedor de IA"""
content: str
model: str
tokens_input: int = 0
tokens_output: int = 0
latency_ms: int = 0
finish_reason: str = "stop"
raw_response: Dict[str, Any] = field(default_factory=dict)
class BaseProvider(ABC):
"""
Base class for AI provider adapters.
Each provider must implement:
- send_message(): send a message and receive the response
- format_context(): convert the selected context to the provider's format
"""
provider_name: str = "base"
def __init__(self, api_key: str = None, model: str = None, **kwargs):
self.api_key = api_key
self.model = model
self.extra_config = kwargs
@abstractmethod
def send_message(
self,
message: str,
context: SelectedContext = None,
system_prompt: str = None,
**kwargs
) -> ProviderResponse:
"""
Sends a message to the model and returns the response.
Args:
message: The user message
context: Selected context
system_prompt: Additional system prompt
**kwargs: Extra model parameters
Returns:
ProviderResponse with the model's reply
"""
pass
@abstractmethod
def format_context(self, context: SelectedContext) -> List[Dict[str, str]]:
"""
Converts the context to the provider's message format.
Args:
context: Selected context
Returns:
List of messages in the provider's format
"""
pass
def estimate_tokens(self, text: str) -> int:
"""Estimación simple de tokens (4 caracteres por token)"""
return len(text) // 4
def _measure_latency(self, func, *args, **kwargs):
"""Mide la latencia de una función"""
start = time.time()
result = func(*args, **kwargs)
latency_ms = int((time.time() - start) * 1000)
return result, latency_ms
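# Implementation sketch (illustrative, commented out): the smallest subclass
# honouring the contract above, useful as an offline test double. EchoProvider
# and every name inside it are hypothetical.
#
#     class EchoProvider(BaseProvider):
#         provider_name = "echo"
#
#         def format_context(self, context: SelectedContext) -> List[Dict[str, str]]:
#             # Flatten every selected item into plain user messages.
#             return [{"role": "user", "content": i.content} for i in context.items]
#
#         def send_message(self, message, context=None, system_prompt=None, **kwargs):
#             result, latency_ms = self._measure_latency(str.upper, message)
#             return ProviderResponse(
#                 content=result,
#                 model="echo-1",
#                 tokens_output=self.estimate_tokens(result),
#                 latency_ms=latency_ms,
#             )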

141
src/providers/ollama.py Normal file
View File

@@ -0,0 +1,141 @@
"""
Adapter for Ollama (local models)
"""
import os
import requests
from typing import List, Dict, Any, Optional
from .base import BaseProvider, ProviderResponse
from ..models import SelectedContext, ContextSource
class OllamaProvider(BaseProvider):
"""Proveedor para modelos locales via Ollama"""
provider_name = "ollama"
def __init__(
self,
model: str = "llama3",
host: str = None,
port: int = None,
**kwargs
):
super().__init__(model=model, **kwargs)
self.host = host or os.getenv("OLLAMA_HOST", "localhost")
self.port = port or int(os.getenv("OLLAMA_PORT", "11434"))
self.model = model
self.base_url = f"http://{self.host}:{self.port}"
def format_context(self, context: SelectedContext) -> List[Dict[str, str]]:
"""
Formats the context for Ollama.
Returns:
List of messages in Ollama's chat format
"""
messages = []
system_parts = []
for item in context.items:
if item.source in [ContextSource.MEMORY, ContextSource.KNOWLEDGE,
ContextSource.AMBIENT, ContextSource.DATASET]:
system_parts.append(item.content)
elif item.source == ContextSource.HISTORY:
role = item.metadata.get("role", "user")
messages.append({
"role": role,
"content": item.content
})
if system_parts:
messages.insert(0, {
"role": "system",
"content": "\n\n".join(system_parts)
})
return messages
def send_message(
self,
message: str,
context: SelectedContext = None,
system_prompt: str = None,
temperature: float = 0.7,
**kwargs
) -> ProviderResponse:
"""Envía mensaje a Ollama"""
# Formatear contexto
messages = self.format_context(context) if context else []
# Añadir system prompt
if system_prompt:
if messages and messages[0]["role"] == "system":
messages[0]["content"] = f"{system_prompt}\n\n{messages[0]['content']}"
else:
messages.insert(0, {"role": "system", "content": system_prompt})
# Append the user message
messages.append({"role": "user", "content": message})
# Call the API
url = f"{self.base_url}/api/chat"
payload = {
"model": self.model,
"messages": messages,
"stream": False,
"options": {
"temperature": temperature
}
}
response, latency_ms = self._measure_latency(
requests.post,
url,
json=payload,
timeout=120
)
response.raise_for_status()
data = response.json()
# Ollama does not always return token counts; fall back to estimates
tokens_input = data.get("prompt_eval_count", self.estimate_tokens(str(messages)))
tokens_output = data.get("eval_count", self.estimate_tokens(data["message"]["content"]))
return ProviderResponse(
content=data["message"]["content"],
model=data.get("model", self.model),
tokens_input=tokens_input,
tokens_output=tokens_output,
latency_ms=latency_ms,
finish_reason=data.get("done_reason", "stop"),
raw_response={
"total_duration": data.get("total_duration"),
"load_duration": data.get("load_duration"),
"prompt_eval_duration": data.get("prompt_eval_duration"),
"eval_duration": data.get("eval_duration")
}
)
def list_models(self) -> List[str]:
"""Lista los modelos disponibles en Ollama"""
response = requests.get(f"{self.base_url}/api/tags")
response.raise_for_status()
data = response.json()
return [m["name"] for m in data.get("models", [])]
def pull_model(self, model_name: str):
"""Descarga un modelo en Ollama"""
response = requests.post(
f"{self.base_url}/api/pull",
json={"name": model_name},
stream=True
)
response.raise_for_status()
for line in response.iter_lines():
if line:
yield line.decode("utf-8")

120
src/providers/openai.py Normal file
View File

@@ -0,0 +1,120 @@
"""
Adapter for OpenAI (GPT)
"""
import os
from typing import List, Dict, Any, Optional
from .base import BaseProvider, ProviderResponse
from ..models import SelectedContext, ContextSource
try:
import openai
HAS_OPENAI = True
except ImportError:
HAS_OPENAI = False
class OpenAIProvider(BaseProvider):
"""Proveedor para modelos de OpenAI (GPT)"""
provider_name = "openai"
def __init__(
self,
api_key: str = None,
model: str = "gpt-4",
max_tokens: int = 4096,
base_url: str = None,
**kwargs
):
super().__init__(api_key=api_key, model=model, **kwargs)
if not HAS_OPENAI:
raise ImportError("openai no está instalado. Ejecuta: pip install openai")
self.api_key = api_key or os.getenv("OPENAI_API_KEY")
self.model = model
self.max_tokens = max_tokens
self.client = openai.OpenAI(
api_key=self.api_key,
base_url=base_url
)
def format_context(self, context: SelectedContext) -> List[Dict[str, str]]:
"""
Formats the context for the OpenAI API.
Returns:
List of messages in OpenAI chat format
"""
messages = []
system_parts = []
for item in context.items:
if item.source in [ContextSource.MEMORY, ContextSource.KNOWLEDGE,
ContextSource.AMBIENT, ContextSource.DATASET]:
system_parts.append(item.content)
elif item.source == ContextSource.HISTORY:
role = item.metadata.get("role", "user")
messages.append({
"role": role,
"content": item.content
})
# Insert the system message at the start
if system_parts:
messages.insert(0, {
"role": "system",
"content": "\n\n".join(system_parts)
})
return messages
def send_message(
self,
message: str,
context: SelectedContext = None,
system_prompt: str = None,
temperature: float = 1.0,
**kwargs
) -> ProviderResponse:
"""Envía mensaje a GPT"""
# Formatear contexto
messages = self.format_context(context) if context else []
# Añadir system prompt adicional
if system_prompt:
if messages and messages[0]["role"] == "system":
messages[0]["content"] = f"{system_prompt}\n\n{messages[0]['content']}"
else:
messages.insert(0, {"role": "system", "content": system_prompt})
# Append the user message
messages.append({"role": "user", "content": message})
# Call the API
response, latency_ms = self._measure_latency(
self.client.chat.completions.create,
model=self.model,
messages=messages,
max_tokens=kwargs.get("max_tokens", self.max_tokens),
temperature=temperature
)
choice = response.choices[0]
return ProviderResponse(
content=choice.message.content,
model=response.model,
tokens_input=response.usage.prompt_tokens,
tokens_output=response.usage.completion_tokens,
latency_ms=latency_ms,
finish_reason=choice.finish_reason,
raw_response={
"id": response.id,
"created": response.created,
"system_fingerprint": response.system_fingerprint
}
)
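# Usage sketch (illustrative, commented out). Assumes OPENAI_API_KEY is set;
# base_url lets the same adapter target any OpenAI-compatible endpoint (the
# localhost URL below is a placeholder assumption).
#
#     provider = OpenAIProvider(model="gpt-4")
#     resp = provider.send_message("Hello", system_prompt="Be terse.")
#     print(resp.content, resp.finish_reason)
#
#     compatible = OpenAIProvider(model="my-model",
#                                 base_url="http://localhost:8000/v1")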