#!/usr/bin/env python3 """ Captain Claude Mobile v3 - Backend API Architecture: Uses claude CLI with --output-format stream-json No more screen/hardcopy bullshit - clean JSON output """ import sys import logging import os import asyncio import secrets import subprocess import json import re from datetime import datetime, timedelta from typing import Optional, Dict, Set from contextlib import asynccontextmanager from pathlib import Path from dataclasses import dataclass, field from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends from fastapi.middleware.cors import CORSMiddleware from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel import jwt # Configure logging logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler(sys.stderr)] ) logger = logging.getLogger(__name__) # ============================================================================ # Configuration # ============================================================================ DATA_DIR = Path("/home/architect/captain-claude/apps/captain-mobile-v2/data") CLAUDE_CMD = "/home/architect/.npm-global/bin/claude" WORKING_DIR = "/home/architect/captain-claude" JWT_SECRET_FILE = DATA_DIR / ".jwt_secret" JWT_ALGORITHM = "HS256" JWT_EXPIRY_DAYS = 7 def get_jwt_secret(): DATA_DIR.mkdir(parents=True, exist_ok=True) if os.environ.get("JWT_SECRET"): return os.environ.get("JWT_SECRET") if JWT_SECRET_FILE.exists(): return JWT_SECRET_FILE.read_text().strip() secret = secrets.token_hex(32) JWT_SECRET_FILE.write_text(secret) JWT_SECRET_FILE.chmod(0o600) return secret JWT_SECRET = get_jwt_secret() ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD") if not ADMIN_PASSWORD: _pass_file = DATA_DIR / ".admin_password" DATA_DIR.mkdir(parents=True, exist_ok=True) if _pass_file.exists(): ADMIN_PASSWORD = _pass_file.read_text().strip() else: ADMIN_PASSWORD = "admin" _pass_file.write_text(ADMIN_PASSWORD) _pass_file.chmod(0o600) VALID_USERS = {"admin": ADMIN_PASSWORD} ALLOWED_ORIGINS = [ "http://localhost:3000", "http://localhost:8080", "http://127.0.0.1:3000", "http://127.0.0.1:8080", "https://captain.tzzrarchitect.me", "capacitor://localhost", "ionic://localhost" ] security = HTTPBearer(auto_error=False) # ============================================================================ # Pydantic Models # ============================================================================ class LoginRequest(BaseModel): username: str password: str class LoginResponse(BaseModel): token: str expires_at: str class SessionInfo(BaseModel): session_id: str name: str created_at: str class CreateSessionRequest(BaseModel): name: str # ============================================================================ # Auth Helpers # ============================================================================ def create_token(username: str) -> tuple[str, datetime]: expires = datetime.utcnow() + timedelta(days=JWT_EXPIRY_DAYS) payload = { "sub": username, "exp": expires, "iat": datetime.utcnow() } token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM) return token, expires def verify_token(token: str) -> Optional[str]: try: payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM]) return payload.get("sub") except jwt.ExpiredSignatureError: return None except jwt.InvalidTokenError: return None async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str: if not credentials: raise HTTPException(status_code=401, detail="Not authenticated") username = verify_token(credentials.credentials) if not username: raise HTTPException(status_code=401, detail="Invalid or expired token") return username # ============================================================================ # Claude Session Manager # ============================================================================ @dataclass class ClaudeSession: """Represents a Claude conversation session""" session_id: str # Claude's session UUID name: str # User-friendly name created_at: datetime subscribers: Set[asyncio.Queue] = field(default_factory=set) lock: asyncio.Lock = field(default_factory=asyncio.Lock) is_processing: bool = False class SessionManager: """ Manages Claude CLI sessions using -p mode with stream-json output. Each session maintains conversation history via Claude's session_id. """ def __init__(self): self._sessions: Dict[str, ClaudeSession] = {} self._lock = asyncio.Lock() async def create_session(self, name: str) -> ClaudeSession: """Create a new Claude session""" # Generate initial session by making a simple call session_id = secrets.token_hex(16) # We'll get real session_id from first call session = ClaudeSession( session_id=session_id, name=name, created_at=datetime.utcnow() ) async with self._lock: self._sessions[session_id] = session logger.info(f"Created session: {name} ({session_id})") return session async def get_session(self, session_id: str) -> Optional[ClaudeSession]: """Get a session by ID""" return self._sessions.get(session_id) async def list_sessions(self) -> list[SessionInfo]: """List all sessions""" return [ SessionInfo( session_id=s.session_id, name=s.name, created_at=s.created_at.isoformat() ) for s in self._sessions.values() ] async def subscribe(self, session_id: str) -> asyncio.Queue: """Subscribe to a session's output""" session = self._sessions.get(session_id) if not session: raise ValueError(f"Session {session_id} not found") queue: asyncio.Queue = asyncio.Queue(maxsize=100) async with session.lock: session.subscribers.add(queue) logger.debug(f"Subscribed to session {session_id}, {len(session.subscribers)} subscribers") return queue async def unsubscribe(self, session_id: str, queue: asyncio.Queue): """Unsubscribe from a session's output""" session = self._sessions.get(session_id) if session: async with session.lock: session.subscribers.discard(queue) logger.debug(f"Unsubscribed from session {session_id}") async def send_message(self, session_id: str, content: str) -> bool: """ Send a message to Claude and stream the response. Uses claude -p with stream-json for clean output. """ session = self._sessions.get(session_id) if not session: logger.error(f"Session {session_id} not found") return False if session.is_processing: logger.warning(f"Session {session_id} is already processing") await self._broadcast(session, {"type": "error", "content": "Ya hay un mensaje en proceso"}) return False session.is_processing = True try: # Build command cmd = [ CLAUDE_CMD, "-p", content, "--output-format", "stream-json", "--verbose", "--dangerously-skip-permissions" ] # If we have a Claude session UUID, resume it to maintain conversation # Claude UUIDs are in format: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx (36 chars with dashes) if session.session_id and '-' in session.session_id and len(session.session_id) == 36: cmd.extend(["--resume", session.session_id]) logger.debug(f"Resuming Claude session: {session.session_id}") logger.debug(f"Executing: {' '.join(cmd)}") # Run claude process process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=WORKING_DIR ) # Process output as it arrives full_response = "" tool_count = 0 try: while True: # Read line with timeout to avoid hanging try: line = await asyncio.wait_for( process.stdout.readline(), timeout=60.0 ) except asyncio.TimeoutError: logger.warning("Timeout reading from Claude process") break if not line: break line_str = line.decode('utf-8').strip() if not line_str: continue logger.debug(f"Claude output: {line_str[:100]}...") try: data = json.loads(line_str) msg_type = data.get("type", "") if msg_type == "assistant": message = data.get("message", {}) content_list = message.get("content", []) for item in content_list: if item.get("type") == "text": text = item.get("text", "") if text: full_response = text # Send immediately! await self._broadcast(session, { "type": "output", "content": text }) elif item.get("type") == "tool_use": tool_count += 1 await self._broadcast(session, { "type": "output", "content": f"procesando{'.' * tool_count}" }) elif msg_type == "result": result = data.get("result", "") claude_session = data.get("session_id") if claude_session: session.session_id = claude_session # Send done signal await self._broadcast(session, { "type": "done", "session_id": session.session_id }) except json.JSONDecodeError: pass except Exception as e: logger.error(f"Error processing line: {e}") finally: # Ensure process is cleaned up try: process.terminate() await asyncio.wait_for(process.wait(), timeout=5.0) except: process.kill() return True except Exception as e: logger.error(f"Error sending message: {e}") if str(e): # Only broadcast if there's an actual error message await self._broadcast(session, { "type": "error", "content": str(e) }) return False finally: session.is_processing = False async def _broadcast(self, session: ClaudeSession, message: dict): """Broadcast message to all subscribers""" async with session.lock: subscribers = list(session.subscribers) logger.debug(f"Broadcasting to {len(subscribers)} subscribers: {message}") for queue in subscribers: try: queue.put_nowait(message) logger.debug(f"Message queued successfully") except asyncio.QueueFull: try: queue.get_nowait() queue.put_nowait(message) except: pass except Exception as e: logger.error(f"Error queuing message: {e}") # Global session manager session_manager = SessionManager() # ============================================================================ # App Setup # ============================================================================ @asynccontextmanager async def lifespan(app: FastAPI): logger.info("Captain Claude v3 starting...") yield logger.info("Captain Claude v3 shutting down...") app = FastAPI( title="Captain Claude Mobile v3", description="Chat API using Claude CLI with stream-json", version="3.0.0", lifespan=lifespan ) app.add_middleware( CORSMiddleware, allow_origins=ALLOWED_ORIGINS, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ============================================================================ # REST Endpoints # ============================================================================ @app.get("/health") async def health(): return {"status": "ok", "version": "3.0.0"} @app.post("/auth/login", response_model=LoginResponse) async def login(request: LoginRequest): if request.username not in VALID_USERS: raise HTTPException(status_code=401, detail="Invalid credentials") if VALID_USERS[request.username] != request.password: raise HTTPException(status_code=401, detail="Invalid credentials") token, expires = create_token(request.username) return LoginResponse(token=token, expires_at=expires.isoformat()) @app.get("/sessions") async def get_sessions(user: str = Depends(get_current_user)): return await session_manager.list_sessions() @app.post("/sessions") async def create_session(request: CreateSessionRequest, user: str = Depends(get_current_user)): try: session = await session_manager.create_session(request.name) return SessionInfo( session_id=session.session_id, name=session.name, created_at=session.created_at.isoformat() ) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # ============================================================================ # WebSocket Chat # ============================================================================ @app.websocket("/ws/chat") async def websocket_chat(websocket: WebSocket): """WebSocket endpoint for chat""" await websocket.accept() # Auth handshake try: auth_msg = await asyncio.wait_for(websocket.receive_json(), timeout=10.0) token = auth_msg.get("token", "") username = verify_token(token) if not username: await websocket.send_json({"type": "error", "message": "Invalid token"}) await websocket.close(code=4001) return except asyncio.TimeoutError: await websocket.send_json({"type": "error", "message": "Auth timeout"}) await websocket.close(code=4001) return except Exception as e: await websocket.send_json({"type": "error", "message": f"Auth error: {str(e)}"}) await websocket.close(code=4001) return # Send init sessions = await session_manager.list_sessions() await websocket.send_json({ "type": "init", "user": username, "sessions": [{"session_id": s.session_id, "name": s.name} for s in sessions] }) # Current subscription state current_session_id: Optional[str] = None current_queue: Optional[asyncio.Queue] = None output_task: Optional[asyncio.Task] = None async def output_forwarder(queue: asyncio.Queue): """Forward output from queue to websocket""" while True: try: message = await queue.get() # Message already has type, just forward it await websocket.send_json(message) except asyncio.CancelledError: break except Exception as e: logger.error(f"Output forwarder error: {e}") break try: while True: data = await websocket.receive_json() msg_type = data.get("type", "") if msg_type == "create_session": # Create a new session and auto-connect name = data.get("name", f"session_{datetime.now().strftime('%H%M%S')}") try: session = await session_manager.create_session(name) await websocket.send_json({ "type": "session_created", "session_id": session.session_id, "name": session.name }) # Auto-connect to the new session if current_session_id and current_queue: if output_task: output_task.cancel() try: await output_task except asyncio.CancelledError: pass await session_manager.unsubscribe(current_session_id, current_queue) current_queue = await session_manager.subscribe(session.session_id) current_session_id = session.session_id output_task = asyncio.create_task(output_forwarder(current_queue)) await websocket.send_json({ "type": "session_connected", "session_id": session.session_id, "name": session.name }) logger.info(f"Auto-connected to new session {session.session_id}") except Exception as e: await websocket.send_json({"type": "error", "message": str(e)}) elif msg_type == "connect_session": session_id = data.get("session_id", "") # Unsubscribe from previous session if current_session_id and current_queue: if output_task: output_task.cancel() try: await output_task except asyncio.CancelledError: pass await session_manager.unsubscribe(current_session_id, current_queue) # Subscribe to new session try: current_queue = await session_manager.subscribe(session_id) current_session_id = session_id output_task = asyncio.create_task(output_forwarder(current_queue)) session = await session_manager.get_session(session_id) await websocket.send_json({ "type": "session_connected", "session_id": session_id, "name": session.name if session else "unknown" }) except ValueError as e: await websocket.send_json({"type": "error", "message": str(e)}) current_session_id = None current_queue = None elif msg_type == "message": content = data.get("content", "") if current_session_id and content: # Send message in background so we can receive more commands asyncio.create_task( session_manager.send_message(current_session_id, content) ) elif not current_session_id: await websocket.send_json({ "type": "error", "message": "No hay sesión conectada" }) elif msg_type == "list_sessions": sessions = await session_manager.list_sessions() await websocket.send_json({ "type": "sessions_list", "sessions": [{"session_id": s.session_id, "name": s.name} for s in sessions] }) elif msg_type == "ping": await websocket.send_json({"type": "pong"}) except WebSocketDisconnect: pass except Exception as e: logger.error(f"WebSocket error: {e}") try: await websocket.send_json({"type": "error", "message": str(e)}) except: pass finally: if output_task: output_task.cancel() try: await output_task except asyncio.CancelledError: pass if current_session_id and current_queue: await session_manager.unsubscribe(current_session_id, current_queue) # ============================================================================ # Main # ============================================================================ if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=3030)