- Core captain_claude.py orchestrator - Context manager with SQL schemas - Provider adapters (Anthropic, OpenAI) - Execution scripts
393 lines
13 KiB
Python
Executable File
393 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Captain Claude by dadiaar
|
|
Multi-Agent Orchestration System using claude-code-provider
|
|
|
|
A powerful multi-agent system that coordinates specialized agents
|
|
for complex software engineering tasks.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from claude_code_provider import (
|
|
ClaudeCodeClient,
|
|
ConcurrentOrchestrator,
|
|
SequentialOrchestrator,
|
|
CostTracker,
|
|
StructuredLogger,
|
|
)
|
|
|
|
# Load context file
|
|
CONTEXT_FILE = Path(__file__).parent / "CAPTAIN_CLAUDE.md"
|
|
GLOBAL_CONTEXT = CONTEXT_FILE.read_text() if CONTEXT_FILE.exists() else ""
|
|
|
|
# Agent Definitions
|
|
AGENTS = {
|
|
"captain": {
|
|
"name": "Captain",
|
|
"model": "sonnet",
|
|
"instructions": """You are Captain, the lead coordinator of the Captain Claude team.
|
|
Your role is to:
|
|
- Analyze incoming tasks and break them into subtasks
|
|
- Delegate work to specialized agents
|
|
- Synthesize results from multiple agents
|
|
- Ensure quality and coherence of final output
|
|
Be concise, strategic, and focused on delivering results.""",
|
|
},
|
|
"coder": {
|
|
"name": "Coder",
|
|
"model": "sonnet",
|
|
"instructions": """You are Coder, a senior software engineer.
|
|
Your role is to:
|
|
- Write clean, efficient, and well-documented code
|
|
- Implement features and fix bugs
|
|
- Follow best practices and design patterns
|
|
- Produce production-ready code
|
|
Focus on code quality and maintainability.""",
|
|
},
|
|
"reviewer": {
|
|
"name": "Reviewer",
|
|
"model": "sonnet",
|
|
"instructions": """You are Reviewer, a code review specialist.
|
|
Your role is to:
|
|
- Review code for bugs, security issues, and best practices
|
|
- Suggest improvements and optimizations
|
|
- Check for edge cases and error handling
|
|
- Ensure code quality standards are met
|
|
Be thorough but constructive in feedback.""",
|
|
},
|
|
"researcher": {
|
|
"name": "Researcher",
|
|
"model": "haiku",
|
|
"instructions": """You are Researcher, an information specialist.
|
|
Your role is to:
|
|
- Gather relevant information about technologies and patterns
|
|
- Find documentation and examples
|
|
- Research best practices and solutions
|
|
- Provide concise summaries of findings
|
|
Focus on accuracy and relevance.""",
|
|
},
|
|
"architect": {
|
|
"name": "Architect",
|
|
"model": "opus",
|
|
"instructions": """You are Architect, a system design expert.
|
|
Your role is to:
|
|
- Design system architectures and data models
|
|
- Plan implementation strategies
|
|
- Evaluate trade-offs and make technical decisions
|
|
- Create high-level designs and specifications
|
|
Think strategically about scalability and maintainability.""",
|
|
},
|
|
}
|
|
|
|
|
|
class CaptainClaude:
|
|
"""Multi-agent orchestration system for software engineering tasks."""
|
|
|
|
def __init__(self, output_dir: Optional[str] = None):
|
|
self.client = ClaudeCodeClient(
|
|
timeout=300.0,
|
|
enable_retries=True,
|
|
enable_circuit_breaker=True,
|
|
)
|
|
self.cost_tracker = CostTracker()
|
|
logger = logging.getLogger("captain-claude")
|
|
logger.setLevel(logging.INFO)
|
|
self.logger = StructuredLogger(logger)
|
|
self.output_dir = Path(output_dir) if output_dir else Path.cwd() / "captain_output"
|
|
self.output_dir.mkdir(exist_ok=True)
|
|
self.agents = {}
|
|
self._init_agents()
|
|
|
|
def _init_agents(self):
|
|
"""Initialize all specialized agents."""
|
|
for agent_id, config in AGENTS.items():
|
|
instructions = config["instructions"]
|
|
if agent_id == "captain" and GLOBAL_CONTEXT:
|
|
instructions = f"{GLOBAL_CONTEXT}\n\n{instructions}"
|
|
self.agents[agent_id] = self.client.create_agent(
|
|
name=config["name"],
|
|
model=config["model"],
|
|
instructions=instructions,
|
|
autocompact=True,
|
|
autocompact_threshold=80_000,
|
|
)
|
|
|
|
async def analyze_task(self, task: str) -> dict:
|
|
"""Have Captain analyze a task and create a plan."""
|
|
prompt = f"""Analyze this task and create an execution plan:
|
|
|
|
TASK: {task}
|
|
|
|
Available agents:
|
|
- Coder: Writes and implements code
|
|
- Reviewer: Reviews code for quality and issues
|
|
- Researcher: Gathers information and documentation
|
|
- Architect: Designs systems and makes technical decisions
|
|
|
|
Respond with a JSON object containing:
|
|
{{
|
|
"summary": "Brief task summary",
|
|
"agents_needed": ["list", "of", "agents"],
|
|
"steps": [
|
|
{{"agent": "agent_name", "task": "specific task description"}}
|
|
],
|
|
"parallel_possible": true/false
|
|
}}"""
|
|
|
|
response = await self.agents["captain"].run(prompt)
|
|
self.cost_tracker.record(response.usage, "sonnet")
|
|
|
|
# Try to extract JSON from response
|
|
try:
|
|
# Find JSON in response
|
|
text = response.text
|
|
start = text.find("{")
|
|
end = text.rfind("}") + 1
|
|
if start >= 0 and end > start:
|
|
return json.loads(text[start:end])
|
|
except json.JSONDecodeError:
|
|
pass
|
|
|
|
# Fallback plan
|
|
return {
|
|
"summary": task[:100],
|
|
"agents_needed": ["coder"],
|
|
"steps": [{"agent": "coder", "task": task}],
|
|
"parallel_possible": False,
|
|
}
|
|
|
|
async def run_parallel(self, task: str, agents: list[str]) -> dict:
|
|
"""Run multiple agents in parallel on the same task."""
|
|
selected_agents = [self.agents[a] for a in agents if a in self.agents]
|
|
|
|
if not selected_agents:
|
|
return {"error": "No valid agents selected"}
|
|
|
|
orchestrator = ConcurrentOrchestrator(agents=selected_agents)
|
|
result = await orchestrator.run(task)
|
|
|
|
return {
|
|
"task": task,
|
|
"agents": agents,
|
|
"results": result.text if hasattr(result, "text") else str(result),
|
|
}
|
|
|
|
async def run_sequential(self, steps: list[dict]) -> list[dict]:
|
|
"""Run agents sequentially, passing output to next agent."""
|
|
results = []
|
|
previous_output = ""
|
|
|
|
for step in steps:
|
|
agent_id = step.get("agent", "coder")
|
|
task = step.get("task", "")
|
|
|
|
if agent_id not in self.agents:
|
|
results.append({"agent": agent_id, "error": "Agent not found"})
|
|
continue
|
|
|
|
# Include previous output if available
|
|
full_prompt = task
|
|
if previous_output:
|
|
full_prompt = f"""Previous agent output:
|
|
{previous_output}
|
|
|
|
Your task: {task}"""
|
|
|
|
response = await self.agents[agent_id].run(full_prompt)
|
|
model = AGENTS.get(agent_id, {}).get("model", "sonnet")
|
|
self.cost_tracker.record(response.usage, model)
|
|
|
|
previous_output = response.text
|
|
results.append({
|
|
"agent": agent_id,
|
|
"task": task,
|
|
"output": response.text,
|
|
})
|
|
|
|
return results
|
|
|
|
async def run_handoff(self, task: str, from_agent: str, to_agent: str) -> dict:
|
|
"""Run a handoff between two agents."""
|
|
if from_agent not in self.agents or to_agent not in self.agents:
|
|
return {"error": "Invalid agent(s)"}
|
|
|
|
# First agent processes
|
|
response1 = await self.agents[from_agent].run(task)
|
|
model1 = AGENTS.get(from_agent, {}).get("model", "sonnet")
|
|
self.cost_tracker.record(response1.usage, model1)
|
|
|
|
# Handoff to second agent
|
|
handoff_prompt = f"""Previous agent ({from_agent}) output:
|
|
{response1.text}
|
|
|
|
Continue and complete this work."""
|
|
|
|
response2 = await self.agents[to_agent].run(handoff_prompt)
|
|
model2 = AGENTS.get(to_agent, {}).get("model", "sonnet")
|
|
self.cost_tracker.record(response2.usage, model2)
|
|
|
|
return {
|
|
"task": task,
|
|
"from_agent": from_agent,
|
|
"to_agent": to_agent,
|
|
"first_output": response1.text,
|
|
"final_output": response2.text,
|
|
}
|
|
|
|
async def execute(self, task: str) -> dict:
|
|
"""Execute a task using intelligent agent orchestration."""
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
|
|
print(f"\n{'='*60}")
|
|
print("CAPTAIN CLAUDE by dadiaar")
|
|
print(f"{'='*60}")
|
|
print(f"Task: {task[:100]}...")
|
|
print(f"{'='*60}\n")
|
|
|
|
# Phase 1: Analyze task
|
|
print("[Captain] Analyzing task...")
|
|
plan = await self.analyze_task(task)
|
|
print(f"[Captain] Plan created: {len(plan.get('steps', []))} steps")
|
|
|
|
# Phase 2: Execute plan
|
|
results = []
|
|
if plan.get("parallel_possible") and len(plan.get("agents_needed", [])) > 1:
|
|
print("[Captain] Executing agents in parallel...")
|
|
parallel_result = await self.run_parallel(
|
|
task,
|
|
plan.get("agents_needed", ["coder"])
|
|
)
|
|
results.append(parallel_result)
|
|
else:
|
|
print("[Captain] Executing agents sequentially...")
|
|
sequential_results = await self.run_sequential(plan.get("steps", []))
|
|
results.extend(sequential_results)
|
|
|
|
# Phase 3: Synthesize results
|
|
print("[Captain] Synthesizing results...")
|
|
synthesis_prompt = f"""Synthesize these results into a coherent final output:
|
|
|
|
Original task: {task}
|
|
|
|
Agent outputs:
|
|
{json.dumps(results, indent=2, default=str)}
|
|
|
|
Provide a clear, actionable final result."""
|
|
|
|
final_response = await self.agents["captain"].run(synthesis_prompt)
|
|
self.cost_tracker.record(final_response.usage, "sonnet")
|
|
|
|
# Compile final result
|
|
final_result = {
|
|
"timestamp": timestamp,
|
|
"task": task,
|
|
"plan": plan,
|
|
"agent_results": results,
|
|
"final_output": final_response.text,
|
|
"cost_summary": self.cost_tracker.summary(),
|
|
}
|
|
|
|
# Save to file
|
|
output_file = self.output_dir / f"result_{timestamp}.json"
|
|
with open(output_file, "w") as f:
|
|
json.dump(final_result, f, indent=2, default=str)
|
|
|
|
print(f"\n{'='*60}")
|
|
print("EXECUTION COMPLETE")
|
|
print(f"{'='*60}")
|
|
print(f"Output saved: {output_file}")
|
|
print(f"Cost: {self.cost_tracker.summary()}")
|
|
print(f"{'='*60}\n")
|
|
|
|
return final_result
|
|
|
|
async def chat(self, message: str, agent: str = "captain") -> str:
|
|
"""Simple chat with a specific agent."""
|
|
if agent not in self.agents:
|
|
return f"Agent '{agent}' not found. Available: {list(self.agents.keys())}"
|
|
|
|
response = await self.agents[agent].run(message)
|
|
model = AGENTS.get(agent, {}).get("model", "sonnet")
|
|
self.cost_tracker.record(response.usage, model)
|
|
return response.text
|
|
|
|
|
|
async def main():
|
|
"""Interactive Captain Claude session."""
|
|
captain = CaptainClaude()
|
|
|
|
print("\n" + "="*60)
|
|
print("CAPTAIN CLAUDE by dadiaar")
|
|
print("Multi-Agent Orchestration System")
|
|
print("="*60)
|
|
print("\nCommands:")
|
|
print(" /execute <task> - Full multi-agent execution")
|
|
print(" /chat <message> - Chat with Captain")
|
|
print(" /agent <name> <message> - Chat with specific agent")
|
|
print(" /parallel <task> - Run all agents in parallel")
|
|
print(" /cost - Show cost summary")
|
|
print(" /quit - Exit")
|
|
print("="*60 + "\n")
|
|
|
|
while True:
|
|
try:
|
|
user_input = input("You: ").strip()
|
|
|
|
if not user_input:
|
|
continue
|
|
|
|
if user_input.lower() == "/quit":
|
|
print("Goodbye!")
|
|
break
|
|
|
|
if user_input.lower() == "/cost":
|
|
print(f"Cost: {captain.cost_tracker.summary()}")
|
|
continue
|
|
|
|
if user_input.startswith("/execute "):
|
|
task = user_input[9:]
|
|
result = await captain.execute(task)
|
|
print(f"\nFinal Output:\n{result['final_output']}\n")
|
|
continue
|
|
|
|
if user_input.startswith("/parallel "):
|
|
task = user_input[10:]
|
|
agents = ["coder", "reviewer", "researcher"]
|
|
result = await captain.run_parallel(task, agents)
|
|
print(f"\nParallel Results:\n{result}\n")
|
|
continue
|
|
|
|
if user_input.startswith("/agent "):
|
|
parts = user_input[7:].split(" ", 1)
|
|
if len(parts) == 2:
|
|
agent, message = parts
|
|
response = await captain.chat(message, agent)
|
|
print(f"\n[{agent}]: {response}\n")
|
|
else:
|
|
print("Usage: /agent <name> <message>")
|
|
continue
|
|
|
|
if user_input.startswith("/chat ") or not user_input.startswith("/"):
|
|
message = user_input[6:] if user_input.startswith("/chat ") else user_input
|
|
response = await captain.chat(message)
|
|
print(f"\n[Captain]: {response}\n")
|
|
continue
|
|
|
|
print("Unknown command. Use /quit to exit.")
|
|
|
|
except KeyboardInterrupt:
|
|
print("\nGoodbye!")
|
|
break
|
|
except Exception as e:
|
|
print(f"Error: {e}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|