captain-claude/apps/storage/storage_worker.py
#!/usr/bin/env python3
"""
Storage Worker - File verification and processing
Spec: Hybrid Storage System (Sistema de Almacenamiento Híbrido) v4.0
"""
import os
import hashlib
import json
import asyncio
import asyncpg
from datetime import datetime
from typing import Optional, Dict, Any
import boto3
from PIL import Image
import fitz # PyMuPDF
import io
import tempfile
# Configuration
R2_ENDPOINT = os.environ.get("R2_ENDPOINT", "https://7dedae6030f5554d99d37e98a5232996.r2.cloudflarestorage.com")
R2_BUCKET = os.environ.get("R2_BUCKET", "deck")
DB_URL = os.environ.get("DATABASE_URL", "postgresql://tzzr:tzzr@localhost:5432/tzzr")
THUMB_WIDTH = 300
MAX_RETRIES = 9
RETRY_BACKOFF_BASE = 2
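# Note on retry tuning: process_upload() sleeps RETRY_BACKOFF_BASE ** attempt seconds
# between derivative attempts, so with the defaults above (9 attempts, base 2) the
# waits are 1, 2, 4, ..., 128 s - roughly four minutes of total back-off before the
# worker stops retrying derivative generation.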
def get_s3_client():
return boto3.client(
"s3",
endpoint_url=R2_ENDPOINT,
aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
)
async def get_db_pool():
return await asyncpg.create_pool(DB_URL, min_size=2, max_size=10)
def calculate_sha256(data: bytes) -> str:
"""Calcula SHA-256 de bytes"""
return hashlib.sha256(data).hexdigest()
def generate_public_key(content_hash: str, user_id: str) -> str:
"""Genera public_key única para un asset"""
data = f"{content_hash}{user_id}{datetime.now().isoformat()}"
return hashlib.sha256(data.encode()).hexdigest()
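# Note: because generate_public_key() mixes in a timestamp, repeated uploads of the
# same content by the same user still get distinct public_keys; deduplication happens
# at the physical blob level (keyed by content_hash), not at the user_asset level.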
class StorageWorker:
def __init__(self):
self.s3 = get_s3_client()
self.pool = None
async def init(self):
self.pool = await get_db_pool()
async def close(self):
if self.pool:
await self.pool.close()
# =========================================================================
# HASH VERIFICATION
# =========================================================================
async def verify_blob(self, declared_hash: str, storage_path: str) -> Dict[str, Any]:
"""
Verify that the declared hash matches the actual content.
NEVER trust the hash supplied by the client.
"""
try:
# Download the file
obj = self.s3.get_object(Bucket=R2_BUCKET, Key=storage_path)
content = obj["Body"].read()
# Compute the actual hash
calculated_hash = calculate_sha256(content)
if calculated_hash != declared_hash:
# HASH MISMATCH - corrupt file or spoofed hash
await self._mark_corrupt(declared_hash, storage_path)
return {
"status": "CORRUPT",
"declared": declared_hash,
"calculated": calculated_hash,
"action": "deleted"
}
# Hash matches - mark as verified
await self._mark_verified(declared_hash)
return {
"status": "VERIFIED",
"hash": declared_hash,
"size": len(content)
}
except Exception as e:
return {"status": "ERROR", "error": str(e)}
async def _mark_corrupt(self, content_hash: str, storage_path: str):
"""Marca blob como corrupto y elimina archivo"""
async with self.pool.acquire() as conn:
await conn.execute("""
UPDATE storage.physical_blobs
SET verification_status = 'CORRUPT', updated_at = NOW()
WHERE content_hash = $1
""", content_hash)
# Delete the file from the bucket (best-effort; the CORRUPT status is already recorded)
try:
self.s3.delete_object(Bucket=R2_BUCKET, Key=storage_path)
except Exception:
pass
async def _mark_verified(self, content_hash: str):
"""Marca blob como verificado"""
async with self.pool.acquire() as conn:
await conn.execute("""
UPDATE storage.physical_blobs
SET verification_status = 'VERIFIED',
last_verified_at = NOW(),
updated_at = NOW()
WHERE content_hash = $1
""", content_hash)
# =========================================================================
# DERIVATIVE GENERATION
# =========================================================================
async def generate_derivatives(self, content_hash: str) -> Dict[str, Any]:
"""Genera thumbnail y metadatos para un blob verificado"""
async with self.pool.acquire() as conn:
blob = await conn.fetchrow("""
SELECT content_hash, mime_type, storage_path, file_size
FROM storage.physical_blobs
WHERE content_hash = $1 AND verification_status = 'VERIFIED'
""", content_hash)
if not blob:
return {"status": "ERROR", "error": "Blob not found or not verified"}
mime_type = blob["mime_type"]
storage_path = blob["storage_path"]
# Download the file
obj = self.s3.get_object(Bucket=R2_BUCKET, Key=storage_path)
content = obj["Body"].read()
metadata = {
"content_hash": content_hash,
"mime_type": mime_type,
"file_size": blob["file_size"],
"processed_at": datetime.now().isoformat()
}
thumb_generated = False
# Generate a thumbnail based on MIME type
if mime_type.startswith("image/"):
thumb_data, extra_meta = self._process_image(content)
metadata.update(extra_meta)
if thumb_data:
await self._save_thumb(content_hash, thumb_data)
thumb_generated = True
elif mime_type == "application/pdf":
thumb_data, extra_meta = self._process_pdf(content)
metadata.update(extra_meta)
if thumb_data:
await self._save_thumb(content_hash, thumb_data)
thumb_generated = True
# Save the metadata
await self._save_metadata(content_hash, metadata)
return {
"status": "OK",
"thumb_generated": thumb_generated,
"metadata": metadata
}
def _process_image(self, content: bytes) -> tuple:
"""Procesa imagen: genera thumb y extrae metadatos"""
try:
img = Image.open(io.BytesIO(content))
# Metadata
meta = {
"width": img.width,
"height": img.height,
"format": img.format,
"mode": img.mode
}
# EXIF, if available (getexif() returns an empty mapping when there is none)
if img.getexif():
meta["has_exif"] = True
# Generate the thumbnail (thumbnail() preserves aspect ratio and never upscales)
ratio = THUMB_WIDTH / img.width
new_height = int(img.height * ratio)
thumb = img.copy()
thumb.thumbnail((THUMB_WIDTH, new_height), Image.Resampling.LANCZOS)
# Convert to bytes; JPEG cannot store alpha or palette modes, so normalize to RGB first
if thumb.mode not in ("RGB", "L"):
thumb = thumb.convert("RGB")
thumb_buffer = io.BytesIO()
thumb.save(thumb_buffer, format="JPEG", quality=85)
thumb_data = thumb_buffer.getvalue()
return thumb_data, meta
except Exception as e:
return None, {"error": str(e)}
def _process_pdf(self, content: bytes) -> tuple:
"""Procesa PDF: genera thumb de primera página y extrae metadatos"""
try:
doc = fitz.open(stream=content, filetype="pdf")
meta = {
"pages": len(doc),
"format": "PDF"
}
# Document metadata
pdf_meta = doc.metadata
if pdf_meta:
if pdf_meta.get("author"):
meta["author"] = pdf_meta["author"]
if pdf_meta.get("title"):
meta["title"] = pdf_meta["title"]
# Render the first page as a thumbnail
if len(doc) > 0:
page = doc[0]
# Scale so the page width equals THUMB_WIDTH
zoom = THUMB_WIDTH / page.rect.width
mat = fitz.Matrix(zoom, zoom)
pix = page.get_pixmap(matrix=mat)
thumb_data = pix.tobytes("jpeg")
else:
thumb_data = None
doc.close()
return thumb_data, meta
except Exception as e:
return None, {"error": str(e)}
async def _save_thumb(self, content_hash: str, thumb_data: bytes):
"""Guarda thumbnail en el bucket"""
key = f"{content_hash}.thumb"
self.s3.put_object(
Bucket=R2_BUCKET,
Key=key,
Body=thumb_data,
ContentType="image/jpeg"
)
async def _save_metadata(self, content_hash: str, metadata: dict):
"""Guarda metadatos JSON en el bucket"""
key = f"{content_hash}.json"
self.s3.put_object(
Bucket=R2_BUCKET,
Key=key,
Body=json.dumps(metadata, indent=2),
ContentType="application/json"
)
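# Derivative objects written by the two helpers above (the original blob stays at
# its storage_path and is never rewritten):
#   <content_hash>.thumb  - JPEG thumbnail, at most THUMB_WIDTH px wide
#   <content_hash>.json   - extracted metadata as pretty-printed JSON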
# =========================================================================
# FULL POST-UPLOAD PROCESSING
# =========================================================================
async def process_upload(
self,
declared_hash: str,
storage_path: str,
user_id: str,
original_filename: str,
access_password: Optional[str] = None
) -> Dict[str, Any]:
"""
Full post-upload pipeline:
1. Verify the hash
2. Generate derivatives
3. Create the user_asset
"""
# 1. Verify the hash
verify_result = await self.verify_blob(declared_hash, storage_path)
if verify_result["status"] != "VERIFIED":
return verify_result
# 2. Generate derivatives (with retries and exponential back-off)
for attempt in range(MAX_RETRIES):
try:
deriv_result = await self.generate_derivatives(declared_hash)
if deriv_result["status"] == "OK":
break
except Exception as e:
if attempt == MAX_RETRIES - 1:
# Final attempt failed, but the blob itself is already verified
deriv_result = {"status": "PARTIAL", "error": str(e)}
else:
await asyncio.sleep(RETRY_BACKOFF_BASE ** attempt)
# 3. Create the user_asset
public_key = generate_public_key(declared_hash, user_id)
async with self.pool.acquire() as conn:
await conn.execute("""
INSERT INTO storage.user_assets
(public_key, blob_hash, user_id, original_filename, access_password)
VALUES ($1, $2, $3, $4, $5)
""", public_key, declared_hash, user_id, original_filename, access_password)
return {
"status": "CREATED",
"public_key": public_key,
"content_hash": declared_hash,
"derivatives": deriv_result
}
# =========================================================================
# BLOB REGISTRATION (no upload - for pre-existing files)
# =========================================================================
async def register_blob(
self,
content_hash: str,
file_size: int,
mime_type: str,
storage_provider: str,
storage_path: str
) -> Dict[str, Any]:
"""Registra un blob existente en el sistema"""
async with self.pool.acquire() as conn:
# Check whether it already exists
existing = await conn.fetchrow("""
SELECT content_hash, verification_status
FROM storage.physical_blobs
WHERE content_hash = $1
""", content_hash)
if existing:
return {
"status": "EXISTS",
"content_hash": content_hash,
"verification_status": existing["verification_status"]
}
# Insert the new blob
await conn.execute("""
INSERT INTO storage.physical_blobs
(content_hash, file_size, mime_type, storage_provider, storage_path)
VALUES ($1, $2, $3, $4::storage.storage_provider_enum, $5)
""", content_hash, file_size, mime_type, storage_provider, storage_path)
return {
"status": "REGISTERED",
"content_hash": content_hash,
"verification_status": "PENDING"
}
# =========================================================================
# MAINTENANCE
# =========================================================================
async def garbage_collect(self, dry_run: bool = True) -> Dict[str, Any]:
"""
Delete orphaned blobs (ref_count = 0, not updated in the last 30 days)
"""
async with self.pool.acquire() as conn:
orphans = await conn.fetch("""
SELECT content_hash, storage_path
FROM storage.physical_blobs
WHERE ref_count = 0
AND updated_at < NOW() - INTERVAL '30 days'
""")
deleted = []
for blob in orphans:
if not dry_run:
# Delete derivatives (best-effort; the keys may not exist)
for ext in [".thumb", ".json"]:
try:
self.s3.delete_object(Bucket=R2_BUCKET, Key=f"{blob['content_hash']}{ext}")
except Exception:
pass
# Delete the blob itself
try:
self.s3.delete_object(Bucket=R2_BUCKET, Key=blob["storage_path"])
except Exception:
pass
# Delete the database record (reuse the connection already held)
await conn.execute("""
DELETE FROM storage.physical_blobs WHERE content_hash = $1
""", blob["content_hash"])
deleted.append(blob["content_hash"])
return {
"status": "OK",
"dry_run": dry_run,
"orphans_found": len(orphans),
"deleted": deleted if not dry_run else []
}
async def integrity_check(self, sample_percent: float = 0.01) -> Dict[str, Any]:
"""
Verify the integrity of a random sample of blobs
"""
async with self.pool.acquire() as conn:
blobs = await conn.fetch("""
SELECT content_hash, storage_path
FROM storage.physical_blobs
WHERE verification_status = 'VERIFIED'
ORDER BY RANDOM()
LIMIT (SELECT CEIL(COUNT(*) * $1) FROM storage.physical_blobs WHERE verification_status = 'VERIFIED')
""", sample_percent)
results = {"checked": 0, "ok": 0, "corrupt": []}
for blob in blobs:
results["checked"] += 1
verify = await self.verify_blob(blob["content_hash"], blob["storage_path"])
if verify["status"] == "VERIFIED":
results["ok"] += 1
else:
results["corrupt"].append(blob["content_hash"])
return results
# CLI for testing
async def main():
import sys
worker = StorageWorker()
await worker.init()
if len(sys.argv) < 2:
print("Usage: storage_worker.py <command> [args]")
print("Commands: gc, integrity, register")
return
cmd = sys.argv[1]
if cmd == "gc":
dry_run = "--execute" not in sys.argv
result = await worker.garbage_collect(dry_run=dry_run)
print(json.dumps(result, indent=2))
elif cmd == "integrity":
result = await worker.integrity_check()
print(json.dumps(result, indent=2))
elif cmd == "register":
if len(sys.argv) < 6:
print("Usage: storage_worker.py register <hash> <size> <mime> <path>")
return
result = await worker.register_blob(
sys.argv[2], int(sys.argv[3]), sys.argv[4], "R2_PRIMARY", sys.argv[5]
)
print(json.dumps(result, indent=2))
await worker.close()
if __name__ == "__main__":
asyncio.run(main())
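# Minimal programmatic usage sketch (illustrative only: the hash, path, and user id
# below are placeholder values, and it assumes AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
# and DATABASE_URL are configured in the environment):
#
#   async def example():
#       worker = StorageWorker()
#       await worker.init()
#       try:
#           result = await worker.process_upload(
#               declared_hash="<sha256 of the uploaded bytes>",
#               storage_path="uploads/example.pdf",
#               user_id="user-123",
#               original_filename="example.pdf",
#           )
#           print(json.dumps(result, indent=2))
#       finally:
#           await worker.close()
#
#   asyncio.run(example())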