#!/usr/bin/env python3
"""
Storage Worker - File verification and processing
Spec: Hybrid Storage System v4.0
"""

import os
import hashlib
import json
import asyncio
import asyncpg
from datetime import datetime
from typing import Optional, Dict, Any
import boto3
from PIL import Image
import fitz  # PyMuPDF
import io

# Configuration
R2_ENDPOINT = os.environ.get("R2_ENDPOINT", "https://7dedae6030f5554d99d37e98a5232996.r2.cloudflarestorage.com")
R2_BUCKET = os.environ.get("R2_BUCKET", "deck")
DB_URL = os.environ.get("DATABASE_URL", "postgresql://tzzr:tzzr@localhost:5432/tzzr")

THUMB_WIDTH = 300
MAX_RETRIES = 9
RETRY_BACKOFF_BASE = 2
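
# Note (derived from the constants above and the retry loop in process_upload):
# derivative generation retries back off RETRY_BACKOFF_BASE ** attempt seconds
# after each attempt that raises, except the last. With MAX_RETRIES = 9 and
# base 2, that is at most 1 + 2 + 4 + ... + 128 = 255 seconds of backoff
# before the worker gives up with a PARTIAL result.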


def get_s3_client():
    return boto3.client(
        "s3",
        endpoint_url=R2_ENDPOINT,
        aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"),
    )


async def get_db_pool():
    return await asyncpg.create_pool(DB_URL, min_size=2, max_size=10)


def calculate_sha256(data: bytes) -> str:
    """Computes the SHA-256 of a byte string"""
    return hashlib.sha256(data).hexdigest()


def generate_public_key(content_hash: str, user_id: str) -> str:
    """Generates a unique public_key for an asset"""
    data = f"{content_hash}{user_id}{datetime.now().isoformat()}"
    return hashlib.sha256(data.encode()).hexdigest()


class StorageWorker:
    def __init__(self):
        self.s3 = get_s3_client()
        self.pool = None

    async def init(self):
        self.pool = await get_db_pool()

    async def close(self):
        if self.pool:
            await self.pool.close()

    # =========================================================================
    # HASH VERIFICATION
    # =========================================================================

    async def verify_blob(self, declared_hash: str, storage_path: str) -> Dict[str, Any]:
        """
        Verifies that the declared hash matches the actual content.
        We NEVER trust the client's hash.
        """
        try:
            # Download the file
            obj = self.s3.get_object(Bucket=R2_BUCKET, Key=storage_path)
            content = obj["Body"].read()

            # Compute the real hash
            calculated_hash = calculate_sha256(content)

            if calculated_hash != declared_hash:
                # HASH MISMATCH - corrupt file or spoofing attempt
                await self._mark_corrupt(declared_hash, storage_path)
                return {
                    "status": "CORRUPT",
                    "declared": declared_hash,
                    "calculated": calculated_hash,
                    "action": "deleted"
                }

            # Hash matches - mark as verified
            await self._mark_verified(declared_hash)

            return {
                "status": "VERIFIED",
                "hash": declared_hash,
                "size": len(content)
            }

        except Exception as e:
            return {"status": "ERROR", "error": str(e)}

    async def _mark_corrupt(self, content_hash: str, storage_path: str):
        """Marks the blob as corrupt and deletes the file"""
        async with self.pool.acquire() as conn:
            await conn.execute("""
                UPDATE storage.physical_blobs
                SET verification_status = 'CORRUPT', updated_at = NOW()
                WHERE content_hash = $1
            """, content_hash)

        # Delete the file from the bucket
        try:
            self.s3.delete_object(Bucket=R2_BUCKET, Key=storage_path)
        except Exception:
            pass

    async def _mark_verified(self, content_hash: str):
        """Marks the blob as verified"""
        async with self.pool.acquire() as conn:
            await conn.execute("""
                UPDATE storage.physical_blobs
                SET verification_status = 'VERIFIED',
                    last_verified_at = NOW(),
                    updated_at = NOW()
                WHERE content_hash = $1
            """, content_hash)

    # =========================================================================
    # DERIVATIVE GENERATION
    # =========================================================================

    async def generate_derivatives(self, content_hash: str) -> Dict[str, Any]:
        """Generates a thumbnail and metadata for a verified blob"""
        async with self.pool.acquire() as conn:
            blob = await conn.fetchrow("""
                SELECT content_hash, mime_type, storage_path, file_size
                FROM storage.physical_blobs
                WHERE content_hash = $1 AND verification_status = 'VERIFIED'
            """, content_hash)

        if not blob:
            return {"status": "ERROR", "error": "Blob not found or not verified"}

        mime_type = blob["mime_type"]
        storage_path = blob["storage_path"]

        # Download the file
        obj = self.s3.get_object(Bucket=R2_BUCKET, Key=storage_path)
        content = obj["Body"].read()

        metadata = {
            "content_hash": content_hash,
            "mime_type": mime_type,
            "file_size": blob["file_size"],
            "processed_at": datetime.now().isoformat()
        }

        thumb_generated = False

        # Generate a thumbnail according to the file type
        if mime_type.startswith("image/"):
            thumb_data, extra_meta = self._process_image(content)
            metadata.update(extra_meta)
            if thumb_data:
                await self._save_thumb(content_hash, thumb_data)
                thumb_generated = True

        elif mime_type == "application/pdf":
            thumb_data, extra_meta = self._process_pdf(content)
            metadata.update(extra_meta)
            if thumb_data:
                await self._save_thumb(content_hash, thumb_data)
                thumb_generated = True

        # Save the metadata
        await self._save_metadata(content_hash, metadata)

        return {
            "status": "OK",
            "thumb_generated": thumb_generated,
            "metadata": metadata
        }
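
    # Derivative layout (restating what _save_thumb / _save_metadata below do):
    # the JPEG thumbnail is stored at "{content_hash}.thumb" and the metadata
    # JSON at "{content_hash}.json", next to the original blob in R2_BUCKET.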

    def _process_image(self, content: bytes) -> tuple:
        """Processes an image: generates a thumbnail and extracts metadata"""
        try:
            img = Image.open(io.BytesIO(content))

            # Metadata
            meta = {
                "width": img.width,
                "height": img.height,
                "format": img.format,
                "mode": img.mode
            }

            # EXIF, if available
            if hasattr(img, '_getexif') and img._getexif():
                meta["has_exif"] = True

            # Generate the thumbnail
            ratio = THUMB_WIDTH / img.width
            new_height = int(img.height * ratio)
            thumb = img.copy()
            thumb.thumbnail((THUMB_WIDTH, new_height), Image.Resampling.LANCZOS)

            # JPEG cannot store alpha or palette modes, so convert first
            if thumb.mode not in ("RGB", "L"):
                thumb = thumb.convert("RGB")

            # Convert to bytes
            thumb_buffer = io.BytesIO()
            thumb.save(thumb_buffer, format="JPEG", quality=85)
            thumb_data = thumb_buffer.getvalue()

            return thumb_data, meta

        except Exception as e:
            return None, {"error": str(e)}

    def _process_pdf(self, content: bytes) -> tuple:
        """Processes a PDF: generates a thumbnail of the first page and extracts metadata"""
        try:
            doc = fitz.open(stream=content, filetype="pdf")

            meta = {
                "pages": len(doc),
                "format": "PDF"
            }

            # Document metadata
            pdf_meta = doc.metadata
            if pdf_meta:
                if pdf_meta.get("author"):
                    meta["author"] = pdf_meta["author"]
                if pdf_meta.get("title"):
                    meta["title"] = pdf_meta["title"]

            # Render the first page as a thumbnail
            if len(doc) > 0:
                page = doc[0]
                # Scale so the rendered width equals THUMB_WIDTH
                zoom = THUMB_WIDTH / page.rect.width
                mat = fitz.Matrix(zoom, zoom)
                pix = page.get_pixmap(matrix=mat)
                thumb_data = pix.tobytes("jpeg")
            else:
                thumb_data = None

            doc.close()
            return thumb_data, meta

        except Exception as e:
            return None, {"error": str(e)}

    async def _save_thumb(self, content_hash: str, thumb_data: bytes):
        """Saves the thumbnail to the bucket"""
        key = f"{content_hash}.thumb"
        self.s3.put_object(
            Bucket=R2_BUCKET,
            Key=key,
            Body=thumb_data,
            ContentType="image/jpeg"
        )

    async def _save_metadata(self, content_hash: str, metadata: dict):
        """Saves the metadata JSON to the bucket"""
        key = f"{content_hash}.json"
        self.s3.put_object(
            Bucket=R2_BUCKET,
            Key=key,
            Body=json.dumps(metadata, indent=2),
            ContentType="application/json"
        )

    # =========================================================================
    # FULL PROCESSING
    # =========================================================================

    async def process_upload(
        self,
        declared_hash: str,
        storage_path: str,
        user_id: str,
        original_filename: str,
        access_password: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Full post-upload process:
        1. Verify the hash
        2. Generate derivatives
        3. Create the user_asset
        """
        # 1. Verify the hash
        verify_result = await self.verify_blob(declared_hash, storage_path)

        if verify_result["status"] != "VERIFIED":
            return verify_result

        # 2. Generate derivatives (with retries)
        for attempt in range(MAX_RETRIES):
            try:
                deriv_result = await self.generate_derivatives(declared_hash)
                if deriv_result["status"] == "OK":
                    break
            except Exception as e:
                if attempt == MAX_RETRIES - 1:
                    # Last attempt failed, but the blob is already verified
                    deriv_result = {"status": "PARTIAL", "error": str(e)}
                else:
                    await asyncio.sleep(RETRY_BACKOFF_BASE ** attempt)

        # 3. Create the user_asset
        public_key = generate_public_key(declared_hash, user_id)

        async with self.pool.acquire() as conn:
            await conn.execute("""
                INSERT INTO storage.user_assets
                (public_key, blob_hash, user_id, original_filename, access_password)
                VALUES ($1, $2, $3, $4, $5)
            """, public_key, declared_hash, user_id, original_filename, access_password)

        return {
            "status": "CREATED",
            "public_key": public_key,
            "content_hash": declared_hash,
            "derivatives": deriv_result
        }
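
    # Illustrative call (a sketch only; the hash, path, and user id below are
    # hypothetical placeholders, and the queue/trigger that hands uploads to
    # this worker is outside this file):
    #
    #   worker = StorageWorker()
    #   await worker.init()
    #   result = await worker.process_upload(
    #       declared_hash="<sha256-from-upload-handler>",
    #       storage_path="uploads/<sha256>",
    #       user_id="<user-uuid>",
    #       original_filename="report.pdf",
    #   )
    #   # result["status"] is "CREATED", "CORRUPT", or "ERROR"
    #   await worker.close()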

    # =========================================================================
    # BLOB REGISTRATION (no upload - for existing files)
    # =========================================================================

    async def register_blob(
        self,
        content_hash: str,
        file_size: int,
        mime_type: str,
        storage_provider: str,
        storage_path: str
    ) -> Dict[str, Any]:
        """Registers an existing blob in the system"""
        async with self.pool.acquire() as conn:
            # Check whether it already exists
            existing = await conn.fetchrow("""
                SELECT content_hash, verification_status
                FROM storage.physical_blobs
                WHERE content_hash = $1
            """, content_hash)

            if existing:
                return {
                    "status": "EXISTS",
                    "content_hash": content_hash,
                    "verification_status": existing["verification_status"]
                }

            # Insert the new blob
            await conn.execute("""
                INSERT INTO storage.physical_blobs
                (content_hash, file_size, mime_type, storage_provider, storage_path)
                VALUES ($1, $2, $3, $4::storage.storage_provider_enum, $5)
            """, content_hash, file_size, mime_type, storage_provider, storage_path)

        return {
            "status": "REGISTERED",
            "content_hash": content_hash,
            "verification_status": "PENDING"
        }
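
    # Lifecycle note: register_blob only records the row (reported here as
    # PENDING; the actual default is assumed to come from the
    # storage.physical_blobs schema). A later verify_blob() on the same hash
    # flips verification_status to VERIFIED or CORRUPT.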

    # =========================================================================
    # MAINTENANCE
    # =========================================================================

    async def garbage_collect(self, dry_run: bool = True) -> Dict[str, Any]:
        """
        Deletes orphaned blobs (ref_count = 0, not updated in 30 days)
        """
        async with self.pool.acquire() as conn:
            orphans = await conn.fetch("""
                SELECT content_hash, storage_path
                FROM storage.physical_blobs
                WHERE ref_count = 0
                  AND updated_at < NOW() - INTERVAL '30 days'
            """)

        deleted = []
        for blob in orphans:
            if not dry_run:
                # Delete derivatives
                for ext in [".thumb", ".json"]:
                    try:
                        self.s3.delete_object(Bucket=R2_BUCKET, Key=f"{blob['content_hash']}{ext}")
                    except Exception:
                        pass

                # Delete the blob
                try:
                    self.s3.delete_object(Bucket=R2_BUCKET, Key=blob["storage_path"])
                except Exception:
                    pass

                # Delete the record
                async with self.pool.acquire() as conn:
                    await conn.execute("""
                        DELETE FROM storage.physical_blobs WHERE content_hash = $1
                    """, blob["content_hash"])

                deleted.append(blob["content_hash"])

        return {
            "status": "OK",
            "dry_run": dry_run,
            "orphans_found": len(orphans),
            "deleted": deleted if not dry_run else []
        }

    async def integrity_check(self, sample_percent: float = 0.01) -> Dict[str, Any]:
        """
        Verifies the integrity of a random sample of blobs
        """
        async with self.pool.acquire() as conn:
            blobs = await conn.fetch("""
                SELECT content_hash, storage_path
                FROM storage.physical_blobs
                WHERE verification_status = 'VERIFIED'
                ORDER BY RANDOM()
                LIMIT (SELECT CEIL(COUNT(*) * $1) FROM storage.physical_blobs WHERE verification_status = 'VERIFIED')
            """, sample_percent)

        results = {"checked": 0, "ok": 0, "corrupt": []}

        for blob in blobs:
            results["checked"] += 1
            verify = await self.verify_blob(blob["content_hash"], blob["storage_path"])

            if verify["status"] == "VERIFIED":
                results["ok"] += 1
            else:
                results["corrupt"].append(blob["content_hash"])

        return results


# CLI for testing
async def main():
    import sys

    worker = StorageWorker()
    await worker.init()

    if len(sys.argv) < 2:
        print("Usage: storage_worker.py <command> [args]")
        print("Commands: gc, integrity, register")
        await worker.close()
        return

    cmd = sys.argv[1]

    if cmd == "gc":
        dry_run = "--execute" not in sys.argv
        result = await worker.garbage_collect(dry_run=dry_run)
        print(json.dumps(result, indent=2))

    elif cmd == "integrity":
        result = await worker.integrity_check()
        print(json.dumps(result, indent=2))

    elif cmd == "register":
        if len(sys.argv) < 6:
            print("Usage: storage_worker.py register <hash> <size> <mime> <path>")
            await worker.close()
            return
        result = await worker.register_blob(
            sys.argv[2], int(sys.argv[3]), sys.argv[4], "R2_PRIMARY", sys.argv[5]
        )
        print(json.dumps(result, indent=2))

    await worker.close()


if __name__ == "__main__":
    asyncio.run(main())
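
# Example invocations (derived from main() above; <hash>, <size>, <mime> and
# <path> are placeholders):
#
#   python storage_worker.py gc                              # dry run: report orphans only
#   python storage_worker.py gc --execute                    # actually delete orphans
#   python storage_worker.py integrity                       # verify a ~1% random sample
#   python storage_worker.py register <hash> <size> <mime> <path>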