#!/usr/bin/env python3 """ MARGARET - Log de entrada CORP Servicio inmutable para recibir datos de PACKET """ import os import hashlib import json from datetime import datetime from flask import Flask, request, jsonify import psycopg2 from psycopg2.extras import Json import boto3 from botocore.client import Config app = Flask(__name__) # Configuración desde variables de entorno H_INSTANCIA = os.getenv('H_INSTANCIA') DB_HOST = os.getenv('DB_HOST', 'localhost') DB_PORT = os.getenv('DB_PORT', '5432') DB_NAME = os.getenv('DB_NAME', 'corp') DB_USER = os.getenv('DB_USER', 'postgres') DB_PASSWORD = os.getenv('DB_PASSWORD') # Cloudflare R2 R2_ENDPOINT = os.getenv('R2_ENDPOINT') R2_ACCESS_KEY = os.getenv('R2_ACCESS_KEY') R2_SECRET_KEY = os.getenv('R2_SECRET_KEY') R2_BUCKET = os.getenv('R2_BUCKET', 'corp') # Cliente S3 (compatible con R2) s3_client = boto3.client( 's3', endpoint_url=R2_ENDPOINT, aws_access_key_id=R2_ACCESS_KEY, aws_secret_access_key=R2_SECRET_KEY, config=Config(signature_version='s3v4'), region_name='auto' ) def get_db_connection(): """Obtener conexión a PostgreSQL""" return psycopg2.connect( host=DB_HOST, port=DB_PORT, database=DB_NAME, user=DB_USER, password=DB_PASSWORD ) def verify_auth(request): """Verificar autenticación mediante X-Auth-Key""" auth_key = request.headers.get('X-Auth-Key') if not auth_key or auth_key != H_INSTANCIA: return False return True def upload_to_r2(file_data, file_path): """Subir archivo a R2""" try: s3_client.put_object( Bucket=R2_BUCKET, Key=file_path, Body=file_data ) return True except Exception as e: app.logger.error(f"Error subiendo a R2: {e}") return False @app.route('/health', methods=['GET']) def health(): """Endpoint de salud""" return jsonify({ "service": "margaret", "status": "ok", "timestamp": datetime.utcnow().isoformat() }) @app.route('/ingest', methods=['POST']) def ingest(): """ Endpoint principal para recibir contenedores de PACKET """ if not verify_auth(request): return jsonify({"error": "unauthorized"}), 401 try: contenedor = request.get_json() if not contenedor.get('id'): return jsonify({"error": "missing_id"}), 400 if not contenedor.get('archivo_hash'): return jsonify({"error": "missing_archivo_hash"}), 400 h_entrada = contenedor['archivo_hash'] conn = get_db_connection() cur = conn.cursor() cur.execute( "SELECT id FROM margaret_log WHERE h_entrada = %s", (h_entrada,) ) if cur.fetchone(): cur.close() conn.close() return jsonify({"error": "hash_exists"}), 409 r2_paths = {} if 'archivos' in contenedor: for idx, archivo in enumerate(contenedor['archivos']): if 'data' in archivo: file_data = archivo['data'] file_name = archivo.get('nombre', f'archivo_{idx}') r2_path = f"{H_INSTANCIA}/{h_entrada}/{file_name}" if upload_to_r2(file_data, r2_path): r2_paths[file_name] = r2_path else: cur.close() conn.close() return jsonify({"error": "r2_upload_failed"}), 500 cur.execute( """ INSERT INTO margaret_log (h_instancia, h_entrada, contenedor, r2_paths) VALUES (%s, %s, %s, %s) RETURNING id """, (H_INSTANCIA, h_entrada, Json(contenedor), Json(r2_paths)) ) record_id = cur.fetchone()[0] conn.commit() cur.close() conn.close() app.logger.info(f"Contenedor registrado: {record_id} - {h_entrada}") return jsonify({ "ok": True, "id": record_id, "h_entrada": h_entrada }), 200 except Exception as e: app.logger.error(f"Error en /ingest: {e}") return jsonify({"error": "internal_error", "detail": str(e)}), 500 @app.route('/query/', methods=['GET']) def query(h_entrada): if not verify_auth(request): return jsonify({"error": "unauthorized"}), 401 try: conn = get_db_connection() cur = conn.cursor() cur.execute( """ SELECT id, h_entrada, contenedor, r2_paths, created_at FROM margaret_log WHERE h_entrada = %s AND h_instancia = %s """, (h_entrada, H_INSTANCIA) ) row = cur.fetchone() cur.close() conn.close() if not row: return jsonify({"error": "not_found"}), 404 return jsonify({ "id": row[0], "h_entrada": row[1], "contenedor": row[2], "r2_paths": row[3], "created_at": row[4].isoformat() }), 200 except Exception as e: app.logger.error(f"Error en /query: {e}") return jsonify({"error": "internal_error"}), 500 @app.route('/list', methods=['GET']) def list_entries(): if not verify_auth(request): return jsonify({"error": "unauthorized"}), 401 try: limit = int(request.args.get('limit', 50)) offset = int(request.args.get('offset', 0)) conn = get_db_connection() cur = conn.cursor() cur.execute( """ SELECT id, h_entrada, created_at FROM margaret_log WHERE h_instancia = %s ORDER BY id DESC LIMIT %s OFFSET %s """, (H_INSTANCIA, limit, offset) ) rows = cur.fetchall() cur.execute( "SELECT COUNT(*) FROM margaret_log WHERE h_instancia = %s", (H_INSTANCIA,) ) total = cur.fetchone()[0] cur.close() conn.close() return jsonify({ "total": total, "limit": limit, "offset": offset, "entries": [ { "id": row[0], "h_entrada": row[1], "created_at": row[2].isoformat() } for row in rows ] }), 200 except Exception as e: app.logger.error(f"Error en /list: {e}") return jsonify({"error": "internal_error"}), 500 if __name__ == '__main__': port = int(os.getenv('PORT', 5051)) app.run(host='0.0.0.0', port=port, debug=False)