""" FELDMAN v3.0 Unificado - El Contable Validador + Encadenamiento + Merkle Tree """ from flask import Flask, request, jsonify import psycopg2 from psycopg2.extras import RealDictCursor import hashlib import json import os from datetime import datetime, timedelta from functools import wraps import requests app = Flask(__name__) H_INSTANCIA = os.getenv('H_INSTANCIA') DB_CONFIG = { 'host': os.getenv('DB_HOST', '172.17.0.1'), 'port': os.getenv('DB_PORT', '5432'), 'dbname': os.getenv('DB_NAME', 'corp'), 'user': os.getenv('DB_USER', 'corp'), 'password': os.getenv('DB_PASSWORD', 'corp') } TIPOS_MILESTONE = ['documento', 'hito', 'contrato', 'estado', 'decision'] TIPOS_BLOQUE = ['trabajo', 'verificacion', 'entrega', 'medicion', 'firma'] TIPOS_EVIDENCIA = ['image/jpeg', 'image/png', 'audio/mp3', 'audio/wav', 'video/mp4', 'application/pdf'] CONSOLIDATION_HOURS = 24 def get_db(): return psycopg2.connect(**DB_CONFIG, cursor_factory=RealDictCursor) def require_auth(f): @wraps(f) def decorated(*args, **kwargs): if request.headers.get('X-Auth-Key') != H_INSTANCIA: return jsonify({'error': 'No autorizado'}), 401 return f(*args, **kwargs) return decorated # ═══════════════════════════════════════════════════════════════ # FUNCIONES HASH # ═══════════════════════════════════════════════════════════════ def sha256(data): if isinstance(data, dict): data = json.dumps(data, sort_keys=True, ensure_ascii=False) return hashlib.sha256(data.encode('utf-8')).hexdigest() def calcular_hash_contenido(datos): return sha256(datos) def calcular_hash_registro(hash_previo, hash_contenido): return sha256(f'{hash_previo}:{hash_contenido}') def calcular_leaf_hash(h_registro, hash_contenido, created_at): datos = {'h': h_registro, 'hash_contenido': hash_contenido, 'ts': str(created_at)} return sha256(datos) # ═══════════════════════════════════════════════════════════════ # MERKLE TREE # ═══════════════════════════════════════════════════════════════ def build_merkle_tree(leaves): if not leaves: return {'root': None, 'tree': [], 'leaves': []} leaves = list(leaves) if len(leaves) % 2 == 1: leaves.append(leaves[-1]) tree = [leaves] current = leaves while len(current) > 1: next_level = [] for i in range(0, len(current), 2): left = current[i] right = current[i + 1] if i + 1 < len(current) else left next_level.append(sha256(left + right)) tree.append(next_level) current = next_level return {'root': current[0] if current else None, 'tree': tree, 'leaves': leaves} def get_merkle_proof(tree_data, leaf_index): proof = [] index = leaf_index tree = tree_data['tree'] for level in tree[:-1]: sibling = index + 1 if index % 2 == 0 else index - 1 if sibling < len(level): proof.append({'hash': level[sibling], 'position': 'right' if index % 2 == 0 else 'left'}) index //= 2 return proof def verify_merkle_proof(leaf_hash, proof, root): current = leaf_hash for step in proof: if step['position'] == 'right': current = sha256(current + step['hash']) else: current = sha256(step['hash'] + current) return current == root # ═══════════════════════════════════════════════════════════════ # VALIDACIÓN # ═══════════════════════════════════════════════════════════════ def validar_milestone(datos): errores, reglas = [], [] ok = bool(datos.get('alias')) reglas.append({'codigo': 'M-001', 'ok': ok}) if not ok: errores.append('M-001: Alias requerido') ok = datos.get('tipo_item') in TIPOS_MILESTONE reglas.append({'codigo': 'M-002', 'ok': ok}) if not ok: errores.append(f'M-002: tipo_item invalido') ok = bool(datos.get('proyecto_tag')) reglas.append({'codigo': 'M-003', 'ok': ok}) if not ok: errores.append('M-003: proyecto_tag requerido') return (len(errores) == 0, errores, reglas) def validar_bloque(datos): errores, reglas = [], [] ok = bool(datos.get('alias')) reglas.append({'codigo': 'B-001', 'ok': ok}) if not ok: errores.append('B-001: Alias requerido') ok = datos.get('tipo_accion') in TIPOS_BLOQUE reglas.append({'codigo': 'B-002', 'ok': ok}) if not ok: errores.append('B-002: tipo_accion invalido') ok = bool(datos.get('proyecto_tag')) reglas.append({'codigo': 'B-003', 'ok': ok}) if not ok: errores.append('B-003: proyecto_tag requerido') ev_hash = datos.get('evidencia_hash', '') ok = len(ev_hash) == 64 reglas.append({'codigo': 'B-004', 'ok': ok}) if not ok: errores.append('B-004: evidencia_hash SHA256 requerido') ev_url = datos.get('evidencia_url', '') ok = ev_url.startswith('https://') reglas.append({'codigo': 'B-005', 'ok': ok}) if not ok: errores.append('B-005: evidencia_url HTTPS requerido') ok = datos.get('evidencia_tipo') in TIPOS_EVIDENCIA reglas.append({'codigo': 'B-006', 'ok': ok}) if not ok: errores.append('B-006: evidencia_tipo invalido') existe = False if ev_url.startswith('https://'): try: existe = requests.head(ev_url, timeout=5).status_code == 200 except: pass reglas.append({'codigo': 'B-007', 'ok': existe}) if not existe: errores.append('B-007: evidencia no accesible') return (len(errores) == 0, errores, reglas) # ═══════════════════════════════════════════════════════════════ # CREACIÓN DE REGISTROS # ═══════════════════════════════════════════════════════════════ def crear_milestone(cur, datos): cur.execute('SELECT get_ultimo_hash_milestone(%s)', (H_INSTANCIA,)) hash_previo = cur.fetchone()['get_ultimo_hash_milestone'] cur.execute('SELECT get_siguiente_secuencia_milestone(%s)', (H_INSTANCIA,)) secuencia = cur.fetchone()['get_siguiente_secuencia_milestone'] hash_contenido = calcular_hash_contenido(datos) h_milestone = calcular_hash_registro(hash_previo, hash_contenido) leaf_hash = calcular_leaf_hash(h_milestone, hash_contenido, datetime.utcnow()) cur.execute(''' INSERT INTO milestones ( h_milestone, h_instancia, secuencia, hash_previo, hash_contenido, alias, tipo_item, descripcion, datos, etiqueta_principal, proyecto_tag, id_padre_milestone, merkle_leaf_hash, blockchain_pending ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,TRUE) ''', ( h_milestone, H_INSTANCIA, secuencia, hash_previo, hash_contenido, datos.get('alias'), datos.get('tipo_item'), datos.get('descripcion'), json.dumps(datos), datos.get('etiqueta_principal'), datos.get('proyecto_tag'), datos.get('id_padre_milestone'), leaf_hash )) return h_milestone, leaf_hash def crear_bloque(cur, datos): cur.execute('SELECT get_ultimo_hash_bloque(%s)', (H_INSTANCIA,)) hash_previo = cur.fetchone()['get_ultimo_hash_bloque'] cur.execute('SELECT get_siguiente_secuencia_bloque(%s)', (H_INSTANCIA,)) secuencia = cur.fetchone()['get_siguiente_secuencia_bloque'] hash_contenido = calcular_hash_contenido(datos) h_bloque = calcular_hash_registro(hash_previo, hash_contenido) leaf_hash = calcular_leaf_hash(h_bloque, hash_contenido, datetime.utcnow()) cur.execute(''' INSERT INTO bloques ( h_bloque, h_instancia, secuencia, hash_previo, hash_contenido, alias, tipo_accion, descripcion, datos, evidencia_hash, evidencia_url, evidencia_tipo, etiqueta_principal, proyecto_tag, id_padre_bloque, id_milestone_asociado, merkle_leaf_hash, blockchain_pending ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,TRUE) ''', ( h_bloque, H_INSTANCIA, secuencia, hash_previo, hash_contenido, datos.get('alias'), datos.get('tipo_accion'), datos.get('descripcion'), json.dumps(datos), datos.get('evidencia_hash'), datos.get('evidencia_url'), datos.get('evidencia_tipo'), datos.get('etiqueta_principal'), datos.get('proyecto_tag'), datos.get('id_padre_bloque'), datos.get('id_milestone_asociado'), leaf_hash )) return h_bloque, leaf_hash # ═══════════════════════════════════════════════════════════════ # ENDPOINTS # ═══════════════════════════════════════════════════════════════ @app.route('/health') def health(): try: conn = get_db() conn.cursor().execute('SELECT 1') conn.close() return jsonify({'service': 'feldman', 'status': 'healthy', 'version': '3.0-unified', 'rol': 'contable-validador-merkle'}) except Exception as e: return jsonify({'status': 'unhealthy', 'error': str(e)}), 500 @app.route('/s-contract') def s_contract(): return jsonify({ 'service': 'feldman', 'version': '3.0-unified', 'contract_version': 'S-CONTRACT v2.1', 'rol': 'El Contable - Validador + Merkle Tree', 'endpoints': ['/validar', '/verify/{h}', '/consolidar', '/stats', '/batches', '/milestones', '/bloques'] }) @app.route('/validar', methods=['POST']) @require_auth def validar(): data = request.json origen, h_origen = data.get('origen'), data.get('h_origen') tipo_destino, datos = data.get('tipo_destino'), data.get('datos', {}) if tipo_destino not in ['milestone', 'bloque']: return jsonify({'error': 'tipo_destino debe ser milestone o bloque'}), 400 h_entrada = sha256({'origen': origen, 'datos': datos, 'ts': datetime.utcnow().isoformat()}) conn = get_db() cur = conn.cursor() try: # Insertar en cola cur.execute(''' INSERT INTO feldman_cola (h_entrada, h_instancia, origen, h_origen, tipo_destino, datos, estado) VALUES (%s,%s,%s,%s,%s,%s,'pendiente') ON CONFLICT (h_entrada) DO NOTHING ''', (h_entrada, H_INSTANCIA, origen, h_origen, tipo_destino, json.dumps(datos))) # Validar if tipo_destino == 'milestone': ok, errores, reglas = validar_milestone(datos) else: ok, errores, reglas = validar_bloque(datos) if not ok: cur.execute("UPDATE feldman_cola SET estado='error', error_mensaje=%s WHERE h_entrada=%s", ('; '.join(errores), h_entrada)) conn.commit() return jsonify({'ok': False, 'h_entrada': h_entrada, 'estado': 'error', 'errores': errores}) # Crear registro if tipo_destino == 'milestone': h_registro, leaf_hash = crear_milestone(cur, datos) else: h_registro, leaf_hash = crear_bloque(cur, datos) cur.execute(''' UPDATE feldman_cola SET estado='validado', h_registro=%s, validacion_ok=TRUE, reglas_aplicadas=%s, validated_at=NOW() WHERE h_entrada=%s ''', (h_registro, json.dumps(reglas), h_entrada)) conn.commit() return jsonify({ 'ok': True, 'h_entrada': h_entrada, 'estado': 'validado', 'tipo_registro': tipo_destino, 'h_registro': h_registro, 'merkle_leaf_hash': leaf_hash, 'mensaje': f'{tipo_destino.capitalize()} creado, pendiente consolidacion Merkle' }) except Exception as e: conn.rollback() return jsonify({'error': str(e)}), 500 finally: cur.close() conn.close() @app.route('/consolidar', methods=['POST']) @require_auth def consolidar(): data = request.json or {} forzar = data.get('forzar', False) conn = get_db() cur = conn.cursor() try: # Obtener registros validados (24h o forzado) if forzar: cur.execute("SELECT * FROM feldman_cola WHERE h_instancia=%s AND estado='validado'", (H_INSTANCIA,)) else: cur.execute(''' SELECT * FROM feldman_cola WHERE h_instancia=%s AND estado='validado' AND validated_at < NOW() - INTERVAL '%s hours' ''', (H_INSTANCIA, CONSOLIDATION_HOURS)) registros = cur.fetchall() if not registros: return jsonify({'ok': True, 'mensaje': 'No hay registros para consolidar', 'registros': 0}) # Recolectar leaf hashes leaves = [] leaf_map = {} for r in registros: if r['tipo_destino'] == 'milestone': cur.execute('SELECT merkle_leaf_hash FROM milestones WHERE h_milestone=%s', (r['h_registro'],)) else: cur.execute('SELECT merkle_leaf_hash FROM bloques WHERE h_bloque=%s', (r['h_registro'],)) row = cur.fetchone() if row and row['merkle_leaf_hash']: leaves.append(row['merkle_leaf_hash']) leaf_map[row['merkle_leaf_hash']] = r if not leaves: return jsonify({'ok': False, 'error': 'No hay leaf hashes'}) # Construir Merkle tree tree = build_merkle_tree(leaves) batch_id = f"batch-{datetime.utcnow().strftime('%Y%m%d-%H%M%S')}" # Contar por tipo milestones_count = sum(1 for r in registros if r['tipo_destino'] == 'milestone') bloques_count = len(registros) - milestones_count # Insertar batch cur.execute(''' INSERT INTO merkle_batches (batch_id, h_instancia, periodo_inicio, periodo_fin, total_milestones, total_bloques, merkle_root, merkle_tree) VALUES (%s,%s,%s,%s,%s,%s,%s,%s) ''', ( batch_id, H_INSTANCIA, min(r['validated_at'] for r in registros), max(r['validated_at'] for r in registros), milestones_count, bloques_count, tree['root'], json.dumps(tree) )) # Actualizar registros for r in registros: if r['tipo_destino'] == 'milestone': cur.execute('UPDATE milestones SET merkle_batch_id=%s WHERE h_milestone=%s', (batch_id, r['h_registro'])) else: cur.execute('UPDATE bloques SET merkle_batch_id=%s WHERE h_bloque=%s', (batch_id, r['h_registro'])) cur.execute('UPDATE feldman_cola SET estado=%s, consolidated_at=NOW() WHERE h_entrada=%s', ('consolidado', r['h_entrada'])) conn.commit() return jsonify({ 'ok': True, 'batch_id': batch_id, 'registros_consolidados': len(registros), 'milestones': milestones_count, 'bloques': bloques_count, 'merkle_root': tree['root'], 'mensaje': 'Batch creado, pendiente sellado blockchain' }) except Exception as e: conn.rollback() return jsonify({'error': str(e)}), 500 finally: cur.close() conn.close() @app.route('/verify/') @require_auth def verify(h_registro): conn = get_db() cur = conn.cursor() try: # Buscar en milestones cur.execute('SELECT *, %s as tipo FROM milestones WHERE h_milestone=%s', ('milestone', h_registro)) registro = cur.fetchone() if not registro: cur.execute('SELECT *, %s as tipo FROM bloques WHERE h_bloque=%s', ('bloque', h_registro)) registro = cur.fetchone() if not registro: return jsonify({'error': 'Registro no encontrado'}), 404 if not registro.get('merkle_batch_id'): return jsonify({ 'h_registro': h_registro, 'tipo': registro['tipo'], 'verified': False, 'estado': 'pendiente_consolidacion', 'mensaje': 'Registro aun no consolidado en batch Merkle' }) # Obtener batch cur.execute('SELECT * FROM merkle_batches WHERE batch_id=%s', (registro['merkle_batch_id'],)) batch = cur.fetchone() if not batch: return jsonify({'error': 'Batch no encontrado'}), 404 tree = batch['merkle_tree'] if isinstance(batch['merkle_tree'], dict) else json.loads(batch['merkle_tree']) leaf_hash = registro['merkle_leaf_hash'] # Encontrar indice try: leaf_index = tree['leaves'].index(leaf_hash) except ValueError: return jsonify({'error': 'Leaf no encontrado en tree'}), 500 proof = get_merkle_proof(tree, leaf_index) verified = verify_merkle_proof(leaf_hash, proof, tree['root']) return jsonify({ 'h_registro': h_registro, 'tipo': registro['tipo'], 'verified': verified, 'merkle_proof': { 'leaf_hash': leaf_hash, 'proof': proof, 'root': tree['root'], 'position': leaf_index }, 'batch': { 'batch_id': batch['batch_id'], 'periodo': f"{batch['periodo_inicio']} to {batch['periodo_fin']}", 'total_registros': batch['total_milestones'] + batch['total_bloques'] }, 'blockchain': { 'pending': batch['blockchain_pending'], 'tx_ref': batch.get('blockchain_tx_ref'), 'network': batch.get('blockchain_network') } }) finally: cur.close() conn.close() @app.route('/stats') @require_auth def stats(): conn = get_db() cur = conn.cursor() try: cur.execute('SELECT COUNT(*) as c FROM milestones WHERE h_instancia=%s', (H_INSTANCIA,)) milestones = cur.fetchone()['c'] cur.execute('SELECT COUNT(*) as c FROM bloques WHERE h_instancia=%s', (H_INSTANCIA,)) bloques = cur.fetchone()['c'] cur.execute("SELECT COUNT(*) as c FROM feldman_cola WHERE h_instancia=%s AND estado='validado'", (H_INSTANCIA,)) cola_validada = cur.fetchone()['c'] cur.execute("SELECT COUNT(*) as c FROM feldman_cola WHERE h_instancia=%s AND estado='pendiente'", (H_INSTANCIA,)) cola_pendiente = cur.fetchone()['c'] cur.execute('SELECT COUNT(*) as c FROM merkle_batches WHERE h_instancia=%s', (H_INSTANCIA,)) batches = cur.fetchone()['c'] cur.execute('SELECT COUNT(*) as c FROM merkle_batches WHERE h_instancia=%s AND blockchain_pending=TRUE', (H_INSTANCIA,)) batches_pend = cur.fetchone()['c'] return jsonify({ 'total_milestones': milestones, 'total_bloques': bloques, 'cola_pendiente': cola_pendiente, 'cola_validada': cola_validada, 'batches_sellados': batches - batches_pend, 'batches_pendientes': batches_pend }) finally: cur.close() conn.close() @app.route('/batches') @require_auth def list_batches(): limit = request.args.get('limit', 20, type=int) conn = get_db() cur = conn.cursor() try: cur.execute('SELECT * FROM merkle_batches WHERE h_instancia=%s ORDER BY created_at DESC LIMIT %s', (H_INSTANCIA, limit)) return jsonify({'batches': [dict(b) for b in cur.fetchall()]}) finally: cur.close() conn.close() @app.route('/batch/') @require_auth def get_batch(batch_id): conn = get_db() cur = conn.cursor() try: cur.execute('SELECT * FROM merkle_batches WHERE batch_id=%s', (batch_id,)) b = cur.fetchone() if not b: return jsonify({'error': 'Batch no encontrado'}), 404 return jsonify(dict(b)) finally: cur.close() conn.close() @app.route('/milestones') @require_auth def list_milestones(): proyecto = request.args.get('proyecto') limit = request.args.get('limit', 50, type=int) conn = get_db() cur = conn.cursor() try: if proyecto: cur.execute('SELECT * FROM milestones WHERE h_instancia=%s AND proyecto_tag=%s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, proyecto, limit)) else: cur.execute('SELECT * FROM milestones WHERE h_instancia=%s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, limit)) return jsonify({'milestones': [dict(m) for m in cur.fetchall()]}) finally: cur.close() conn.close() @app.route('/bloques') @require_auth def list_bloques(): proyecto = request.args.get('proyecto') limit = request.args.get('limit', 50, type=int) conn = get_db() cur = conn.cursor() try: if proyecto: cur.execute('SELECT * FROM bloques WHERE h_instancia=%s AND proyecto_tag=%s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, proyecto, limit)) else: cur.execute('SELECT * FROM bloques WHERE h_instancia=%s ORDER BY secuencia DESC LIMIT %s', (H_INSTANCIA, limit)) return jsonify({'bloques': [dict(b) for b in cur.fetchall()]}) finally: cur.close() conn.close() @app.route('/milestone/') @require_auth def get_milestone(h_milestone): conn = get_db() cur = conn.cursor() try: cur.execute('SELECT * FROM milestones WHERE h_milestone=%s', (h_milestone,)) m = cur.fetchone() return jsonify(dict(m)) if m else (jsonify({'error': 'No encontrado'}), 404) finally: cur.close() conn.close() @app.route('/bloque/') @require_auth def get_bloque(h_bloque): conn = get_db() cur = conn.cursor() try: cur.execute('SELECT * FROM bloques WHERE h_bloque=%s', (h_bloque,)) b = cur.fetchone() return jsonify(dict(b)) if b else (jsonify({'error': 'No encontrado'}), 404) finally: cur.close() conn.close() if __name__ == '__main__': app.run(host='0.0.0.0', port=5054, debug=False)