Add pending apps and frontend components

- apps/captain-mobile: Mobile API service
- apps/flow-ui: Flow UI application
- apps/mindlink: Mindlink application
- apps/storage: Storage API and workers
- apps/tzzr-cli: TZZR CLI tool
- deck-frontend/backups: Historical TypeScript versions
- hst-frontend: Standalone HST frontend

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
apps/storage/storage_worker.py (new file, 480 lines)
@@ -0,0 +1,480 @@
#!/usr/bin/env python3
"""
Storage Worker - File verification and processing
Spec: Hybrid Storage System v4.0
"""

import os
import hashlib
import json
import asyncio
import asyncpg
from datetime import datetime
from typing import Optional, Dict, Any
import boto3
from PIL import Image
import fitz  # PyMuPDF
import io
import tempfile

# Configuration
R2_ENDPOINT = os.environ.get("R2_ENDPOINT", "https://7dedae6030f5554d99d37e98a5232996.r2.cloudflarestorage.com")
R2_BUCKET = os.environ.get("R2_BUCKET", "deck")
DB_URL = os.environ.get("DATABASE_URL", "postgresql://tzzr:tzzr@localhost:5432/tzzr")

THUMB_WIDTH = 300
MAX_RETRIES = 9
RETRY_BACKOFF_BASE = 2
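
# Illustrative local setup (example values, not part of the spec): the worker
# reads all of its configuration from the environment, e.g.
#   export DATABASE_URL="postgresql://tzzr:tzzr@localhost:5432/tzzr"
#   export R2_ENDPOINT="https://<account-id>.r2.cloudflarestorage.com"
#   export R2_BUCKET="deck"
#   export AWS_ACCESS_KEY_ID=... AWS_SECRET_ACCESS_KEY=...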


def get_s3_client():
    return boto3.client(
        "s3",
        endpoint_url=R2_ENDPOINT,
        aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
    )


async def get_db_pool():
    return await asyncpg.create_pool(DB_URL, min_size=2, max_size=10)


def calculate_sha256(data: bytes) -> str:
    """Compute the SHA-256 hex digest of a byte string."""
    return hashlib.sha256(data).hexdigest()
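# For example (illustrative), calculate_sha256(b"hello") returns the 64-char hex digest
# "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824".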


def generate_public_key(content_hash: str, user_id: str) -> str:
    """Generate a unique public_key for an asset."""
    data = f"{content_hash}{user_id}{datetime.now().isoformat()}"
    return hashlib.sha256(data.encode()).hexdigest()


class StorageWorker:
    def __init__(self):
        self.s3 = get_s3_client()
        self.pool = None

    async def init(self):
        self.pool = await get_db_pool()

    async def close(self):
        if self.pool:
            await self.pool.close()

    # =========================================================================
    # HASH VERIFICATION
    # =========================================================================

    async def verify_blob(self, declared_hash: str, storage_path: str) -> Dict[str, Any]:
        """
        Verify that the declared hash matches the actual content.
        We NEVER trust the client-supplied hash.
        """
        try:
            # Download the file
            obj = self.s3.get_object(Bucket=R2_BUCKET, Key=storage_path)
            content = obj["Body"].read()

            # Compute the actual hash
            calculated_hash = calculate_sha256(content)

            if calculated_hash != declared_hash:
                # HASH MISMATCH - corrupt file or spoofing attempt
                await self._mark_corrupt(declared_hash, storage_path)
                return {
                    "status": "CORRUPT",
                    "declared": declared_hash,
                    "calculated": calculated_hash,
                    "action": "deleted"
                }

            # Hash matches - mark as verified
            await self._mark_verified(declared_hash)

            return {
                "status": "VERIFIED",
                "hash": declared_hash,
                "size": len(content)
            }

        except Exception as e:
            return {"status": "ERROR", "error": str(e)}

    async def _mark_corrupt(self, content_hash: str, storage_path: str):
        """Mark the blob as corrupt and delete the file."""
        async with self.pool.acquire() as conn:
            await conn.execute("""
                UPDATE storage.physical_blobs
                SET verification_status = 'CORRUPT', updated_at = NOW()
                WHERE content_hash = $1
            """, content_hash)

        # Delete the file from the bucket
        try:
            self.s3.delete_object(Bucket=R2_BUCKET, Key=storage_path)
        except Exception:
            pass

    async def _mark_verified(self, content_hash: str):
        """Mark the blob as verified."""
        async with self.pool.acquire() as conn:
            await conn.execute("""
                UPDATE storage.physical_blobs
                SET verification_status = 'VERIFIED',
                    last_verified_at = NOW(),
                    updated_at = NOW()
                WHERE content_hash = $1
            """, content_hash)

    # =========================================================================
    # DERIVATIVE GENERATION
    # =========================================================================

    async def generate_derivatives(self, content_hash: str) -> Dict[str, Any]:
        """Generate a thumbnail and metadata for a verified blob."""
        async with self.pool.acquire() as conn:
            blob = await conn.fetchrow("""
                SELECT content_hash, mime_type, storage_path, file_size
                FROM storage.physical_blobs
                WHERE content_hash = $1 AND verification_status = 'VERIFIED'
            """, content_hash)

        if not blob:
            return {"status": "ERROR", "error": "Blob not found or not verified"}

        mime_type = blob["mime_type"]
        storage_path = blob["storage_path"]

        # Download the file
        obj = self.s3.get_object(Bucket=R2_BUCKET, Key=storage_path)
        content = obj["Body"].read()

        metadata = {
            "content_hash": content_hash,
            "mime_type": mime_type,
            "file_size": blob["file_size"],
            "processed_at": datetime.now().isoformat()
        }

        thumb_generated = False

        # Generate a thumbnail according to the file type
        if mime_type.startswith("image/"):
            thumb_data, extra_meta = self._process_image(content)
            metadata.update(extra_meta)
            if thumb_data:
                await self._save_thumb(content_hash, thumb_data)
                thumb_generated = True

        elif mime_type == "application/pdf":
            thumb_data, extra_meta = self._process_pdf(content)
            metadata.update(extra_meta)
            if thumb_data:
                await self._save_thumb(content_hash, thumb_data)
                thumb_generated = True

        # Save metadata
        await self._save_metadata(content_hash, metadata)

        return {
            "status": "OK",
            "thumb_generated": thumb_generated,
            "metadata": metadata
        }

    def _process_image(self, content: bytes) -> tuple:
        """Process an image: generate a thumbnail and extract metadata."""
        try:
            img = Image.open(io.BytesIO(content))

            # Metadata
            meta = {
                "width": img.width,
                "height": img.height,
                "format": img.format,
                "mode": img.mode
            }

            # EXIF, if available
            if hasattr(img, '_getexif') and img._getexif():
                meta["has_exif"] = True

            # Generate the thumbnail
            ratio = THUMB_WIDTH / img.width
            new_height = int(img.height * ratio)
            thumb = img.copy()
            thumb.thumbnail((THUMB_WIDTH, new_height), Image.Resampling.LANCZOS)

            # JPEG cannot store alpha or palette modes; convert before saving
            if thumb.mode not in ("RGB", "L"):
                thumb = thumb.convert("RGB")

            # Convert to bytes
            thumb_buffer = io.BytesIO()
            thumb.save(thumb_buffer, format="JPEG", quality=85)
            thumb_data = thumb_buffer.getvalue()

            return thumb_data, meta

        except Exception as e:
            return None, {"error": str(e)}

    def _process_pdf(self, content: bytes) -> tuple:
        """Process a PDF: generate a thumbnail of the first page and extract metadata."""
        try:
            doc = fitz.open(stream=content, filetype="pdf")

            meta = {
                "pages": len(doc),
                "format": "PDF"
            }

            # Document metadata
            pdf_meta = doc.metadata
            if pdf_meta:
                if pdf_meta.get("author"):
                    meta["author"] = pdf_meta["author"]
                if pdf_meta.get("title"):
                    meta["title"] = pdf_meta["title"]

            # Render the first page as a thumbnail
            if len(doc) > 0:
                page = doc[0]
                # Scale so the width equals THUMB_WIDTH
                zoom = THUMB_WIDTH / page.rect.width
                mat = fitz.Matrix(zoom, zoom)
                pix = page.get_pixmap(matrix=mat)
                thumb_data = pix.tobytes("jpeg")
            else:
                thumb_data = None

            doc.close()
            return thumb_data, meta

        except Exception as e:
            return None, {"error": str(e)}

    async def _save_thumb(self, content_hash: str, thumb_data: bytes):
        """Save the thumbnail to the bucket."""
        key = f"{content_hash}.thumb"
        self.s3.put_object(
            Bucket=R2_BUCKET,
            Key=key,
            Body=thumb_data,
            ContentType="image/jpeg"
        )

    async def _save_metadata(self, content_hash: str, metadata: dict):
        """Save the metadata JSON to the bucket."""
        key = f"{content_hash}.json"
        self.s3.put_object(
            Bucket=R2_BUCKET,
            Key=key,
            Body=json.dumps(metadata, indent=2),
            ContentType="application/json"
        )

    # =========================================================================
    # FULL PROCESSING
    # =========================================================================

    async def process_upload(
        self,
        declared_hash: str,
        storage_path: str,
        user_id: str,
        original_filename: str,
        access_password: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Complete post-upload pipeline:
        1. Verify the hash
        2. Generate derivatives
        3. Create the user_asset
        """
        # 1. Verify the hash
        verify_result = await self.verify_blob(declared_hash, storage_path)

        if verify_result["status"] != "VERIFIED":
            return verify_result

        # 2. Generate derivatives (with retries)
        deriv_result = None
        for attempt in range(MAX_RETRIES):
            try:
                deriv_result = await self.generate_derivatives(declared_hash)
                if deriv_result["status"] == "OK":
                    break
            except Exception as e:
                if attempt == MAX_RETRIES - 1:
                    # Last attempt failed, but the blob is already verified
                    deriv_result = {"status": "PARTIAL", "error": str(e)}
                else:
                    await asyncio.sleep(RETRY_BACKOFF_BASE ** attempt)

        # 3. Create the user_asset
        public_key = generate_public_key(declared_hash, user_id)

        async with self.pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO storage.user_assets
                    (public_key, blob_hash, user_id, original_filename, access_password)
                VALUES ($1, $2, $3, $4, $5)
            """, public_key, declared_hash, user_id, original_filename, access_password)

        return {
            "status": "CREATED",
            "public_key": public_key,
            "content_hash": declared_hash,
            "derivatives": deriv_result
        }

    # =========================================================================
    # BLOB REGISTRATION (no upload - for files that already exist)
    # =========================================================================

    async def register_blob(
        self,
        content_hash: str,
        file_size: int,
        mime_type: str,
        storage_provider: str,
        storage_path: str
    ) -> Dict[str, Any]:
        """Register an existing blob in the system."""
        async with self.pool.acquire() as conn:
            # Check whether it already exists
            existing = await conn.fetchrow("""
                SELECT content_hash, verification_status
                FROM storage.physical_blobs
                WHERE content_hash = $1
            """, content_hash)

            if existing:
                return {
                    "status": "EXISTS",
                    "content_hash": content_hash,
                    "verification_status": existing["verification_status"]
                }

            # Insert the new blob
            await conn.execute("""
                INSERT INTO storage.physical_blobs
                    (content_hash, file_size, mime_type, storage_provider, storage_path)
                VALUES ($1, $2, $3, $4::storage.storage_provider_enum, $5)
            """, content_hash, file_size, mime_type, storage_provider, storage_path)

        return {
            "status": "REGISTERED",
            "content_hash": content_hash,
            "verification_status": "PENDING"
        }

    # =========================================================================
    # MAINTENANCE
    # =========================================================================

    async def garbage_collect(self, dry_run: bool = True) -> Dict[str, Any]:
        """
        Delete orphaned blobs (ref_count = 0, not updated in the last 30 days).
        """
        async with self.pool.acquire() as conn:
            orphans = await conn.fetch("""
                SELECT content_hash, storage_path
                FROM storage.physical_blobs
                WHERE ref_count = 0
                  AND updated_at < NOW() - INTERVAL '30 days'
            """)

        deleted = []
        for blob in orphans:
            if not dry_run:
                # Delete derivatives
                for ext in [".thumb", ".json"]:
                    try:
                        self.s3.delete_object(Bucket=R2_BUCKET, Key=f"{blob['content_hash']}{ext}")
                    except Exception:
                        pass

                # Delete the blob itself
                try:
                    self.s3.delete_object(Bucket=R2_BUCKET, Key=blob["storage_path"])
                except Exception:
                    pass

                # Delete the database record
                async with self.pool.acquire() as conn:
                    await conn.execute("""
                        DELETE FROM storage.physical_blobs WHERE content_hash = $1
                    """, blob["content_hash"])

            deleted.append(blob["content_hash"])

        return {
            "status": "OK",
            "dry_run": dry_run,
            "orphans_found": len(orphans),
            "deleted": deleted if not dry_run else []
        }

    async def integrity_check(self, sample_percent: float = 0.01) -> Dict[str, Any]:
        """
        Verify the integrity of a random sample of blobs.
        """
        async with self.pool.acquire() as conn:
            blobs = await conn.fetch("""
                SELECT content_hash, storage_path
                FROM storage.physical_blobs
                WHERE verification_status = 'VERIFIED'
                ORDER BY RANDOM()
                LIMIT (SELECT CEIL(COUNT(*) * $1) FROM storage.physical_blobs WHERE verification_status = 'VERIFIED')
            """, sample_percent)

        results = {"checked": 0, "ok": 0, "corrupt": []}

        for blob in blobs:
            results["checked"] += 1
            verify = await self.verify_blob(blob["content_hash"], blob["storage_path"])

            if verify["status"] == "VERIFIED":
                results["ok"] += 1
            else:
                results["corrupt"].append(blob["content_hash"])

        return results


# CLI for testing
async def main():
    import sys

    worker = StorageWorker()
    await worker.init()

    if len(sys.argv) < 2:
        print("Usage: storage_worker.py <command> [args]")
        print("Commands: gc, integrity, register")
        return

    cmd = sys.argv[1]

    if cmd == "gc":
        dry_run = "--execute" not in sys.argv
        result = await worker.garbage_collect(dry_run=dry_run)
        print(json.dumps(result, indent=2))

    elif cmd == "integrity":
        result = await worker.integrity_check()
        print(json.dumps(result, indent=2))

    elif cmd == "register":
        if len(sys.argv) < 6:
            print("Usage: storage_worker.py register <hash> <size> <mime> <path>")
            return
        result = await worker.register_blob(
            sys.argv[2], int(sys.argv[3]), sys.argv[4], "R2_PRIMARY", sys.argv[5]
        )
        print(json.dumps(result, indent=2))

    await worker.close()


if __name__ == "__main__":
    asyncio.run(main())
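

# Illustrative usage sketches (assumptions: DATABASE_URL and the R2 credentials
# above are exported; hash/path/user values below are placeholders):
#
# From the command line:
#   python storage_worker.py gc              # dry run, only lists orphans
#   python storage_worker.py gc --execute    # actually deletes orphans
#   python storage_worker.py integrity
#
# Programmatically, the post-upload flow could be driven like this:
#   worker = StorageWorker()
#   await worker.init()
#   result = await worker.process_upload(
#       declared_hash="<sha256 of the uploaded bytes>",
#       storage_path="<object key in the bucket>",
#       user_id="<user id>",
#       original_filename="report.pdf",
#   )
#   await worker.close()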