Initial commit - Captain Claude multiagent system

- Core captain_claude.py orchestrator
- Context manager with SQL schemas
- Provider adapters (Anthropic, OpenAI)
- Execution scripts
This commit is contained in:
ARCHITECT
2025-12-29 18:31:54 +00:00
commit d21bd9e650
17 changed files with 3295 additions and 0 deletions

392
captain_claude.py Executable file
View File

@@ -0,0 +1,392 @@
#!/usr/bin/env python3
"""
Captain Claude by dadiaar
Multi-Agent Orchestration System using claude-code-provider
A powerful multi-agent system that coordinates specialized agents
for complex software engineering tasks.
"""
import asyncio
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
from claude_code_provider import (
ClaudeCodeClient,
ConcurrentOrchestrator,
SequentialOrchestrator,
CostTracker,
StructuredLogger,
)
# Load context file
CONTEXT_FILE = Path(__file__).parent / "CAPTAIN_CLAUDE.md"
GLOBAL_CONTEXT = CONTEXT_FILE.read_text() if CONTEXT_FILE.exists() else ""
# Agent Definitions
AGENTS = {
"captain": {
"name": "Captain",
"model": "sonnet",
"instructions": """You are Captain, the lead coordinator of the Captain Claude team.
Your role is to:
- Analyze incoming tasks and break them into subtasks
- Delegate work to specialized agents
- Synthesize results from multiple agents
- Ensure quality and coherence of final output
Be concise, strategic, and focused on delivering results.""",
},
"coder": {
"name": "Coder",
"model": "sonnet",
"instructions": """You are Coder, a senior software engineer.
Your role is to:
- Write clean, efficient, and well-documented code
- Implement features and fix bugs
- Follow best practices and design patterns
- Produce production-ready code
Focus on code quality and maintainability.""",
},
"reviewer": {
"name": "Reviewer",
"model": "sonnet",
"instructions": """You are Reviewer, a code review specialist.
Your role is to:
- Review code for bugs, security issues, and best practices
- Suggest improvements and optimizations
- Check for edge cases and error handling
- Ensure code quality standards are met
Be thorough but constructive in feedback.""",
},
"researcher": {
"name": "Researcher",
"model": "haiku",
"instructions": """You are Researcher, an information specialist.
Your role is to:
- Gather relevant information about technologies and patterns
- Find documentation and examples
- Research best practices and solutions
- Provide concise summaries of findings
Focus on accuracy and relevance.""",
},
"architect": {
"name": "Architect",
"model": "opus",
"instructions": """You are Architect, a system design expert.
Your role is to:
- Design system architectures and data models
- Plan implementation strategies
- Evaluate trade-offs and make technical decisions
- Create high-level designs and specifications
Think strategically about scalability and maintainability.""",
},
}
class CaptainClaude:
"""Multi-agent orchestration system for software engineering tasks."""
def __init__(self, output_dir: Optional[str] = None):
self.client = ClaudeCodeClient(
timeout=300.0,
enable_retries=True,
enable_circuit_breaker=True,
)
self.cost_tracker = CostTracker()
logger = logging.getLogger("captain-claude")
logger.setLevel(logging.INFO)
self.logger = StructuredLogger(logger)
self.output_dir = Path(output_dir) if output_dir else Path.cwd() / "captain_output"
self.output_dir.mkdir(exist_ok=True)
self.agents = {}
self._init_agents()
def _init_agents(self):
"""Initialize all specialized agents."""
for agent_id, config in AGENTS.items():
instructions = config["instructions"]
if agent_id == "captain" and GLOBAL_CONTEXT:
instructions = f"{GLOBAL_CONTEXT}\n\n{instructions}"
self.agents[agent_id] = self.client.create_agent(
name=config["name"],
model=config["model"],
instructions=instructions,
autocompact=True,
autocompact_threshold=80_000,
)
async def analyze_task(self, task: str) -> dict:
"""Have Captain analyze a task and create a plan."""
prompt = f"""Analyze this task and create an execution plan:
TASK: {task}
Available agents:
- Coder: Writes and implements code
- Reviewer: Reviews code for quality and issues
- Researcher: Gathers information and documentation
- Architect: Designs systems and makes technical decisions
Respond with a JSON object containing:
{{
"summary": "Brief task summary",
"agents_needed": ["list", "of", "agents"],
"steps": [
{{"agent": "agent_name", "task": "specific task description"}}
],
"parallel_possible": true/false
}}"""
response = await self.agents["captain"].run(prompt)
self.cost_tracker.record(response.usage, "sonnet")
# Try to extract JSON from response
try:
# Find JSON in response
text = response.text
start = text.find("{")
end = text.rfind("}") + 1
if start >= 0 and end > start:
return json.loads(text[start:end])
except json.JSONDecodeError:
pass
# Fallback plan
return {
"summary": task[:100],
"agents_needed": ["coder"],
"steps": [{"agent": "coder", "task": task}],
"parallel_possible": False,
}
async def run_parallel(self, task: str, agents: list[str]) -> dict:
"""Run multiple agents in parallel on the same task."""
selected_agents = [self.agents[a] for a in agents if a in self.agents]
if not selected_agents:
return {"error": "No valid agents selected"}
orchestrator = ConcurrentOrchestrator(agents=selected_agents)
result = await orchestrator.run(task)
return {
"task": task,
"agents": agents,
"results": result.text if hasattr(result, "text") else str(result),
}
async def run_sequential(self, steps: list[dict]) -> list[dict]:
"""Run agents sequentially, passing output to next agent."""
results = []
previous_output = ""
for step in steps:
agent_id = step.get("agent", "coder")
task = step.get("task", "")
if agent_id not in self.agents:
results.append({"agent": agent_id, "error": "Agent not found"})
continue
# Include previous output if available
full_prompt = task
if previous_output:
full_prompt = f"""Previous agent output:
{previous_output}
Your task: {task}"""
response = await self.agents[agent_id].run(full_prompt)
model = AGENTS.get(agent_id, {}).get("model", "sonnet")
self.cost_tracker.record(response.usage, model)
previous_output = response.text
results.append({
"agent": agent_id,
"task": task,
"output": response.text,
})
return results
async def run_handoff(self, task: str, from_agent: str, to_agent: str) -> dict:
"""Run a handoff between two agents."""
if from_agent not in self.agents or to_agent not in self.agents:
return {"error": "Invalid agent(s)"}
# First agent processes
response1 = await self.agents[from_agent].run(task)
model1 = AGENTS.get(from_agent, {}).get("model", "sonnet")
self.cost_tracker.record(response1.usage, model1)
# Handoff to second agent
handoff_prompt = f"""Previous agent ({from_agent}) output:
{response1.text}
Continue and complete this work."""
response2 = await self.agents[to_agent].run(handoff_prompt)
model2 = AGENTS.get(to_agent, {}).get("model", "sonnet")
self.cost_tracker.record(response2.usage, model2)
return {
"task": task,
"from_agent": from_agent,
"to_agent": to_agent,
"first_output": response1.text,
"final_output": response2.text,
}
async def execute(self, task: str) -> dict:
"""Execute a task using intelligent agent orchestration."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
print(f"\n{'='*60}")
print("CAPTAIN CLAUDE by dadiaar")
print(f"{'='*60}")
print(f"Task: {task[:100]}...")
print(f"{'='*60}\n")
# Phase 1: Analyze task
print("[Captain] Analyzing task...")
plan = await self.analyze_task(task)
print(f"[Captain] Plan created: {len(plan.get('steps', []))} steps")
# Phase 2: Execute plan
results = []
if plan.get("parallel_possible") and len(plan.get("agents_needed", [])) > 1:
print("[Captain] Executing agents in parallel...")
parallel_result = await self.run_parallel(
task,
plan.get("agents_needed", ["coder"])
)
results.append(parallel_result)
else:
print("[Captain] Executing agents sequentially...")
sequential_results = await self.run_sequential(plan.get("steps", []))
results.extend(sequential_results)
# Phase 3: Synthesize results
print("[Captain] Synthesizing results...")
synthesis_prompt = f"""Synthesize these results into a coherent final output:
Original task: {task}
Agent outputs:
{json.dumps(results, indent=2, default=str)}
Provide a clear, actionable final result."""
final_response = await self.agents["captain"].run(synthesis_prompt)
self.cost_tracker.record(final_response.usage, "sonnet")
# Compile final result
final_result = {
"timestamp": timestamp,
"task": task,
"plan": plan,
"agent_results": results,
"final_output": final_response.text,
"cost_summary": self.cost_tracker.summary(),
}
# Save to file
output_file = self.output_dir / f"result_{timestamp}.json"
with open(output_file, "w") as f:
json.dump(final_result, f, indent=2, default=str)
print(f"\n{'='*60}")
print("EXECUTION COMPLETE")
print(f"{'='*60}")
print(f"Output saved: {output_file}")
print(f"Cost: {self.cost_tracker.summary()}")
print(f"{'='*60}\n")
return final_result
async def chat(self, message: str, agent: str = "captain") -> str:
"""Simple chat with a specific agent."""
if agent not in self.agents:
return f"Agent '{agent}' not found. Available: {list(self.agents.keys())}"
response = await self.agents[agent].run(message)
model = AGENTS.get(agent, {}).get("model", "sonnet")
self.cost_tracker.record(response.usage, model)
return response.text
async def main():
"""Interactive Captain Claude session."""
captain = CaptainClaude()
print("\n" + "="*60)
print("CAPTAIN CLAUDE by dadiaar")
print("Multi-Agent Orchestration System")
print("="*60)
print("\nCommands:")
print(" /execute <task> - Full multi-agent execution")
print(" /chat <message> - Chat with Captain")
print(" /agent <name> <message> - Chat with specific agent")
print(" /parallel <task> - Run all agents in parallel")
print(" /cost - Show cost summary")
print(" /quit - Exit")
print("="*60 + "\n")
while True:
try:
user_input = input("You: ").strip()
if not user_input:
continue
if user_input.lower() == "/quit":
print("Goodbye!")
break
if user_input.lower() == "/cost":
print(f"Cost: {captain.cost_tracker.summary()}")
continue
if user_input.startswith("/execute "):
task = user_input[9:]
result = await captain.execute(task)
print(f"\nFinal Output:\n{result['final_output']}\n")
continue
if user_input.startswith("/parallel "):
task = user_input[10:]
agents = ["coder", "reviewer", "researcher"]
result = await captain.run_parallel(task, agents)
print(f"\nParallel Results:\n{result}\n")
continue
if user_input.startswith("/agent "):
parts = user_input[7:].split(" ", 1)
if len(parts) == 2:
agent, message = parts
response = await captain.chat(message, agent)
print(f"\n[{agent}]: {response}\n")
else:
print("Usage: /agent <name> <message>")
continue
if user_input.startswith("/chat ") or not user_input.startswith("/"):
message = user_input[6:] if user_input.startswith("/chat ") else user_input
response = await captain.chat(message)
print(f"\n[Captain]: {response}\n")
continue
print("Unknown command. Use /quit to exit.")
except KeyboardInterrupt:
print("\nGoodbye!")
break
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
asyncio.run(main())