#!/usr/bin/env python3
"""Migration: import existing files from secretaria_clara.atc into storage."""
import asyncio
import json
import os

import asyncpg

# Connection string; overridable via the environment for non-local runs.
DB_URL = os.environ.get("DATABASE_URL", "postgresql://tzzr:tzzr@localhost:5432/tzzr")


async def _migrate_file(conn, file) -> str:
    """Migrate a single atc row into the storage schema.

    Returns "migrated" when both storage rows were written, or "skipped"
    when the row has no usable url_file or the blob already exists.
    Exceptions propagate to the caller, which counts them as errors.
    """
    mrf = file["mrf"]
    jsonb = file["jsonb_standard"] or {}
    # asyncpg returns jsonb columns as str unless a type codec is registered
    # on the connection; decode defensively so either configuration works.
    if isinstance(jsonb, str):
        jsonb = json.loads(jsonb)

    # Extract the L2 document metadata.
    l2 = jsonb.get("L2_document", {})
    url_file = l2.get("url_file")
    size_bytes = l2.get("size_bytes", 0)
    mime_type = l2.get("mime_type", "application/octet-stream")

    if not url_file:
        return "skipped"

    # Skip blobs already present in storage (mrf doubles as content_hash).
    existing = await conn.fetchrow("""
        SELECT content_hash FROM storage.physical_blobs
        WHERE content_hash = $1
    """, mrf)
    if existing:
        return "skipped"

    # Write both rows atomically so a failure between the two inserts cannot
    # leave an orphan physical_blob without its user_asset.
    async with conn.transaction():
        await conn.execute("""
            INSERT INTO storage.physical_blobs
                (content_hash, file_size, mime_type, storage_provider,
                 storage_path, verification_status)
            VALUES ($1, $2, $3, 'R2_PRIMARY', $4, 'VERIFIED')
        """, mrf, size_bytes, mime_type, url_file)

        # Create the user_asset with the same public_key as the mrf.
        # A dummy UUID is used for user_id since there are no users yet.
        await conn.execute("""
            INSERT INTO storage.user_assets
                (public_key, blob_hash, user_id, original_filename)
            VALUES ($1, $2, '00000000-0000-0000-0000-000000000000'::uuid, $3)
            ON CONFLICT (public_key) DO NOTHING
        """, mrf, mrf, file["name_es"] or file["alias"] or mrf[:20])

    return "migrated"


async def migrate() -> None:
    """Copy atc rows carrying a hashed url_file into the storage schema.

    Best-effort: rows that fail are reported and counted, the run continues.
    Finishes by recomputing physical_blobs.ref_count from user_assets.
    """
    pool = await asyncpg.create_pool(DB_URL, min_size=2, max_size=10)
    try:
        async with pool.acquire() as conn:
            # Fetch atc rows that have both a hash and a url_file.
            atc_files = await conn.fetch("""
                SELECT mrf, private_mrf, alias, name_es, ref, ext,
                       jsonb_standard, hashtags
                FROM secretaria_clara.atc
                WHERE jsonb_standard IS NOT NULL
                  AND jsonb_standard->'L2_document'->>'url_file' IS NOT NULL
            """)
            print(f"Encontrados {len(atc_files)} archivos en atc")

            migrated = 0
            skipped = 0
            errors = 0

            for file in atc_files:
                try:
                    outcome = await _migrate_file(conn, file)
                except Exception as e:
                    # Best-effort migration: report the row and keep going.
                    errors += 1
                    print(f"Error migrando {file['mrf']}: {e}")
                    continue
                if outcome == "migrated":
                    migrated += 1
                    if migrated % 100 == 0:
                        print(f" Migrados: {migrated}")
                else:
                    skipped += 1

            print("\nMigración completada:")
            print(f" - Migrados: {migrated}")
            print(f" - Saltados (ya existían o sin datos): {skipped}")
            print(f" - Errores: {errors}")

            # Recompute ref_count from the actual user_assets references.
            await conn.execute("""
                UPDATE storage.physical_blobs pb
                SET ref_count = (
                    SELECT COUNT(*) FROM storage.user_assets ua
                    WHERE ua.blob_hash = pb.content_hash
                )
            """)
            print(" - ref_count actualizado")
    finally:
        # The original leaked the pool on any exception; always close it.
        await pool.close()


if __name__ == "__main__":
    asyncio.run(migrate())