#!/usr/bin/env python3
"""
Captain Claude Mobile API

FastAPI backend for the Captain Claude mobile app.
Provides REST + WebSocket endpoints for chat and terminal access.
"""

import asyncio
import hmac
import json
import os
import pty
import select
import subprocess
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from typing import Optional

import asyncpg
import jwt
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel

# Configuration
#
# SECURITY: the fallback secret and password below are publicly visible in the
# source. They are kept so existing deployments keep working, but production
# must override them through the environment (a warning is printed at startup
# when it does not).
_DEFAULT_JWT_SECRET = "captain-claude-secret-key-change-in-production"
_DEFAULT_API_PASSWORD = "tzzr2025"

JWT_SECRET = os.getenv("JWT_SECRET", _DEFAULT_JWT_SECRET)
JWT_ALGORITHM = "HS256"
JWT_EXPIRATION_DAYS = 7
API_USER = os.getenv("API_USER", "captain")
API_PASSWORD = os.getenv("API_PASSWORD", _DEFAULT_API_PASSWORD)
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://captain:captain@localhost/captain_mobile")
CLAUDE_CMD = os.getenv("CLAUDE_CMD", "/home/architect/.claude/local/claude")

# Database pool (None while the database is unavailable; endpoints degrade
# to empty results / no persistence in that case).
db_pool: Optional[asyncpg.Pool] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the database pool on startup and close it on shutdown.

    A database failure is not fatal: the error is printed and the API keeps
    running with ``db_pool`` set to ``None``.
    """
    global db_pool

    if JWT_SECRET == _DEFAULT_JWT_SECRET or API_PASSWORD == _DEFAULT_API_PASSWORD:
        print("WARNING: default JWT_SECRET/API_PASSWORD in use; set them via the environment")

    pool = None
    try:
        pool = await asyncpg.create_pool(DATABASE_URL, min_size=2, max_size=10)
        db_pool = pool
        await init_database()
        print("Database connected")
    except Exception as e:
        print(f"Database connection failed: {e}")
        db_pool = None
        if pool is not None:
            # The pool was created but schema setup failed: release its
            # connections instead of leaking them.
            try:
                await pool.close()
            except Exception as close_error:
                print(f"Database pool cleanup failed: {close_error}")

    try:
        yield
    finally:
        # Runs on normal shutdown and when the server is torn down by an error.
        if db_pool:
            await db_pool.close()
            db_pool = None


app = FastAPI(
    title="Captain Claude Mobile API",
    version="1.0.0",
    lifespan=lifespan
)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive. Authentication here is a bearer token, so credentials support is
# probably unnecessary - confirm with the client team before tightening.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

security = HTTPBearer()


# Models
class LoginRequest(BaseModel):
    """Credentials posted to /auth/login."""
    username: str
    password: str


class LoginResponse(BaseModel):
    """JWT plus its expiry as an ISO-8601 string."""
    token: str
    expires_at: str


class Message(BaseModel):
    """One chat message as returned by /history/{conversation_id}."""
    role: str
    content: str
    timestamp: str


class Conversation(BaseModel):
    """Conversation summary as returned by /history."""
    id: str
    title: str
    created_at: str
    message_count: int


class ScreenSession(BaseModel):
    """One entry parsed from ``screen -ls``."""
    name: str
    pid: str
    attached: bool


# Database initialization
async def init_database():
    """Create the conversations/messages tables and their indexes if missing.

    Does nothing when no pool is available.
    """
    if not db_pool:
        return

    async with db_pool.acquire() as conn:
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS conversations (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                user_id VARCHAR(255) NOT NULL,
                title VARCHAR(500),
                created_at TIMESTAMP DEFAULT NOW(),
                updated_at TIMESTAMP DEFAULT NOW()
            )
        """)
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS messages (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                conversation_id UUID REFERENCES conversations(id) ON DELETE CASCADE,
                role VARCHAR(50) NOT NULL,
                content TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT NOW()
            )
        """)
        await conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_messages_conversation
            ON messages(conversation_id)
        """)
        await conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_conversations_user
            ON conversations(user_id)
        """)


# Auth helpers
def _constant_time_equals(left: str, right: str) -> bool:
    """Compare two strings without leaking where they first differ."""
    # Encode first: hmac.compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(left.encode("utf-8"), right.encode("utf-8"))


def create_token(username: str) -> tuple[str, datetime]:
    """Issue a JWT for *username*.

    Returns the encoded token and its expiry as a naive UTC datetime
    (naive so that ``expires.isoformat()`` keeps its existing wire format).
    """
    # datetime.utcnow() is deprecated; build the same naive-UTC value explicitly.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    expires = now_utc + timedelta(days=JWT_EXPIRATION_DAYS)
    token = jwt.encode(
        {"sub": username, "exp": expires},
        JWT_SECRET,
        algorithm=JWT_ALGORITHM
    )
    return token, expires


def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """FastAPI dependency: validate the bearer token and return its subject.

    Raises:
        HTTPException: 401 when the token is expired, invalid, or carries no
            usable ``sub`` claim.
    """
    try:
        payload = jwt.decode(credentials.credentials, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

    # A correctly signed token without "sub" used to surface as a KeyError
    # (HTTP 500); treat it as an invalid token instead.
    subject = payload.get("sub")
    if not isinstance(subject, str) or not subject:
        raise HTTPException(status_code=401, detail="Invalid token")
    return subject


# REST Endpoints
@app.get("/health")
async def health():
    """Liveness probe."""
    return {"status": "ok", "service": "captain-api", "version": "1.0.0"}


@app.post("/auth/login", response_model=LoginResponse)
async def login(request: LoginRequest):
    """Exchange the configured username/password for a JWT.

    Raises:
        HTTPException: 401 when the credentials do not match.
    """
    # Evaluate both comparisons unconditionally so timing does not reveal
    # which of the two fields was wrong.
    username_ok = _constant_time_equals(request.username, API_USER)
    password_ok = _constant_time_equals(request.password, API_PASSWORD)
    if username_ok and password_ok:
        token, expires = create_token(request.username)
        return LoginResponse(token=token, expires_at=expires.isoformat())
    raise HTTPException(status_code=401, detail="Invalid credentials")
# Directory where /upload stores files. Chat context files must resolve to a
# path inside it.
UPLOAD_DIR = "/tmp/captain-uploads"

# Maximum length of one stream-json line read from the claude process.
# asyncio's default (64 KiB) is easily exceeded by a single JSON event.
_CLAUDE_LINE_LIMIT = 1024 * 1024


def _parse_screen_sessions(listing: str) -> list[ScreenSession]:
    """Extract sessions from the text printed by ``screen -ls``."""
    sessions = []
    for line in listing.split("\n"):
        if "\t" not in line or not ("Attached" in line or "Detached" in line):
            continue
        parts = line.strip().split("\t")
        if len(parts) < 2:
            continue
        # First column is "<pid>.<name>"; the name itself may contain dots.
        pid, dot, name = parts[0].partition(".")
        if dot:
            sessions.append(ScreenSession(pid=pid, name=name, attached="Attached" in line))
    return sessions


def _resolve_uploaded_file(path) -> Optional[str]:
    """Return the real path of *path* if it is a file inside UPLOAD_DIR, else None."""
    if not isinstance(path, str):
        return None
    try:
        real_path = os.path.realpath(path)
        upload_root = os.path.realpath(UPLOAD_DIR)
        inside = os.path.commonpath([real_path, upload_root]) == upload_root
    except (OSError, ValueError):
        return None
    if inside and os.path.isfile(real_path):
        return real_path
    return None


def _close_reason(error: Exception) -> str:
    """Format *error* as a WebSocket close reason (limited to 123 bytes by the protocol)."""
    return str(error).encode("utf-8")[:120].decode("utf-8", errors="ignore")


async def _authenticate_websocket(websocket: WebSocket) -> Optional[dict]:
    """Read the first message and validate the JWT it carries.

    Returns the decoded token payload, or ``None`` after closing the socket
    with code 4001 when the token is missing or invalid.

    Raises:
        asyncio.TimeoutError: no message arrived within 10 seconds.
    """
    auth_data = await asyncio.wait_for(websocket.receive_json(), timeout=10)
    token = auth_data.get("token") if isinstance(auth_data, dict) else None
    if not token:
        await websocket.close(code=4001, reason="No token provided")
        return None
    try:
        return jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.InvalidTokenError:
        await websocket.close(code=4001, reason="Invalid token")
        return None


@app.get("/sessions")
async def list_sessions(user: str = Depends(verify_token)) -> list[ScreenSession]:
    """List active screen sessions.

    Raises:
        HTTPException: 500 when ``screen -ls`` cannot be run.
    """
    try:
        # Run in a worker thread so the blocking call does not stall the event loop.
        result = await asyncio.to_thread(
            subprocess.run,
            ["screen", "-ls"],
            capture_output=True,
            text=True,
            timeout=10,
        )
        return _parse_screen_sessions(result.stdout)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/history")
async def get_history(user: str = Depends(verify_token), limit: int = 20) -> list[Conversation]:
    """Get the user's conversations, most recently updated first.

    Raises:
        HTTPException: 400 when *limit* is negative.
    """
    if limit < 0:
        # PostgreSQL rejects a negative LIMIT; report it as a client error.
        raise HTTPException(status_code=400, detail="limit must not be negative")
    if not db_pool:
        return []

    async with db_pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT c.id, c.title, c.created_at, COUNT(m.id) as message_count
            FROM conversations c
            LEFT JOIN messages m ON m.conversation_id = c.id
            WHERE c.user_id = $1
            GROUP BY c.id
            ORDER BY c.updated_at DESC
            LIMIT $2
        """, user, limit)
        return [
            Conversation(
                id=str(row["id"]),
                title=row["title"] or "Untitled",
                created_at=row["created_at"].isoformat(),
                message_count=row["message_count"]
            )
            for row in rows
        ]


