#!/usr/bin/env python3 """ Storage API - Endpoints para upload/download de archivos Spec: Sistema de Almacenamiento Híbrido v4.0 """ import os import hashlib import json import asyncio from datetime import datetime, timedelta from typing import Optional import asyncpg import boto3 from fastapi import FastAPI, HTTPException, Request, Header, Query, BackgroundTasks from fastapi.responses import RedirectResponse, JSONResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel import uvicorn from collections import defaultdict import time import argon2 # Configuración R2_ENDPOINT = os.environ.get("R2_ENDPOINT", "https://7dedae6030f5554d99d37e98a5232996.r2.cloudflarestorage.com") R2_BUCKET = os.environ.get("R2_BUCKET", "deck") DB_URL = os.environ.get("DATABASE_URL", "postgresql://tzzr:tzzr@localhost:5432/tzzr") PRESIGNED_UPLOAD_EXPIRY = 3 * 60 * 60 # 3 horas PRESIGNED_DOWNLOAD_EXPIRY = 45 * 60 # 45 minutos # Rate limiting RATE_LIMIT_IP = 100 # req/min por IP RATE_LIMIT_KEY = 50 # descargas/hora por public_key RATE_LIMIT_TRANSFER = 10 * 1024 * 1024 * 1024 # 10 GB/hora app = FastAPI(title="Storage API", version="4.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) # Estado global db_pool = None s3_client = None rate_limits = { "ip": defaultdict(list), # IP -> [timestamps] "key": defaultdict(list), # public_key -> [timestamps] "transfer": defaultdict(int) # IP -> bytes } ph = argon2.PasswordHasher() # ========================================================================= # STARTUP / SHUTDOWN # ========================================================================= @app.on_event("startup") async def startup(): global db_pool, s3_client db_pool = await asyncpg.create_pool(DB_URL, min_size=2, max_size=20) s3_client = boto3.client( "s3", endpoint_url=R2_ENDPOINT, aws_access_key_id=os.environ.get("AWS_ACCESS_KEY_ID"), aws_secret_access_key=os.environ.get("AWS_SECRET_ACCESS_KEY"), ) @app.on_event("shutdown") async def shutdown(): if db_pool: await db_pool.close() # ========================================================================= # RATE LIMITING # ========================================================================= def check_rate_limit_ip(ip: str) -> bool: """100 req/min por IP""" now = time.time() minute_ago = now - 60 # Limpiar timestamps viejos rate_limits["ip"][ip] = [t for t in rate_limits["ip"][ip] if t > minute_ago] if len(rate_limits["ip"][ip]) >= RATE_LIMIT_IP: return False rate_limits["ip"][ip].append(now) return True def check_rate_limit_key(public_key: str) -> bool: """50 descargas/hora por public_key""" now = time.time() hour_ago = now - 3600 rate_limits["key"][public_key] = [t for t in rate_limits["key"][public_key] if t > hour_ago] if len(rate_limits["key"][public_key]) >= RATE_LIMIT_KEY: return False rate_limits["key"][public_key].append(now) return True # ========================================================================= # MODELS # ========================================================================= class UploadInitRequest(BaseModel): hash: str size: int mime_type: str filename: str user_id: str password: Optional[str] = None class UploadInitResponse(BaseModel): status: str presigned_url: Optional[str] = None deduplicated: bool = False public_key: Optional[str] = None # ========================================================================= # UPLOAD ENDPOINTS # ========================================================================= @app.post("/upload/init", response_model=UploadInitResponse) async def upload_init(req: UploadInitRequest, request: Request, background_tasks: BackgroundTasks): """ Iniciar upload. Devuelve presigned URL o confirma deduplicación. """ client_ip = request.client.host if not check_rate_limit_ip(client_ip): raise HTTPException(429, "Rate limit exceeded") async with db_pool.acquire() as conn: # Verificar si blob ya existe blob = await conn.fetchrow(""" SELECT content_hash, verification_status FROM storage.physical_blobs WHERE content_hash = $1 """, req.hash) if blob: if blob["verification_status"] == "VERIFIED": # Deduplicación: crear asset sin subir public_key = hashlib.sha256( f"{req.hash}{req.user_id}{datetime.now().isoformat()}".encode() ).hexdigest() password_hash = None if req.password: password_hash = ph.hash(req.password) await conn.execute(""" INSERT INTO storage.user_assets (public_key, blob_hash, user_id, original_filename, access_password) VALUES ($1, $2, $3, $4, $5) """, public_key, req.hash, req.user_id, req.filename, password_hash) return UploadInitResponse( status="created", deduplicated=True, public_key=public_key ) # Blob existe pero PENDING - cliente debe subir de todas formas else: # Crear registro PENDING storage_path = f"{req.hash}.bin" await conn.execute(""" INSERT INTO storage.physical_blobs (content_hash, file_size, mime_type, storage_provider, storage_path) VALUES ($1, $2, $3, 'R2_PRIMARY', $4) """, req.hash, req.size, req.mime_type, storage_path) # Generar presigned URL para upload storage_path = f"{req.hash}.bin" presigned_url = s3_client.generate_presigned_url( "put_object", Params={ "Bucket": R2_BUCKET, "Key": storage_path, "ContentType": req.mime_type }, ExpiresIn=PRESIGNED_UPLOAD_EXPIRY ) return UploadInitResponse( status="upload_required", presigned_url=presigned_url, deduplicated=False ) @app.post("/upload/complete/{content_hash}") async def upload_complete( content_hash: str, user_id: str = Query(...), filename: str = Query(...), password: Optional[str] = Query(None), background_tasks: BackgroundTasks = None ): """ Confirmar upload completado. Encola verificación. """ async with db_pool.acquire() as conn: blob = await conn.fetchrow(""" SELECT content_hash, storage_path FROM storage.physical_blobs WHERE content_hash = $1 """, content_hash) if not blob: raise HTTPException(404, "Blob not found") # Encolar verificación en background # En producción esto iría a una cola (Redis, RabbitMQ, etc.) background_tasks.add_task( verify_and_finalize, content_hash, blob["storage_path"], user_id, filename, password ) return {"status": "processing", "content_hash": content_hash} async def verify_and_finalize( content_hash: str, storage_path: str, user_id: str, filename: str, password: Optional[str] ): """Background task para verificar y finalizar upload""" from storage_worker import StorageWorker worker = StorageWorker() await worker.init() try: result = await worker.process_upload( content_hash, storage_path, user_id, filename, ph.hash(password) if password else None ) # En producción: notificar cliente via webhook/websocket print(f"Upload finalized: {result}") finally: await worker.close() # ========================================================================= # DOWNLOAD ENDPOINTS # ========================================================================= @app.get("/file/{public_key}") async def download_file( public_key: str, request: Request, password: Optional[str] = Query(None) ): """ Descarga de archivo. Devuelve redirect a URL firmada. """ client_ip = request.client.host # Rate limiting if not check_rate_limit_ip(client_ip): raise HTTPException(429, "Rate limit exceeded - IP") if not check_rate_limit_key(public_key): raise HTTPException(429, "Rate limit exceeded - downloads") async with db_pool.acquire() as conn: # Buscar asset asset = await conn.fetchrow(""" SELECT a.id, a.blob_hash, a.original_filename, a.access_password, a.downloads_count, b.storage_provider, b.storage_path, b.verification_status, b.mime_type FROM storage.user_assets a JOIN storage.physical_blobs b ON a.blob_hash = b.content_hash WHERE a.public_key = $1 """, public_key) if not asset: raise HTTPException(404, "Asset not found") # Verificar contraseña si requerida if asset["access_password"]: if not password: raise HTTPException(401, "Password required") try: ph.verify(asset["access_password"], password) except: raise HTTPException(401, "Invalid password") # Verificar estado del blob status = asset["verification_status"] if status == "PENDING": raise HTTPException(202, "File is being processed") if status in ("CORRUPT", "LOST"): raise HTTPException(410, "File is no longer available") # Incrementar contador de descargas await conn.execute(""" UPDATE storage.user_assets SET downloads_count = downloads_count + 1 WHERE id = $1 """, asset["id"]) # Generar URL firmada según provider provider = asset["storage_provider"] if provider in ("R2_PRIMARY", "R2_CACHE"): presigned_url = s3_client.generate_presigned_url( "get_object", Params={ "Bucket": R2_BUCKET, "Key": asset["storage_path"], "ResponseContentDisposition": f'attachment; filename="{asset["original_filename"]}"', "ResponseContentType": asset["mime_type"] }, ExpiresIn=PRESIGNED_DOWNLOAD_EXPIRY ) return RedirectResponse(presigned_url, status_code=302) elif provider == "SHAREPOINT": # TODO: Implementar acceso SharePoint via Graph API raise HTTPException(503, "SharePoint access not implemented") else: raise HTTPException(503, "Unknown storage provider") @app.get("/file/{public_key}/info") async def file_info(public_key: str, request: Request): """ Información del archivo sin descargarlo. """ client_ip = request.client.host if not check_rate_limit_ip(client_ip): raise HTTPException(429, "Rate limit exceeded") async with db_pool.acquire() as conn: asset = await conn.fetchrow(""" SELECT a.public_key, a.original_filename, a.downloads_count, a.created_at, b.file_size, b.mime_type, b.verification_status, (a.access_password IS NOT NULL) as password_protected FROM storage.user_assets a JOIN storage.physical_blobs b ON a.blob_hash = b.content_hash WHERE a.public_key = $1 """, public_key) if not asset: raise HTTPException(404, "Asset not found") return { "public_key": asset["public_key"], "filename": asset["original_filename"], "size": asset["file_size"], "mime_type": asset["mime_type"], "status": asset["verification_status"], "downloads": asset["downloads_count"], "password_protected": asset["password_protected"], "created_at": asset["created_at"].isoformat() } @app.get("/file/{public_key}/thumb") async def file_thumbnail(public_key: str, request: Request): """ Redirect al thumbnail del archivo. """ client_ip = request.client.host if not check_rate_limit_ip(client_ip): raise HTTPException(429, "Rate limit exceeded") async with db_pool.acquire() as conn: asset = await conn.fetchrow(""" SELECT a.blob_hash, b.verification_status FROM storage.user_assets a JOIN storage.physical_blobs b ON a.blob_hash = b.content_hash WHERE a.public_key = $1 """, public_key) if not asset: raise HTTPException(404, "Asset not found") if asset["verification_status"] != "VERIFIED": raise HTTPException(202, "Thumbnail not ready") # URL al thumbnail thumb_key = f"{asset['blob_hash']}.thumb" try: # Verificar que existe s3_client.head_object(Bucket=R2_BUCKET, Key=thumb_key) except: raise HTTPException(404, "Thumbnail not available") presigned_url = s3_client.generate_presigned_url( "get_object", Params={"Bucket": R2_BUCKET, "Key": thumb_key}, ExpiresIn=PRESIGNED_DOWNLOAD_EXPIRY ) return RedirectResponse(presigned_url, status_code=302) # ========================================================================= # HEALTH # ========================================================================= @app.get("/health") async def health(): return {"status": "ok", "timestamp": datetime.now().isoformat()} # ========================================================================= # MAIN # ========================================================================= if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8080)