#!/usr/bin/env python3 """ Captain Claude by dadiaar Multi-Agent Orchestration System using claude-code-provider A powerful multi-agent system that coordinates specialized agents for complex software engineering tasks. """ import asyncio import json import logging from datetime import datetime from pathlib import Path from typing import Optional from claude_code_provider import ( ClaudeCodeClient, ConcurrentOrchestrator, SequentialOrchestrator, CostTracker, StructuredLogger, ) # Load context file CONTEXT_FILE = Path(__file__).parent / "CAPTAIN_CLAUDE.md" GLOBAL_CONTEXT = CONTEXT_FILE.read_text() if CONTEXT_FILE.exists() else "" # Agent Definitions AGENTS = { "captain": { "name": "Captain", "model": "sonnet", "instructions": """You are Captain, the lead coordinator of the Captain Claude team. Your role is to: - Analyze incoming tasks and break them into subtasks - Delegate work to specialized agents - Synthesize results from multiple agents - Ensure quality and coherence of final output Be concise, strategic, and focused on delivering results.""", }, "coder": { "name": "Coder", "model": "sonnet", "instructions": """You are Coder, a senior software engineer. Your role is to: - Write clean, efficient, and well-documented code - Implement features and fix bugs - Follow best practices and design patterns - Produce production-ready code Focus on code quality and maintainability.""", }, "reviewer": { "name": "Reviewer", "model": "sonnet", "instructions": """You are Reviewer, a code review specialist. Your role is to: - Review code for bugs, security issues, and best practices - Suggest improvements and optimizations - Check for edge cases and error handling - Ensure code quality standards are met Be thorough but constructive in feedback.""", }, "researcher": { "name": "Researcher", "model": "haiku", "instructions": """You are Researcher, an information specialist. Your role is to: - Gather relevant information about technologies and patterns - Find documentation and examples - Research best practices and solutions - Provide concise summaries of findings Focus on accuracy and relevance.""", }, "architect": { "name": "Architect", "model": "opus", "instructions": """You are Architect, a system design expert. Your role is to: - Design system architectures and data models - Plan implementation strategies - Evaluate trade-offs and make technical decisions - Create high-level designs and specifications Think strategically about scalability and maintainability.""", }, } class CaptainClaude: """Multi-agent orchestration system for software engineering tasks.""" def __init__(self, output_dir: Optional[str] = None): self.client = ClaudeCodeClient( timeout=300.0, enable_retries=True, enable_circuit_breaker=True, ) self.cost_tracker = CostTracker() logger = logging.getLogger("captain-claude") logger.setLevel(logging.INFO) self.logger = StructuredLogger(logger) self.output_dir = Path(output_dir) if output_dir else Path.cwd() / "captain_output" self.output_dir.mkdir(exist_ok=True) self.agents = {} self._init_agents() def _init_agents(self): """Initialize all specialized agents.""" for agent_id, config in AGENTS.items(): instructions = config["instructions"] if agent_id == "captain" and GLOBAL_CONTEXT: instructions = f"{GLOBAL_CONTEXT}\n\n{instructions}" self.agents[agent_id] = self.client.create_agent( name=config["name"], model=config["model"], instructions=instructions, autocompact=True, autocompact_threshold=80_000, ) async def analyze_task(self, task: str) -> dict: """Have Captain analyze a task and create a plan.""" prompt = f"""Analyze this task and create an execution plan: TASK: {task} Available agents: - Coder: Writes and implements code - Reviewer: Reviews code for quality and issues - Researcher: Gathers information and documentation - Architect: Designs systems and makes technical decisions Respond with a JSON object containing: {{ "summary": "Brief task summary", "agents_needed": ["list", "of", "agents"], "steps": [ {{"agent": "agent_name", "task": "specific task description"}} ], "parallel_possible": true/false }}""" response = await self.agents["captain"].run(prompt) self.cost_tracker.record(response.usage, "sonnet") # Try to extract JSON from response try: # Find JSON in response text = response.text start = text.find("{") end = text.rfind("}") + 1 if start >= 0 and end > start: return json.loads(text[start:end]) except json.JSONDecodeError: pass # Fallback plan return { "summary": task[:100], "agents_needed": ["coder"], "steps": [{"agent": "coder", "task": task}], "parallel_possible": False, } async def run_parallel(self, task: str, agents: list[str]) -> dict: """Run multiple agents in parallel on the same task.""" selected_agents = [self.agents[a] for a in agents if a in self.agents] if not selected_agents: return {"error": "No valid agents selected"} orchestrator = ConcurrentOrchestrator(agents=selected_agents) result = await orchestrator.run(task) return { "task": task, "agents": agents, "results": result.text if hasattr(result, "text") else str(result), } async def run_sequential(self, steps: list[dict]) -> list[dict]: """Run agents sequentially, passing output to next agent.""" results = [] previous_output = "" for step in steps: agent_id = step.get("agent", "coder") task = step.get("task", "") if agent_id not in self.agents: results.append({"agent": agent_id, "error": "Agent not found"}) continue # Include previous output if available full_prompt = task if previous_output: full_prompt = f"""Previous agent output: {previous_output} Your task: {task}""" response = await self.agents[agent_id].run(full_prompt) model = AGENTS.get(agent_id, {}).get("model", "sonnet") self.cost_tracker.record(response.usage, model) previous_output = response.text results.append({ "agent": agent_id, "task": task, "output": response.text, }) return results async def run_handoff(self, task: str, from_agent: str, to_agent: str) -> dict: """Run a handoff between two agents.""" if from_agent not in self.agents or to_agent not in self.agents: return {"error": "Invalid agent(s)"} # First agent processes response1 = await self.agents[from_agent].run(task) model1 = AGENTS.get(from_agent, {}).get("model", "sonnet") self.cost_tracker.record(response1.usage, model1) # Handoff to second agent handoff_prompt = f"""Previous agent ({from_agent}) output: {response1.text} Continue and complete this work.""" response2 = await self.agents[to_agent].run(handoff_prompt) model2 = AGENTS.get(to_agent, {}).get("model", "sonnet") self.cost_tracker.record(response2.usage, model2) return { "task": task, "from_agent": from_agent, "to_agent": to_agent, "first_output": response1.text, "final_output": response2.text, } async def execute(self, task: str) -> dict: """Execute a task using intelligent agent orchestration.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") print(f"\n{'='*60}") print("CAPTAIN CLAUDE by dadiaar") print(f"{'='*60}") print(f"Task: {task[:100]}...") print(f"{'='*60}\n") # Phase 1: Analyze task print("[Captain] Analyzing task...") plan = await self.analyze_task(task) print(f"[Captain] Plan created: {len(plan.get('steps', []))} steps") # Phase 2: Execute plan results = [] if plan.get("parallel_possible") and len(plan.get("agents_needed", [])) > 1: print("[Captain] Executing agents in parallel...") parallel_result = await self.run_parallel( task, plan.get("agents_needed", ["coder"]) ) results.append(parallel_result) else: print("[Captain] Executing agents sequentially...") sequential_results = await self.run_sequential(plan.get("steps", [])) results.extend(sequential_results) # Phase 3: Synthesize results print("[Captain] Synthesizing results...") synthesis_prompt = f"""Synthesize these results into a coherent final output: Original task: {task} Agent outputs: {json.dumps(results, indent=2, default=str)} Provide a clear, actionable final result.""" final_response = await self.agents["captain"].run(synthesis_prompt) self.cost_tracker.record(final_response.usage, "sonnet") # Compile final result final_result = { "timestamp": timestamp, "task": task, "plan": plan, "agent_results": results, "final_output": final_response.text, "cost_summary": self.cost_tracker.summary(), } # Save to file output_file = self.output_dir / f"result_{timestamp}.json" with open(output_file, "w") as f: json.dump(final_result, f, indent=2, default=str) print(f"\n{'='*60}") print("EXECUTION COMPLETE") print(f"{'='*60}") print(f"Output saved: {output_file}") print(f"Cost: {self.cost_tracker.summary()}") print(f"{'='*60}\n") return final_result async def chat(self, message: str, agent: str = "captain") -> str: """Simple chat with a specific agent.""" if agent not in self.agents: return f"Agent '{agent}' not found. Available: {list(self.agents.keys())}" response = await self.agents[agent].run(message) model = AGENTS.get(agent, {}).get("model", "sonnet") self.cost_tracker.record(response.usage, model) return response.text async def main(): """Interactive Captain Claude session.""" captain = CaptainClaude() print("\n" + "="*60) print("CAPTAIN CLAUDE by dadiaar") print("Multi-Agent Orchestration System") print("="*60) print("\nCommands:") print(" /execute - Full multi-agent execution") print(" /chat - Chat with Captain") print(" /agent - Chat with specific agent") print(" /parallel - Run all agents in parallel") print(" /cost - Show cost summary") print(" /quit - Exit") print("="*60 + "\n") while True: try: user_input = input("You: ").strip() if not user_input: continue if user_input.lower() == "/quit": print("Goodbye!") break if user_input.lower() == "/cost": print(f"Cost: {captain.cost_tracker.summary()}") continue if user_input.startswith("/execute "): task = user_input[9:] result = await captain.execute(task) print(f"\nFinal Output:\n{result['final_output']}\n") continue if user_input.startswith("/parallel "): task = user_input[10:] agents = ["coder", "reviewer", "researcher"] result = await captain.run_parallel(task, agents) print(f"\nParallel Results:\n{result}\n") continue if user_input.startswith("/agent "): parts = user_input[7:].split(" ", 1) if len(parts) == 2: agent, message = parts response = await captain.chat(message, agent) print(f"\n[{agent}]: {response}\n") else: print("Usage: /agent ") continue if user_input.startswith("/chat ") or not user_input.startswith("/"): message = user_input[6:] if user_input.startswith("/chat ") else user_input response = await captain.chat(message) print(f"\n[Captain]: {response}\n") continue print("Unknown command. Use /quit to exit.") except KeyboardInterrupt: print("\nGoodbye!") break except Exception as e: print(f"Error: {e}") if __name__ == "__main__": asyncio.run(main())