@app.get("/history/{conversation_id}")
async def get_conversation(conversation_id: str, user: str = Depends(verify_token)) -> list[Message]:
    """Get messages for one of the user's conversations, oldest first.

    Raises:
        HTTPException: 400 when *conversation_id* is not a valid UUID.
    """
    try:
        conversation_uuid = uuid.UUID(conversation_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid conversation id")
    if not db_pool:
        return []

    async with db_pool.acquire() as conn:
        rows = await conn.fetch("""
            SELECT m.role, m.content, m.created_at
            FROM messages m
            JOIN conversations c ON c.id = m.conversation_id
            WHERE c.id = $1 AND c.user_id = $2
            ORDER BY m.created_at ASC
        """, conversation_uuid, user)
        return [
            Message(
                role=row["role"],
                content=row["content"],
                timestamp=row["created_at"].isoformat()
            )
            for row in rows
        ]


@app.post("/upload")
async def upload_file(file: UploadFile = File(...), user: str = Depends(verify_token)):
    """Upload a file for context.

    The file is stored under UPLOAD_DIR as ``<uuid>_<basename>``. The
    client-supplied name is reduced to its final path component so it cannot
    point outside the upload directory.
    """
    os.makedirs(UPLOAD_DIR, mode=0o700, exist_ok=True)

    original_name = file.filename or ""
    # Normalize Windows separators and drop NULs before taking the basename.
    cleaned = original_name.replace("\\", "/").replace("\x00", "")
    safe_name = os.path.basename(cleaned) or "upload"

    file_id = str(uuid.uuid4())
    file_path = os.path.join(UPLOAD_DIR, f"{file_id}_{safe_name}")

    # Read first so a failed read does not leave an empty file behind.
    content = await file.read()
    with open(file_path, "wb") as f:
        f.write(content)

    return {"file_id": file_id, "filename": file.filename, "path": file_path}


async def _stream_claude_reply(websocket: WebSocket, prompt: str, context_files: list, conversation_id) -> str:
    """Run claude for *prompt*, relay its streamed output, and return the full text.

    Sends ``start``, ``delta`` and ``done`` events to the client as the
    corresponding stream-json events arrive. The child process is always
    stopped before returning, including when the client disconnects.

    Raises:
        RuntimeError: claude exited with a non-zero status without producing text.
    """
    cmd = [CLAUDE_CMD, "-p", prompt, "--output-format", "stream-json"]
    for candidate in context_files:
        # Only files that live inside the upload directory may be attached;
        # anything else would let a client read arbitrary server files.
        resolved = _resolve_uploaded_file(candidate)
        if resolved:
            cmd.extend(["--file", resolved])

    process = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        # stderr is never read; piping it could block the child once the
        # pipe buffer fills up.
        stderr=asyncio.subprocess.DEVNULL,
        cwd="/home/architect/captain-claude",
        limit=_CLAUDE_LINE_LIMIT,
    )

    chunks = []
    try:
        async for raw_line in process.stdout:
            line = raw_line.decode("utf-8", errors="replace").strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(event, dict):
                continue

            event_type = event.get("type")
            if event_type == "assistant":
                # Start of response
                await websocket.send_json({
                    "type": "start",
                    "conversation_id": str(conversation_id) if conversation_id else None
                })
            elif event_type == "content_block_delta":
                delta = event.get("delta", {})
                if delta.get("type") == "text_delta":
                    text = delta.get("text", "")
                    chunks.append(text)
                    await websocket.send_json({
                        "type": "delta",
                        "content": text
                    })
            elif event_type == "result":
                # End of response
                await websocket.send_json({
                    "type": "done",
                    "content": "".join(chunks)
                })

        await process.wait()
        if process.returncode != 0 and not chunks:
            raise RuntimeError(f"claude exited with status {process.returncode}")
    finally:
        # Reached with a live process when sending failed (client gone) or
        # parsing raised: do not leave it running in the background.
        if process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
            await process.wait()

    return "".join(chunks)


