#!/usr/bin/env python3 """ Storage Worker - Verificación y procesamiento de archivos Spec: Sistema de Almacenamiento Híbrido v4.0 """ import os import hashlib import json import asyncio import asyncpg from datetime import datetime from typing import Optional, Dict, Any import boto3 from PIL import Image import fitz # PyMuPDF import io import tempfile # Configuración R2_ENDPOINT = os.environ.get("R2_ENDPOINT", "https://7dedae6030f5554d99d37e98a5232996.r2.cloudflarestorage.com") R2_BUCKET = os.environ.get("R2_BUCKET", "deck") DB_URL = os.environ.get("DATABASE_URL", "postgresql://tzzr:tzzr@localhost:5432/tzzr") THUMB_WIDTH = 300 MAX_RETRIES = 9 RETRY_BACKOFF_BASE = 2 def get_s3_client(): return boto3.client( "s3", endpoint_url=R2_ENDPOINT, aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"), aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"), ) async def get_db_pool(): return await asyncpg.create_pool(DB_URL, min_size=2, max_size=10) def calculate_sha256(data: bytes) -> str: """Calcula SHA-256 de bytes""" return hashlib.sha256(data).hexdigest() def generate_public_key(content_hash: str, user_id: str) -> str: """Genera public_key única para un asset""" data = f"{content_hash}{user_id}{datetime.now().isoformat()}" return hashlib.sha256(data.encode()).hexdigest() class StorageWorker: def __init__(self): self.s3 = get_s3_client() self.pool = None async def init(self): self.pool = await get_db_pool() async def close(self): if self.pool: await self.pool.close() # ========================================================================= # VERIFICACIÓN DE HASH # ========================================================================= async def verify_blob(self, declared_hash: str, storage_path: str) -> Dict[str, Any]: """ Verifica que el hash declarado coincida con el contenido real. NUNCA confiamos en el hash del cliente. """ try: # Descargar archivo obj = self.s3.get_object(Bucket=R2_BUCKET, Key=storage_path) content = obj["Body"].read() # Calcular hash real calculated_hash = calculate_sha256(content) if calculated_hash != declared_hash: # HASH MISMATCH - Archivo corrupto o spoofing await self._mark_corrupt(declared_hash, storage_path) return { "status": "CORRUPT", "declared": declared_hash, "calculated": calculated_hash, "action": "deleted" } # Hash coincide - Marcar como verificado await self._mark_verified(declared_hash) return { "status": "VERIFIED", "hash": declared_hash, "size": len(content) } except Exception as e: return {"status": "ERROR", "error": str(e)} async def _mark_corrupt(self, content_hash: str, storage_path: str): """Marca blob como corrupto y elimina archivo""" async with self.pool.acquire() as conn: await conn.execute(""" UPDATE storage.physical_blobs SET verification_status = 'CORRUPT', updated_at = NOW() WHERE content_hash = $1 """, content_hash) # Eliminar archivo del bucket try: self.s3.delete_object(Bucket=R2_BUCKET, Key=storage_path) except: pass async def _mark_verified(self, content_hash: str): """Marca blob como verificado""" async with self.pool.acquire() as conn: await conn.execute(""" UPDATE storage.physical_blobs SET verification_status = 'VERIFIED', last_verified_at = NOW(), updated_at = NOW() WHERE content_hash = $1 """, content_hash) # ========================================================================= # GENERACIÓN DE DERIVADOS # ========================================================================= async def generate_derivatives(self, content_hash: str) -> Dict[str, Any]: """Genera thumbnail y metadatos para un blob verificado""" async with self.pool.acquire() as conn: blob = await conn.fetchrow(""" SELECT content_hash, mime_type, storage_path, file_size FROM storage.physical_blobs WHERE content_hash = $1 AND verification_status = 'VERIFIED' """, content_hash) if not blob: return {"status": "ERROR", "error": "Blob not found or not verified"} mime_type = blob["mime_type"] storage_path = blob["storage_path"] # Descargar archivo obj = self.s3.get_object(Bucket=R2_BUCKET, Key=storage_path) content = obj["Body"].read() metadata = { "content_hash": content_hash, "mime_type": mime_type, "file_size": blob["file_size"], "processed_at": datetime.now().isoformat() } thumb_generated = False # Generar thumbnail según tipo if mime_type.startswith("image/"): thumb_data, extra_meta = self._process_image(content) metadata.update(extra_meta) if thumb_data: await self._save_thumb(content_hash, thumb_data) thumb_generated = True elif mime_type == "application/pdf": thumb_data, extra_meta = self._process_pdf(content) metadata.update(extra_meta) if thumb_data: await self._save_thumb(content_hash, thumb_data) thumb_generated = True # Guardar metadatos await self._save_metadata(content_hash, metadata) return { "status": "OK", "thumb_generated": thumb_generated, "metadata": metadata } def _process_image(self, content: bytes) -> tuple: """Procesa imagen: genera thumb y extrae metadatos""" try: img = Image.open(io.BytesIO(content)) # Metadatos meta = { "width": img.width, "height": img.height, "format": img.format, "mode": img.mode } # EXIF si disponible if hasattr(img, '_getexif') and img._getexif(): meta["has_exif"] = True # Generar thumbnail ratio = THUMB_WIDTH / img.width new_height = int(img.height * ratio) thumb = img.copy() thumb.thumbnail((THUMB_WIDTH, new_height), Image.Resampling.LANCZOS) # Convertir a bytes thumb_buffer = io.BytesIO() thumb.save(thumb_buffer, format="JPEG", quality=85) thumb_data = thumb_buffer.getvalue() return thumb_data, meta except Exception as e: return None, {"error": str(e)} def _process_pdf(self, content: bytes) -> tuple: """Procesa PDF: genera thumb de primera página y extrae metadatos""" try: doc = fitz.open(stream=content, filetype="pdf") meta = { "pages": len(doc), "format": "PDF" } # Metadatos del documento pdf_meta = doc.metadata if pdf_meta: if pdf_meta.get("author"): meta["author"] = pdf_meta["author"] if pdf_meta.get("title"): meta["title"] = pdf_meta["title"] # Render primera página como thumbnail if len(doc) > 0: page = doc[0] # Escalar para que el ancho sea THUMB_WIDTH zoom = THUMB_WIDTH / page.rect.width mat = fitz.Matrix(zoom, zoom) pix = page.get_pixmap(matrix=mat) thumb_data = pix.tobytes("jpeg") else: thumb_data = None doc.close() return thumb_data, meta except Exception as e: return None, {"error": str(e)} async def _save_thumb(self, content_hash: str, thumb_data: bytes): """Guarda thumbnail en el bucket""" key = f"{content_hash}.thumb" self.s3.put_object( Bucket=R2_BUCKET, Key=key, Body=thumb_data, ContentType="image/jpeg" ) async def _save_metadata(self, content_hash: str, metadata: dict): """Guarda metadatos JSON en el bucket""" key = f"{content_hash}.json" self.s3.put_object( Bucket=R2_BUCKET, Key=key, Body=json.dumps(metadata, indent=2), ContentType="application/json" ) # ========================================================================= # PROCESAMIENTO COMPLETO # ========================================================================= async def process_upload( self, declared_hash: str, storage_path: str, user_id: str, original_filename: str, access_password: Optional[str] = None ) -> Dict[str, Any]: """ Proceso completo post-upload: 1. Verificar hash 2. Generar derivados 3. Crear user_asset """ # 1. Verificar hash verify_result = await self.verify_blob(declared_hash, storage_path) if verify_result["status"] != "VERIFIED": return verify_result # 2. Generar derivados (con reintentos) for attempt in range(MAX_RETRIES): try: deriv_result = await self.generate_derivatives(declared_hash) if deriv_result["status"] == "OK": break except Exception as e: if attempt == MAX_RETRIES - 1: # Último intento fallido, pero blob ya está verificado deriv_result = {"status": "PARTIAL", "error": str(e)} else: await asyncio.sleep(RETRY_BACKOFF_BASE ** attempt) # 3. Crear user_asset public_key = generate_public_key(declared_hash, user_id) async with self.pool.acquire() as conn: await conn.execute(""" INSERT INTO storage.user_assets (public_key, blob_hash, user_id, original_filename, access_password) VALUES ($1, $2, $3, $4, $5) """, public_key, declared_hash, user_id, original_filename, access_password) return { "status": "CREATED", "public_key": public_key, "content_hash": declared_hash, "derivatives": deriv_result } # ========================================================================= # REGISTRO DE BLOB (sin subida - para archivos existentes) # ========================================================================= async def register_blob( self, content_hash: str, file_size: int, mime_type: str, storage_provider: str, storage_path: str ) -> Dict[str, Any]: """Registra un blob existente en el sistema""" async with self.pool.acquire() as conn: # Verificar si ya existe existing = await conn.fetchrow(""" SELECT content_hash, verification_status FROM storage.physical_blobs WHERE content_hash = $1 """, content_hash) if existing: return { "status": "EXISTS", "content_hash": content_hash, "verification_status": existing["verification_status"] } # Insertar nuevo blob await conn.execute(""" INSERT INTO storage.physical_blobs (content_hash, file_size, mime_type, storage_provider, storage_path) VALUES ($1, $2, $3, $4::storage.storage_provider_enum, $5) """, content_hash, file_size, mime_type, storage_provider, storage_path) return { "status": "REGISTERED", "content_hash": content_hash, "verification_status": "PENDING" } # ========================================================================= # MANTENIMIENTO # ========================================================================= async def garbage_collect(self, dry_run: bool = True) -> Dict[str, Any]: """ Elimina blobs huérfanos (ref_count = 0, sin actualizar en 30 días) """ async with self.pool.acquire() as conn: orphans = await conn.fetch(""" SELECT content_hash, storage_path FROM storage.physical_blobs WHERE ref_count = 0 AND updated_at < NOW() - INTERVAL '30 days' """) deleted = [] for blob in orphans: if not dry_run: # Eliminar derivados for ext in [".thumb", ".json"]: try: self.s3.delete_object(Bucket=R2_BUCKET, Key=f"{blob['content_hash']}{ext}") except: pass # Eliminar blob try: self.s3.delete_object(Bucket=R2_BUCKET, Key=blob["storage_path"]) except: pass # Eliminar registro async with self.pool.acquire() as conn: await conn.execute(""" DELETE FROM storage.physical_blobs WHERE content_hash = $1 """, blob["content_hash"]) deleted.append(blob["content_hash"]) return { "status": "OK", "dry_run": dry_run, "orphans_found": len(orphans), "deleted": deleted if not dry_run else [] } async def integrity_check(self, sample_percent: float = 0.01) -> Dict[str, Any]: """ Verifica integridad de una muestra aleatoria de blobs """ async with self.pool.acquire() as conn: blobs = await conn.fetch(""" SELECT content_hash, storage_path FROM storage.physical_blobs WHERE verification_status = 'VERIFIED' ORDER BY RANDOM() LIMIT (SELECT CEIL(COUNT(*) * $1) FROM storage.physical_blobs WHERE verification_status = 'VERIFIED') """, sample_percent) results = {"checked": 0, "ok": 0, "corrupt": []} for blob in blobs: results["checked"] += 1 verify = await self.verify_blob(blob["content_hash"], blob["storage_path"]) if verify["status"] == "VERIFIED": results["ok"] += 1 else: results["corrupt"].append(blob["content_hash"]) return results # CLI para pruebas async def main(): import sys worker = StorageWorker() await worker.init() if len(sys.argv) < 2: print("Usage: storage_worker.py [args]") print("Commands: gc, integrity, register") return cmd = sys.argv[1] if cmd == "gc": dry_run = "--execute" not in sys.argv result = await worker.garbage_collect(dry_run=dry_run) print(json.dumps(result, indent=2)) elif cmd == "integrity": result = await worker.integrity_check() print(json.dumps(result, indent=2)) elif cmd == "register": if len(sys.argv) < 6: print("Usage: storage_worker.py register ") return result = await worker.register_blob( sys.argv[2], int(sys.argv[3]), sys.argv[4], "R2_PRIMARY", sys.argv[5] ) print(json.dumps(result, indent=2)) await worker.close() if __name__ == "__main__": asyncio.run(main())