Change PIN to 1451

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
ARCHITECT
2026-01-17 23:31:52 +00:00
parent c152cacb90
commit f199daf4ba
171 changed files with 10492 additions and 2 deletions

View File

@@ -0,0 +1,17 @@
[Unit]
Description=Captain Claude Mobile v2 API
After=network.target
[Service]
Type=simple
User=architect
Group=architect
WorkingDirectory=/home/architect/captain-claude/apps/captain-mobile-v2/backend
Environment="PATH=/home/architect/captain-claude/apps/captain-mobile-v2/backend/venv/bin:/home/architect/.npm-global/bin:/usr/local/bin:/usr/bin:/bin"
# Password from /data/.admin_password (default: admin)
ExecStart=/home/architect/captain-claude/apps/captain-mobile-v2/backend/venv/bin/uvicorn captain_api_v2:app --host 0.0.0.0 --port 3030
Restart=always
RestartSec=3
[Install]
WantedBy=multi-user.target

View File

@@ -0,0 +1,15 @@
[Unit]
Description=Captain Claude API v3
After=network.target
[Service]
Type=simple
User=architect
WorkingDirectory=/home/architect/captain-claude/apps/captain-mobile-v2/backend
Environment=PATH=/home/architect/captain-claude/apps/captain-mobile-v2/backend/venv/bin:/usr/local/bin:/usr/bin:/bin
ExecStart=/home/architect/captain-claude/apps/captain-mobile-v2/backend/venv/bin/uvicorn captain_api_v3:app --host 0.0.0.0 --port 3030
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,620 @@
#!/usr/bin/env python3
"""
Captain Claude Mobile v3 - Backend API
Architecture: Uses claude CLI with --output-format stream-json
No more screen/hardcopy bullshit - clean JSON output
"""
import sys
import logging
import os
import asyncio
import secrets
import subprocess
import json
import re
from datetime import datetime, timedelta
from typing import Optional, Dict, Set
from contextlib import asynccontextmanager
from pathlib import Path
from dataclasses import dataclass, field
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import jwt
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler(sys.stderr)]
)
logger = logging.getLogger(__name__)
# ============================================================================
# Configuration
# ============================================================================
DATA_DIR = Path("/home/architect/captain-claude/apps/captain-mobile-v2/data")
CLAUDE_CMD = "/home/architect/.npm-global/bin/claude"
WORKING_DIR = "/home/architect/captain-claude"
JWT_SECRET_FILE = DATA_DIR / ".jwt_secret"
JWT_ALGORITHM = "HS256"
JWT_EXPIRY_DAYS = 7
def get_jwt_secret():
DATA_DIR.mkdir(parents=True, exist_ok=True)
if os.environ.get("JWT_SECRET"):
return os.environ.get("JWT_SECRET")
if JWT_SECRET_FILE.exists():
return JWT_SECRET_FILE.read_text().strip()
secret = secrets.token_hex(32)
JWT_SECRET_FILE.write_text(secret)
JWT_SECRET_FILE.chmod(0o600)
return secret
JWT_SECRET = get_jwt_secret()
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD")
if not ADMIN_PASSWORD:
_pass_file = DATA_DIR / ".admin_password"
DATA_DIR.mkdir(parents=True, exist_ok=True)
if _pass_file.exists():
ADMIN_PASSWORD = _pass_file.read_text().strip()
else:
ADMIN_PASSWORD = "admin"
_pass_file.write_text(ADMIN_PASSWORD)
_pass_file.chmod(0o600)
VALID_USERS = {"admin": ADMIN_PASSWORD}
ALLOWED_ORIGINS = [
"http://localhost:3000",
"http://localhost:8080",
"http://127.0.0.1:3000",
"http://127.0.0.1:8080",
"https://captain.tzzrarchitect.me",
"capacitor://localhost",
"ionic://localhost"
]
security = HTTPBearer(auto_error=False)
# ============================================================================
# Pydantic Models
# ============================================================================
class LoginRequest(BaseModel):
username: str
password: str
class LoginResponse(BaseModel):
token: str
expires_at: str
class SessionInfo(BaseModel):
session_id: str
name: str
created_at: str
class CreateSessionRequest(BaseModel):
name: str
# ============================================================================
# Auth Helpers
# ============================================================================
def create_token(username: str) -> tuple[str, datetime]:
expires = datetime.utcnow() + timedelta(days=JWT_EXPIRY_DAYS)
payload = {
"sub": username,
"exp": expires,
"iat": datetime.utcnow()
}
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
return token, expires
def verify_token(token: str) -> Optional[str]:
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
return payload.get("sub")
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
if not credentials:
raise HTTPException(status_code=401, detail="Not authenticated")
username = verify_token(credentials.credentials)
if not username:
raise HTTPException(status_code=401, detail="Invalid or expired token")
return username
# ============================================================================
# Claude Session Manager
# ============================================================================
@dataclass
class ClaudeSession:
"""Represents a Claude conversation session"""
session_id: str # Claude's session UUID
name: str # User-friendly name
created_at: datetime
subscribers: Set[asyncio.Queue] = field(default_factory=set)
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
is_processing: bool = False
class SessionManager:
"""
Manages Claude CLI sessions using -p mode with stream-json output.
Each session maintains conversation history via Claude's session_id.
"""
def __init__(self):
self._sessions: Dict[str, ClaudeSession] = {}
self._lock = asyncio.Lock()
async def create_session(self, name: str) -> ClaudeSession:
"""Create a new Claude session"""
# Generate initial session by making a simple call
session_id = secrets.token_hex(16) # We'll get real session_id from first call
session = ClaudeSession(
session_id=session_id,
name=name,
created_at=datetime.utcnow()
)
async with self._lock:
self._sessions[session_id] = session
logger.info(f"Created session: {name} ({session_id})")
return session
async def get_session(self, session_id: str) -> Optional[ClaudeSession]:
"""Get a session by ID"""
return self._sessions.get(session_id)
async def list_sessions(self) -> list[SessionInfo]:
"""List all sessions"""
return [
SessionInfo(
session_id=s.session_id,
name=s.name,
created_at=s.created_at.isoformat()
)
for s in self._sessions.values()
]
async def subscribe(self, session_id: str) -> asyncio.Queue:
"""Subscribe to a session's output"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Session {session_id} not found")
queue: asyncio.Queue = asyncio.Queue(maxsize=100)
async with session.lock:
session.subscribers.add(queue)
logger.debug(f"Subscribed to session {session_id}, {len(session.subscribers)} subscribers")
return queue
async def unsubscribe(self, session_id: str, queue: asyncio.Queue):
"""Unsubscribe from a session's output"""
session = self._sessions.get(session_id)
if session:
async with session.lock:
session.subscribers.discard(queue)
logger.debug(f"Unsubscribed from session {session_id}")
async def send_message(self, session_id: str, content: str) -> bool:
"""
Send a message to Claude and stream the response.
Uses claude -p with stream-json for clean output.
"""
session = self._sessions.get(session_id)
if not session:
logger.error(f"Session {session_id} not found")
return False
if session.is_processing:
logger.warning(f"Session {session_id} is already processing")
await self._broadcast(session, {"type": "error", "content": "Ya hay un mensaje en proceso"})
return False
session.is_processing = True
try:
# Build command
cmd = [
CLAUDE_CMD,
"-p", content,
"--output-format", "stream-json",
"--verbose",
"--dangerously-skip-permissions"
]
# If we have a Claude session UUID, resume it to maintain conversation
# Claude UUIDs are in format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (36 chars with dashes)
if session.session_id and '-' in session.session_id and len(session.session_id) == 36:
cmd.extend(["--resume", session.session_id])
logger.debug(f"Resuming Claude session: {session.session_id}")
logger.debug(f"Executing: {' '.join(cmd)}")
# Run claude process
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=WORKING_DIR
)
# Process output as it arrives
full_response = ""
tool_count = 0
try:
while True:
# Read line with timeout to avoid hanging
try:
line = await asyncio.wait_for(
process.stdout.readline(),
timeout=60.0
)
except asyncio.TimeoutError:
logger.warning("Timeout reading from Claude process")
break
if not line:
break
line_str = line.decode('utf-8').strip()
if not line_str:
continue
logger.debug(f"Claude output: {line_str[:100]}...")
try:
data = json.loads(line_str)
msg_type = data.get("type", "")
if msg_type == "assistant":
message = data.get("message", {})
content_list = message.get("content", [])
for item in content_list:
if item.get("type") == "text":
text = item.get("text", "")
if text:
full_response = text
# Send immediately!
await self._broadcast(session, {
"type": "output",
"content": text
})
elif item.get("type") == "tool_use":
tool_count += 1
await self._broadcast(session, {
"type": "output",
"content": f"procesando{'.' * tool_count}"
})
elif msg_type == "result":
result = data.get("result", "")
claude_session = data.get("session_id")
if claude_session:
session.session_id = claude_session
# Send done signal
await self._broadcast(session, {
"type": "done",
"session_id": session.session_id
})
except json.JSONDecodeError:
pass
except Exception as e:
logger.error(f"Error processing line: {e}")
finally:
# Ensure process is cleaned up
try:
process.terminate()
await asyncio.wait_for(process.wait(), timeout=5.0)
except:
process.kill()
return True
except Exception as e:
logger.error(f"Error sending message: {e}")
if str(e): # Only broadcast if there's an actual error message
await self._broadcast(session, {
"type": "error",
"content": str(e)
})
return False
finally:
session.is_processing = False
async def _broadcast(self, session: ClaudeSession, message: dict):
"""Broadcast message to all subscribers"""
async with session.lock:
subscribers = list(session.subscribers)
logger.debug(f"Broadcasting to {len(subscribers)} subscribers: {message}")
for queue in subscribers:
try:
queue.put_nowait(message)
logger.debug(f"Message queued successfully")
except asyncio.QueueFull:
try:
queue.get_nowait()
queue.put_nowait(message)
except:
pass
except Exception as e:
logger.error(f"Error queuing message: {e}")
# Global session manager
session_manager = SessionManager()
# ============================================================================
# App Setup
# ============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("Captain Claude v3 starting...")
yield
logger.info("Captain Claude v3 shutting down...")
app = FastAPI(
title="Captain Claude Mobile v3",
description="Chat API using Claude CLI with stream-json",
version="3.0.0",
lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ============================================================================
# REST Endpoints
# ============================================================================
@app.get("/health")
async def health():
return {"status": "ok", "version": "3.0.0"}
@app.post("/auth/login", response_model=LoginResponse)
async def login(request: LoginRequest):
if request.username not in VALID_USERS:
raise HTTPException(status_code=401, detail="Invalid credentials")
if VALID_USERS[request.username] != request.password:
raise HTTPException(status_code=401, detail="Invalid credentials")
token, expires = create_token(request.username)
return LoginResponse(token=token, expires_at=expires.isoformat())
@app.get("/sessions")
async def get_sessions(user: str = Depends(get_current_user)):
return await session_manager.list_sessions()
@app.post("/sessions")
async def create_session(request: CreateSessionRequest, user: str = Depends(get_current_user)):
try:
session = await session_manager.create_session(request.name)
return SessionInfo(
session_id=session.session_id,
name=session.name,
created_at=session.created_at.isoformat()
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# WebSocket Chat
# ============================================================================
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
"""WebSocket endpoint for chat"""
await websocket.accept()
# Auth handshake
try:
auth_msg = await asyncio.wait_for(websocket.receive_json(), timeout=10.0)
token = auth_msg.get("token", "")
username = verify_token(token)
if not username:
await websocket.send_json({"type": "error", "message": "Invalid token"})
await websocket.close(code=4001)
return
except asyncio.TimeoutError:
await websocket.send_json({"type": "error", "message": "Auth timeout"})
await websocket.close(code=4001)
return
except Exception as e:
await websocket.send_json({"type": "error", "message": f"Auth error: {str(e)}"})
await websocket.close(code=4001)
return
# Send init
sessions = await session_manager.list_sessions()
await websocket.send_json({
"type": "init",
"user": username,
"sessions": [{"session_id": s.session_id, "name": s.name} for s in sessions]
})
# Current subscription state
current_session_id: Optional[str] = None
current_queue: Optional[asyncio.Queue] = None
output_task: Optional[asyncio.Task] = None
async def output_forwarder(queue: asyncio.Queue):
"""Forward output from queue to websocket"""
while True:
try:
message = await queue.get()
# Message already has type, just forward it
await websocket.send_json(message)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Output forwarder error: {e}")
break
try:
while True:
data = await websocket.receive_json()
msg_type = data.get("type", "")
if msg_type == "create_session":
# Create a new session and auto-connect
name = data.get("name", f"session_{datetime.now().strftime('%H%M%S')}")
try:
session = await session_manager.create_session(name)
await websocket.send_json({
"type": "session_created",
"session_id": session.session_id,
"name": session.name
})
# Auto-connect to the new session
if current_session_id and current_queue:
if output_task:
output_task.cancel()
try:
await output_task
except asyncio.CancelledError:
pass
await session_manager.unsubscribe(current_session_id, current_queue)
current_queue = await session_manager.subscribe(session.session_id)
current_session_id = session.session_id
output_task = asyncio.create_task(output_forwarder(current_queue))
await websocket.send_json({
"type": "session_connected",
"session_id": session.session_id,
"name": session.name
})
logger.info(f"Auto-connected to new session {session.session_id}")
except Exception as e:
await websocket.send_json({"type": "error", "message": str(e)})
elif msg_type == "connect_session":
session_id = data.get("session_id", "")
# Unsubscribe from previous session
if current_session_id and current_queue:
if output_task:
output_task.cancel()
try:
await output_task
except asyncio.CancelledError:
pass
await session_manager.unsubscribe(current_session_id, current_queue)
# Subscribe to new session
try:
current_queue = await session_manager.subscribe(session_id)
current_session_id = session_id
output_task = asyncio.create_task(output_forwarder(current_queue))
session = await session_manager.get_session(session_id)
await websocket.send_json({
"type": "session_connected",
"session_id": session_id,
"name": session.name if session else "unknown"
})
except ValueError as e:
await websocket.send_json({"type": "error", "message": str(e)})
current_session_id = None
current_queue = None
elif msg_type == "message":
content = data.get("content", "")
if current_session_id and content:
# Send message in background so we can receive more commands
asyncio.create_task(
session_manager.send_message(current_session_id, content)
)
elif not current_session_id:
await websocket.send_json({
"type": "error",
"message": "No hay sesión conectada"
})
elif msg_type == "list_sessions":
sessions = await session_manager.list_sessions()
await websocket.send_json({
"type": "sessions_list",
"sessions": [{"session_id": s.session_id, "name": s.name} for s in sessions]
})
elif msg_type == "ping":
await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
pass
except Exception as e:
logger.error(f"WebSocket error: {e}")
try:
await websocket.send_json({"type": "error", "message": str(e)})
except:
pass
finally:
if output_task:
output_task.cancel()
try:
await output_task
except asyncio.CancelledError:
pass
if current_session_id and current_queue:
await session_manager.unsubscribe(current_session_id, current_queue)
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=3030)

View File

@@ -0,0 +1,5 @@
fastapi==0.109.0
uvicorn[standard]==0.27.0
websockets==12.0
pyjwt==2.8.0
python-multipart==0.0.6

View File

@@ -0,0 +1,45 @@
import asyncio
import json
from websockets import connect
import httpx
async def test():
print("=== Test Auto-Connect ===", flush=True)
async with httpx.AsyncClient() as client:
r = await client.post('http://localhost:3030/auth/login',
json={'username': 'admin', 'password': 'admin'})
token = r.json()['token']
async with connect('ws://localhost:3030/ws/chat') as ws:
await ws.send(json.dumps({'token': token}))
init = json.loads(await ws.recv())
print(f"1. Init: {init.get('type')}, sessions: {len(init.get('sessions', []))}", flush=True)
# Create session
print("2. Creating session...", flush=True)
await ws.send(json.dumps({'type': 'create_session', 'name': 'mi_sesion'}))
# Should receive session_created AND session_connected
for i in range(3):
msg = json.loads(await asyncio.wait_for(ws.recv(), timeout=5))
print(f" Received: {msg.get('type')} - {msg}", flush=True)
if msg.get('type') == 'session_connected':
print("3. AUTO-CONNECT OK!", flush=True)
# Now send a message
print("4. Sending message...", flush=True)
await ws.send(json.dumps({'type': 'message', 'content': 'di hola'}))
for j in range(30):
try:
resp = json.loads(await asyncio.wait_for(ws.recv(), timeout=2))
print(f" Response: {resp}", flush=True)
if resp.get('type') == 'done':
print("\n=== SUCCESS ===", flush=True)
return
except asyncio.TimeoutError:
pass
return
asyncio.run(test())

View File

@@ -0,0 +1,65 @@
import asyncio
import json
from websockets import connect
import httpx
async def test():
print("=== Test Connect Session ===")
# Login
async with httpx.AsyncClient() as client:
r = await client.post('http://localhost:3030/auth/login',
json={'username': 'admin', 'password': 'admin'})
token = r.json()['token']
# Connect and create session
async with connect('ws://localhost:3030/ws/chat') as ws:
await ws.send(json.dumps({'token': token}))
init = json.loads(await ws.recv())
print(f"1. Init sessions: {len(init.get('sessions', []))}")
for s in init.get('sessions', []):
print(f" - {s['name']} ({s['session_id'][:8]}...)")
# Create a new session
await ws.send(json.dumps({'type': 'create_session', 'name': 'test_session'}))
created = json.loads(await ws.recv())
print(f"2. Created: {created}")
sid = created.get('session_id')
# Should auto-connect - check for session_connected
connected = json.loads(await ws.recv())
print(f"3. Connected msg: {connected}")
print("\n--- Reconnecting ---\n")
# Reconnect
async with connect('ws://localhost:3030/ws/chat') as ws:
await ws.send(json.dumps({'token': token}))
init = json.loads(await ws.recv())
print(f"4. Init sessions: {len(init.get('sessions', []))}")
for s in init.get('sessions', []):
print(f" - {s['name']} ({s['session_id'][:8]}...)")
# Try to connect to the session we created
print(f"\n5. Connecting to session {sid[:8]}...")
await ws.send(json.dumps({'type': 'connect_session', 'session_id': sid}))
result = json.loads(await ws.recv())
print(f"6. Result: {result}")
if result.get('type') == 'session_connected':
print("\n=== SUCCESS - Connected to existing session ===")
# Try sending a message
await ws.send(json.dumps({'type': 'message', 'content': 'test'}))
for i in range(30):
try:
m = await asyncio.wait_for(ws.recv(), timeout=1.0)
print(f" RECV: {json.loads(m)}")
if json.loads(m).get('type') == 'done':
break
except asyncio.TimeoutError:
pass
else:
print(f"\n=== FAILED: {result} ===")
asyncio.run(test())

View File

@@ -0,0 +1,35 @@
import asyncio
import json
from websockets import connect
import httpx
async def test():
print("=== Test Connect Session ===", flush=True)
# Login
async with httpx.AsyncClient() as client:
r = await client.post('http://localhost:3030/auth/login',
json={'username': 'admin', 'password': 'admin'})
token = r.json()['token']
print(f"Got token", flush=True)
# Connect and list sessions
async with connect('ws://localhost:3030/ws/chat') as ws:
await ws.send(json.dumps({'token': token}))
init = json.loads(await ws.recv())
print(f"Init: {init.get('type')}", flush=True)
sessions = init.get('sessions', [])
print(f"Sessions: {len(sessions)}", flush=True)
for s in sessions:
print(f" - {s['name']} ({s['session_id']})", flush=True)
if sessions:
sid = sessions[0]['session_id']
print(f"\nConnecting to: {sid}", flush=True)
await ws.send(json.dumps({'type': 'connect_session', 'session_id': sid}))
result = json.loads(await ws.recv())
print(f"Result: {result}", flush=True)
else:
print("No sessions to connect to", flush=True)
asyncio.run(test())

View File

@@ -0,0 +1,118 @@
#!/usr/bin/env python3
"""
Test final de Captain Mobile - Valida flujo completo
"""
import asyncio
import json
import httpx
from websockets import connect
SESSION = "655551.qa_test"
async def test():
print("=" * 60)
print("TEST FINAL - Captain Mobile")
print(f"Sesión: {SESSION}")
print("=" * 60)
# 1. Login
print("\n[1] Obteniendo token...")
async with httpx.AsyncClient() as client:
r = await client.post('http://localhost:3030/auth/login',
json={'username': 'admin', 'password': 'admin'})
if r.status_code != 200:
print(f"ERROR: Login failed {r.status_code}")
return
token = r.json()['token']
print(f" Token OK: {token[:20]}...")
# 2. WebSocket
print("\n[2] Conectando WebSocket...")
async with connect('ws://localhost:3030/ws/chat') as ws:
await ws.send(json.dumps({'token': token}))
init = json.loads(await ws.recv())
print(f" Init: {init['type']}")
# 3. Connect to session
print(f"\n[3] Conectando a sesión {SESSION}...")
await ws.send(json.dumps({'type': 'connect_session', 'full_name': SESSION}))
conn = json.loads(await ws.recv())
print(f" Resultado: {conn['type']}")
if conn['type'] != 'session_connected':
print(f" ERROR: No se pudo conectar a la sesión")
return
# 4. Esperar un poco para que se establezca la conexión
print("\n[4] Esperando 2s para estabilizar conexión...")
await asyncio.sleep(2)
# 5. Enviar mensaje simple
mensaje = "responde EXACTAMENTE: TEST_OK_12345"
print(f"\n[5] Enviando mensaje: '{mensaje}'")
await ws.send(json.dumps({'type': 'message', 'content': mensaje}))
# 6. Recibir respuestas
print("\n[6] Esperando respuestas (max 45s)...")
outputs = []
start = asyncio.get_event_loop().time()
try:
while (asyncio.get_event_loop().time() - start) < 45:
try:
msg = await asyncio.wait_for(ws.recv(), timeout=10.0)
data = json.loads(msg)
if data['type'] == 'output':
content = data['content']
outputs.append(content)
# Mostrar preview
preview = content[:80].replace('\n', '\\n')
print(f" Output #{len(outputs)} ({len(content)} chars): {preview}...")
# Buscar la respuesta esperada
if 'TEST_OK_12345' in content:
print(f" *** RESPUESTA ENCONTRADA! ***")
break
except asyncio.TimeoutError:
print(" (timeout 10s sin output)")
if outputs:
break
except Exception as e:
print(f" Error: {e}")
# 7. Validación
full_output = ''.join(outputs)
print("\n" + "=" * 60)
print("RESULTADOS")
print("=" * 60)
print(f"\nOutputs recibidos: {len(outputs)}")
print(f"Total caracteres: {len(full_output)}")
# Check 1: Respuesta encontrada
has_response = 'TEST_OK_12345' in full_output
print(f"\n[CHECK 1] Respuesta 'TEST_OK_12345': {'PASS' if has_response else 'FAIL'}")
# Check 2: Sin duplicados excesivos
no_duplicates = len(full_output) < 2000 # Respuesta simple debería ser corta
print(f"[CHECK 2] Sin duplicados excesivos (<2000 chars): {'PASS' if no_duplicates else 'FAIL'}")
# Check 3: UI filtrada
has_ui = 'bypass permissions' in full_output or '───' in full_output
no_ui = not has_ui
print(f"[CHECK 3] UI de Claude filtrada: {'PASS' if no_ui else 'FAIL'}")
# Resultado final
all_pass = has_response and no_duplicates and no_ui
print("\n" + "=" * 60)
print(f"RESULTADO FINAL: {'PASS' if all_pass else 'FAIL'}")
print("=" * 60)
if len(full_output) < 500:
print(f"\nRespuesta completa:\n{full_output}")
else:
print(f"\nRespuesta (primeros 500 chars):\n{full_output[:500]}...")
if __name__ == '__main__':
asyncio.run(test())

View File

@@ -0,0 +1,40 @@
import asyncio
import json
from websockets import connect
import httpx
async def test():
print("=== Test Simple ===")
async with httpx.AsyncClient() as client:
r = await client.post('http://localhost:3030/auth/login',
json={'username': 'admin', 'password': 'admin'})
token = r.json()['token']
async with connect('ws://localhost:3030/ws/chat') as ws:
await ws.send(json.dumps({'token': token}))
await ws.recv()
await ws.send(json.dumps({'type': 'create_session', 'name': 's'}))
created = json.loads(await ws.recv())
sid = created['session_id']
await ws.send(json.dumps({'type': 'connect_session', 'session_id': sid}))
await ws.recv()
print("Enviando: di hola")
await ws.send(json.dumps({'type': 'message', 'content': 'di hola'}))
print("Esperando respuesta (60s max)...")
for i in range(60):
try:
m = await asyncio.wait_for(ws.recv(), timeout=1.0)
data = json.loads(m)
print(f" RECIBIDO: {data}")
if data.get('type') == 'done':
print("\n=== COMPLETADO ===")
break
except asyncio.TimeoutError:
print(f" {i+1}s...", end="\r")
asyncio.run(test())

View File

@@ -0,0 +1,40 @@
import asyncio
import json
from websockets import connect
import httpx
async def test():
print("=== Test v3 Simple ===")
async with httpx.AsyncClient() as client:
r = await client.post('http://localhost:3030/auth/login',
json={'username': 'admin', 'password': 'admin'})
token = r.json()['token']
async with connect('ws://localhost:3030/ws/chat') as ws:
await ws.send(json.dumps({'token': token}))
await ws.recv() # init
await ws.send(json.dumps({'type': 'create_session', 'name': 't'}))
created = json.loads(await ws.recv())
sid = created['session_id']
await ws.send(json.dumps({'type': 'connect_session', 'session_id': sid}))
await ws.recv() # connected
print("Enviando mensaje...")
await ws.send(json.dumps({'type': 'message', 'content': 'di OK'}))
# Esperar respuesta con mucho timeout
print("Esperando (30s)...")
try:
for i in range(30):
try:
msg = await asyncio.wait_for(ws.recv(), timeout=1.0)
print(f"RECIBIDO: {msg}")
except asyncio.TimeoutError:
print(f" {i+1}s...", end="\r")
except Exception as e:
print(f"Error: {e}")
asyncio.run(test())

View File

@@ -0,0 +1,46 @@
import asyncio
import json
from websockets import connect
import httpx
async def test():
print("=== Test v3 Conversación ===\n")
async with httpx.AsyncClient() as client:
r = await client.post('http://localhost:3030/auth/login',
json={'username': 'admin', 'password': 'admin'})
token = r.json()['token']
async with connect('ws://localhost:3030/ws/chat') as ws:
await ws.send(json.dumps({'token': token}))
await ws.recv()
await ws.send(json.dumps({'type': 'create_session', 'name': 'conversacion'}))
created = json.loads(await ws.recv())
sid = created['session_id']
await ws.send(json.dumps({'type': 'connect_session', 'session_id': sid}))
await ws.recv()
async def send_and_wait(msg):
print(f"YO: {msg}")
await ws.send(json.dumps({'type': 'message', 'content': msg}))
response = ""
for _ in range(20):
try:
m = await asyncio.wait_for(ws.recv(), timeout=1.0)
data = json.loads(m)
if data.get('type') == 'output':
response = data.get('content', '')
if data.get('type') == 'done':
break
except asyncio.TimeoutError:
pass
print(f"CLAUDE: {response}\n")
return response
await send_and_wait("mi nombre es Pablo")
await send_and_wait("cuál es mi nombre?")
await send_and_wait("di hola")
asyncio.run(test())

View File

@@ -0,0 +1,128 @@
#!/usr/bin/env python3
"""
Test WebSocket - Simula comportamiento de app Flutter
Valida conexión, autenticación, mensajes y respuestas
"""
import asyncio
import json
import httpx
from websockets import connect
async def test():
print("=" * 60)
print("TEST WEBSOCKET - Simulador App Flutter")
print("=" * 60)
# 1. Login
print("\n[1] Obteniendo token de autenticación...")
async with httpx.AsyncClient() as client:
r = await client.post('http://localhost:3030/auth/login',
json={'username': 'admin', 'password': 'admin'})
if r.status_code != 200:
print(f"ERROR: Login failed with status {r.status_code}")
return
token = r.json()['token']
print(f" Token obtenido: {token[:20]}...")
# 2. WebSocket
print("\n[2] Conectando a WebSocket...")
async with connect('ws://localhost:3030/ws/chat') as ws:
# Auth
await ws.send(json.dumps({'token': token}))
init = json.loads(await ws.recv())
print(f" Init response: {init['type']}")
# Connect to session
print("\n[3] Conectando a sesión 648211.captain_test...")
await ws.send(json.dumps({'type': 'connect_session', 'full_name': '648211.captain_test'}))
conn = json.loads(await ws.recv())
print(f" Session response: {conn}")
# Send message
print("\n[4] Enviando mensaje: 'di solo: OK'")
await ws.send(json.dumps({'type': 'message', 'content': 'di solo: OK'}))
# Receive responses - longer timeout for Claude response
print("\n[5] Esperando respuestas (timeout 30s)...")
outputs = []
output_count = 0
start_time = asyncio.get_event_loop().time()
max_wait = 30.0 # Max 30 seconds total
idle_timeout = 8.0 # 8 seconds without output = done
try:
while (asyncio.get_event_loop().time() - start_time) < max_wait:
try:
msg = await asyncio.wait_for(ws.recv(), timeout=idle_timeout)
data = json.loads(msg)
if data['type'] == 'output':
output_count += 1
content = data['content']
outputs.append(content)
preview = content[:100].replace('\n', '\\n')
print(f" Output #{output_count} ({len(content)} chars): {preview}...")
elif data['type'] == 'status':
print(f" Status: {data.get('status', data)}")
elif data['type'] == 'error':
print(f" ERROR: {data.get('message', data)}")
except asyncio.TimeoutError:
print(" (idle timeout - fin de respuestas)")
break
except Exception as e:
print(f" Error: {e}")
# Validate
full_output = ''.join(outputs)
print("\n" + "=" * 60)
print("VALIDACION DE RESULTADOS")
print("=" * 60)
# Validación 1: Contiene OK
contains_ok = 'OK' in full_output or 'ok' in full_output.lower()
print(f"\n[CHECK 1] Contiene 'OK': {'PASS' if contains_ok else 'FAIL'}")
# Validación 2: No duplicados
# Detectar si hay contenido repetido
has_duplicates = False
if len(outputs) > 1:
# Verificar si chunks consecutivos son idénticos
for i in range(len(outputs) - 1):
if outputs[i] == outputs[i+1] and len(outputs[i]) > 10:
has_duplicates = True
break
# Verificar si el contenido total tiene patrones repetidos
if len(full_output) > 100:
half = len(full_output) // 2
first_half = full_output[:half]
second_half = full_output[half:half*2]
if first_half == second_half:
has_duplicates = True
print(f"[CHECK 2] Sin duplicados: {'PASS' if not has_duplicates else 'FAIL'}")
if has_duplicates:
print(" WARNING: Se detectó contenido duplicado!")
# Validación 3: Longitud razonable
is_short = len(full_output) <= 500
print(f"[CHECK 3] Longitud <= 500 chars: {'PASS' if is_short else 'FAIL'}")
print(f" Longitud actual: {len(full_output)} caracteres")
# Resumen
all_passed = contains_ok and not has_duplicates and is_short
print("\n" + "=" * 60)
if all_passed:
print("RESULTADO FINAL: PASS - Todas las validaciones correctas")
else:
print("RESULTADO FINAL: FAIL - Hay validaciones fallidas")
print("=" * 60)
# Mostrar respuesta completa si es corta
if len(full_output) <= 500:
print(f"\nRespuesta completa:\n{full_output}")
else:
print(f"\nRespuesta (primeros 500 chars):\n{full_output[:500]}...")
print(f"\n... (truncado, total: {len(full_output)} chars)")
if __name__ == '__main__':
asyncio.run(test())