# WebSocket Chat with Captain Claude
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    """Chat endpoint.

    Protocol: the first client message must be ``{"token": <jwt>}``. After
    that, ``{"type": "message", "content": ..., "files": [...]}`` runs claude
    and streams the reply; ``{"type": "ping"}`` is answered with ``pong``.
    Close codes: 4001 bad/missing token, 4002 auth timeout, 4000 other errors.
    """
    await websocket.accept()

    try:
        # First message should be auth token
        try:
            payload = await _authenticate_websocket(websocket)
        except asyncio.TimeoutError:
            await websocket.close(code=4002, reason="Auth timeout")
            return
        if payload is None:
            return
        user = payload.get("sub")
        if not user:
            await websocket.close(code=4001, reason="Invalid token")
            return

        await websocket.send_json({"type": "connected", "user": user})

        # Create conversation
        conversation_id = None
        if db_pool:
            async with db_pool.acquire() as conn:
                row = await conn.fetchrow(
                    "INSERT INTO conversations (user_id, title) VALUES ($1, $2) RETURNING id",
                    user, "New conversation"
                )
                conversation_id = row["id"]

        title_set = False

        # Main chat loop
        while True:
            data = await websocket.receive_json()
            if not isinstance(data, dict):
                continue
            message_type = data.get("type")

            if message_type == "ping":
                await websocket.send_json({"type": "pong"})
                continue
            if message_type != "message":
                continue

            user_message = data.get("content", "")
            if not isinstance(user_message, str):
                await websocket.send_json({
                    "type": "error",
                    "message": "content must be a string"
                })
                continue
            context_files = data.get("files", [])
            if not isinstance(context_files, list):
                context_files = []

            # Save user message
            if db_pool and conversation_id:
                async with db_pool.acquire() as conn:
                    await conn.execute(
                        "INSERT INTO messages (conversation_id, role, content) VALUES ($1, $2, $3)",
                        conversation_id, "user", user_message
                    )

            try:
                full_response = await _stream_claude_reply(
                    websocket, user_message, context_files, conversation_id
                )

                # Save assistant message
                if db_pool and conversation_id and full_response:
                    async with db_pool.acquire() as conn:
                        await conn.execute(
                            "INSERT INTO messages (conversation_id, role, content) VALUES ($1, $2, $3)",
                            conversation_id, "assistant", full_response
                        )
                        if not title_set:
                            # Title comes from the first line of the FIRST
                            # response only; later replies must not rename
                            # the conversation.
                            title = full_response.strip().split("\n", 1)[0][:100]
                            await conn.execute(
                                "UPDATE conversations SET title = $1, updated_at = NOW() WHERE id = $2",
                                title, conversation_id
                            )
                            title_set = True
                        else:
                            await conn.execute(
                                "UPDATE conversations SET updated_at = NOW() WHERE id = $1",
                                conversation_id
                            )
            except WebSocketDisconnect:
                # The client is gone; reporting an error to it is pointless.
                raise
            except Exception as e:
                await websocket.send_json({
                    "type": "error",
                    "message": str(e)
                })

    except WebSocketDisconnect:
        pass
    except Exception as e:
        print(f"Chat WebSocket error: {e}")
        try:
            await websocket.close(code=4000, reason=_close_reason(e))
        except Exception:
            # Best effort: the socket may already be closed.
            pass


async def _write_all(fd: int, payload: bytes) -> None:
    """Write all of *payload* to the non-blocking descriptor *fd*."""
    remaining = memoryview(payload)
    while remaining:
        try:
            written = os.write(fd, remaining)
        except BlockingIOError:
            # The PTY buffer is full; give the reader side time to drain it.
            await asyncio.sleep(0.01)
            continue
        remaining = remaining[written:]


async def _stop_process(process: subprocess.Popen) -> None:
    """Terminate *process* and reap it, escalating to kill after 2 seconds."""
    if process.poll() is None:
        process.terminate()
    try:
        await asyncio.to_thread(process.wait, timeout=2)
    except subprocess.TimeoutExpired:
        process.kill()
        await asyncio.to_thread(process.wait)


# WebSocket Terminal for screen sessions
@app.websocket("/ws/terminal/{session_name}")
async def websocket_terminal(websocket: WebSocket, session_name: str):
    """Attach to a screen session through a PTY and bridge it to the socket.

    Protocol: the first client message must be ``{"token": <jwt>}``. After
    that, ``{"type": "input", "data": ...}`` is written to the terminal,
    ``{"type": "resize", "rows": ..., "cols": ...}`` resizes it, and terminal
    output is sent as ``{"type": "output", "data": ...}``.
    Close codes: 4001 bad/missing token, 4002 auth timeout, 4003 invalid
    session name, 4000 other errors.
    """
    import codecs
    import fcntl
    import struct
    import termios

    await websocket.accept()

    try:
        # Auth
        try:
            payload = await _authenticate_websocket(websocket)
        except asyncio.TimeoutError:
            await websocket.close(code=4002, reason="Auth timeout")
            return
        if payload is None:
            return

        # A name starting with "-" would be parsed by screen as an option.
        if not session_name or session_name.startswith("-") or "\x00" in session_name:
            await websocket.close(code=4003, reason="Invalid session name")
            return

        await websocket.send_json({"type": "connected", "session": session_name})

        # Create PTY and attach to screen session
        master_fd, slave_fd = pty.openpty()
        try:
            process = subprocess.Popen(
                ["screen", "-x", session_name],
                stdin=slave_fd,
                stdout=slave_fd,
                stderr=slave_fd,
                # Same effect as preexec_fn=os.setsid without running Python
                # code between fork and exec.
                start_new_session=True
            )
        except BaseException:
            os.close(master_fd)
            raise
        finally:
            # The child holds its own copy of the slave end.
            os.close(slave_fd)

        # Set non-blocking
        flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
        fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        # Incremental decoding keeps multibyte characters intact when they
        # are split across two reads.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

        async def read_pty():
            """Read from PTY and send to websocket"""
            while True:
                await asyncio.sleep(0.01)
                readable, _, _ = select.select([master_fd], [], [], 0)
                if not readable:
                    continue
                try:
                    chunk = os.read(master_fd, 4096)
                except BlockingIOError:
                    continue
                except OSError:
                    # Raised once the child side of the PTY has closed.
                    return
                if not chunk:
                    # EOF
                    return
                text = decoder.decode(chunk)
                if text:
                    await websocket.send_json({
                        "type": "output",
                        "data": text
                    })

        async def write_pty():
            """Read from websocket and write to PTY"""
            while True:
                data = await websocket.receive_json()
                if not isinstance(data, dict):
                    continue
                kind = data.get("type")
                if kind == "input":
                    await _write_all(master_fd, str(data.get("data", "")).encode())
                elif kind == "resize":
                    rows = data.get("rows", 24)
                    cols = data.get("cols", 80)
                    valid = all(
                        isinstance(value, int) and 0 < value <= 0xFFFF
                        for value in (rows, cols)
                    )
                    # Ignore sizes that do not fit the kernel's unsigned
                    # short fields instead of dropping the connection.
                    if valid:
                        winsize = struct.pack("HHHH", rows, cols, 0, 0)
                        fcntl.ioctl(master_fd, termios.TIOCSWINSZ, winsize)

        # Run both tasks
        read_task = asyncio.create_task(read_pty())
        write_task = asyncio.create_task(write_pty())

        try:
            # Stop as soon as EITHER side finishes: waiting for both left the
            # connection hanging after screen exited.
            await asyncio.wait({read_task, write_task}, return_when=asyncio.FIRST_COMPLETED)
        finally:
            read_task.cancel()
            write_task.cancel()
            # Let both tasks finish before the descriptor goes away, and
            # collect their outcomes so no exception goes unretrieved.
            outcomes = await asyncio.gather(read_task, write_task, return_exceptions=True)
            for outcome in outcomes:
                expected = (asyncio.CancelledError, WebSocketDisconnect)
                if isinstance(outcome, Exception) and not isinstance(outcome, expected):
                    print(f"Terminal WebSocket error: {outcome}")
            os.close(master_fd)
            await _stop_process(process)

    except WebSocketDisconnect:
        pass
    except Exception as e:
        print(f"Terminal WebSocket error: {e}")
        try:
            await websocket.close(code=4000, reason=_close_reason(e))
        except Exception:
            # Best effort: the socket may already be closed.
            pass


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=3030)