#!/usr/bin/env python3
"""Agent Orchestration PoC for FlowPilot.

Demonstrates:
- HR Manager creating agents for tasks
- Experience Manager logging and context sharing
- Task decomposition into atomic types
"""

import hashlib
import json
import uuid
from datetime import datetime
from enum import Enum
from itertools import islice
from typing import Any, Dict, List, Optional


class TaskType(Enum):
    """Closed set of atomic task categories."""

    DOCUMENT_GENERATION = "document_generation"
    ANALYSIS = "analysis"
    COORDINATION = "coordination"
    DATA_OPERATION = "data_operation"


class AgentRole(Enum):
    """Roles an agent can play in the orchestration."""

    HR_MANAGER = "hr_manager"
    EXPERIENCE_MANAGER = "experience_manager"
    EXECUTOR = "executor"


class TaskStatus(Enum):
    """Lifecycle states of an atomic task."""

    PENDING = "pending"
    ASSIGNED = "assigned"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


class AtomicTask:
    """Represents a task that can be completed by a single agent in one session."""

    def __init__(
        self,
        task_type: TaskType,
        description: str,
        input_data: Dict[str, Any],
        expected_output_schema: Dict[str, Any],
    ):
        """Create a pending task with a fresh UUID and creation timestamp."""
        self.id = str(uuid.uuid4())
        self.task_type = task_type
        self.description = description
        self.input_data = input_data
        self.expected_output_schema = expected_output_schema
        self.status = TaskStatus.PENDING
        self.agent_id: Optional[str] = None
        self.result: Optional[Dict[str, Any]] = None
        self.created_at = datetime.now()
        self.completed_at: Optional[datetime] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly view of the task.

        Enum members are flattened to their values and timestamps to ISO-8601
        strings. ``input_data``, ``expected_output_schema`` and ``result`` are
        referenced as-is, not copied.
        """
        return {
            "id": self.id,
            "task_type": self.task_type.value,
            "description": self.description,
            "input_data": self.input_data,
            "expected_output_schema": self.expected_output_schema,
            "status": self.status.value,
            "agent_id": self.agent_id,
            "result": self.result,
            "created_at": self.created_at.isoformat(),
            "completed_at": self.completed_at.isoformat() if self.completed_at else None,
        }


class Agent:
    """Base agent class.

    Executor agents are plain ``Agent`` instances (see
    ``HRManagerAgent.create_executor_agent``), so this class must stay
    directly instantiable.
    """

    def __init__(self, agent_id: str, role: AgentRole, model: str = "default"):
        """Store identity, role and model name; capabilities start empty."""
        self.agent_id = agent_id
        self.role = role
        self.model = model
        self.capabilities: List[TaskType] = []

    def execute(self, task: AtomicTask) -> Dict[str, Any]:
        """Execute a task and return result.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError

    def can_handle(self, task_type: TaskType) -> bool:
        """Return True when ``task_type`` is among this agent's capabilities."""
        return task_type in self.capabilities


class HRManagerAgent(Agent):
    """HR Manager: creates and manages executor agents."""

    def __init__(self, agent_id: str):
        """Set up the HR manager with its capabilities and model table."""
        super().__init__(agent_id, AgentRole.HR_MANAGER)
        # HR manager can handle task decomposition and agent creation
        self.capabilities = [TaskType.DOCUMENT_GENERATION, TaskType.ANALYSIS]
        # Model assigned to executors created for each task type.
        self.available_models = {
            TaskType.DOCUMENT_GENERATION: "claude-3-sonnet",
            TaskType.ANALYSIS: "gpt-4-turbo",
            TaskType.COORDINATION: "claude-3-haiku",
            TaskType.DATA_OPERATION: "gpt-3.5-turbo",
        }

    def decompose_task(self, high_level_task: str) -> List[AtomicTask]:
        """Decompose a high-level task into atomic tasks.

        In the PoC this is a keyword heuristic: charter requests and risk
        requests each yield two tasks; anything else yields one
        document-generation task.
        """
        lowered = high_level_task.lower()

        if "章程" in high_level_task or "charter" in lowered:
            return [
                AtomicTask(
                    task_type=TaskType.DOCUMENT_GENERATION,
                    description="Generate project charter based on user input",
                    input_data={"template": "project_charter", "user_input": high_level_task},
                    expected_output_schema={"sections": ["目标", "范围", "里程碑", "干系人"]},
                ),
                AtomicTask(
                    task_type=TaskType.ANALYSIS,
                    description="Analyze stakeholders and create stakeholder register",
                    input_data={"charter_placeholder": "will be filled"},
                    expected_output_schema={
                        "stakeholders": [{"name": "", "role": "", "power": 0, "interest": 0}]
                    },
                ),
            ]

        if "风险" in high_level_task or "risk" in lowered:
            return [
                AtomicTask(
                    task_type=TaskType.ANALYSIS,
                    description="Identify project risks",
                    input_data={"scope": high_level_task},
                    expected_output_schema={
                        "risks": [{"description": "", "probability": 0, "impact": 0}]
                    },
                ),
                AtomicTask(
                    task_type=TaskType.DOCUMENT_GENERATION,
                    description="Generate risk mitigation plan",
                    input_data={"risks_placeholder": "will be filled"},
                    expected_output_schema={"mitigations": []},
                ),
            ]

        # Default: treat as documentation task
        return [
            AtomicTask(
                task_type=TaskType.DOCUMENT_GENERATION,
                description=f"Generate document for: {high_level_task}",
                input_data={"request": high_level_task},
                expected_output_schema={"content": ""},
            )
        ]

    def create_executor_agent(self, task_type: TaskType) -> Agent:
        """Create an executor agent for a specific task type.

        The executor gets a fresh UUID, the model mapped to ``task_type``
        (``"default"`` when unmapped) and that single capability.
        """
        # In reality, this would configure the agent with specific prompts and model
        executor = Agent(
            str(uuid.uuid4()),
            AgentRole.EXECUTOR,
            model=self.available_models.get(task_type, "default"),
        )
        executor.capabilities = [task_type]
        return executor

    def execute(self, task: AtomicTask) -> Dict[str, Any]:
        """Decompose ``task.description`` and mark ``task`` as completed.

        Returns:
            ``{"atomic_tasks": [...]}`` holding the ``to_dict()`` form of each
            atomic task; the same dict is stored on ``task.result``.
        """
        # For PoC, we simulate the decomposition
        atomic_tasks = self.decompose_task(task.description)
        task.agent_id = self.agent_id
        task.status = TaskStatus.COMPLETED
        # Keep the completion timestamp consistent with executor-run tasks.
        task.completed_at = datetime.now()
        task.result = {"atomic_tasks": [at.to_dict() for at in atomic_tasks]}
        return task.result


class ExperienceManagerAgent(Agent):
    """Experience Manager: logs executions, provides context, manages knowledge."""

    def __init__(self, agent_id: str):
        """Start with an empty execution log and knowledge base."""
        super().__init__(agent_id, AgentRole.EXPERIENCE_MANAGER)
        self.capabilities = [TaskType.DATA_OPERATION, TaskType.ANALYSIS]
        self.execution_log: List[Dict] = []
        self.knowledge_base: Dict[str, Any] = {}

    def record_execution(self, agent_id: str, task: AtomicTask, result: Dict[str, Any]):
        """Append a log entry for the execution and update the knowledge base."""
        self.execution_log.append(
            {
                "timestamp": datetime.now().isoformat(),
                "agent_id": agent_id,
                "task_id": task.id,
                "task_type": task.task_type.value,
                "input": task.input_data,
                "output": result,
                "status": task.status.value,
            }
        )
        # Update knowledge base with patterns
        self._update_knowledge_base(task, result)

    @staticmethod
    def _knowledge_key(task: AtomicTask) -> str:
        """Return a knowledge-base key that is stable across interpreter runs.

        The built-in ``hash()`` of a string is salted per process, so it cannot
        be used for keys that end up in persisted output; a SHA-256 prefix is
        deterministic and far less collision-prone than ``hash() % 1000``.
        """
        digest = hashlib.sha256(task.description.encode("utf-8")).hexdigest()[:12]
        return f"{task.task_type.value}_{digest}"

    def _update_knowledge_base(self, task: AtomicTask, result: Dict[str, Any]):
        """Store the input/output pattern of a completed task.

        Tasks in any other status are ignored. Re-recording the same
        type/description pair overwrites the pattern and bumps ``count``.
        """
        if task.status != TaskStatus.COMPLETED:
            return
        key = self._knowledge_key(task)
        previous_count = self.knowledge_base.get(key, {}).get("count", 0)
        self.knowledge_base[key] = {
            "task_description": task.description,
            "successful_input_pattern": task.input_data,
            "successful_output_pattern": result,
            "count": previous_count + 1,
        }

    def get_context_for_task(self, task_type: TaskType, keywords: str = "") -> Dict[str, Any]:
        """Retrieve relevant context for a task.

        Args:
            task_type: only log entries of this type are returned.
            keywords: accepted for interface compatibility; not used by the
                PoC retrieval logic.

        Returns:
            A dict with ``recent_executions`` (matching entries among the last
            20 log records) and ``knowledge_base_snapshot`` (the first five
            knowledge-base items in insertion order).
        """
        wanted = task_type.value
        recent = [entry for entry in self.execution_log[-20:] if entry["task_type"] == wanted]
        return {
            "recent_executions": recent,
            "knowledge_base_snapshot": dict(islice(self.knowledge_base.items(), 5)),
        }

    @staticmethod
    def _requested_task_type(task: AtomicTask) -> TaskType:
        """Return the task type a context query asks about.

        A query names its target in ``input_data["task_type"]``; when that is
        missing or not a valid ``TaskType`` value, fall back to the query
        task's own type.
        """
        requested = task.input_data.get("task_type")
        if requested is None:
            return task.task_type
        try:
            return TaskType(requested)
        except ValueError:
            return task.task_type

    def execute(self, task: AtomicTask) -> Dict[str, Any]:
        """Answer a context query and mark ``task`` as completed.

        The context is looked up for the type named in
        ``task.input_data["task_type"]`` rather than for the query task's own
        type, so executors receive history relevant to the work they are
        about to do.
        """
        # For PoC, we simulate context provision
        context = self.get_context_for_task(
            self._requested_task_type(task),
            str(task.input_data.get("keywords", "")),
        )
        task.agent_id = self.agent_id
        task.status = TaskStatus.COMPLETED
        # Keep the completion timestamp consistent with executor-run tasks.
        task.completed_at = datetime.now()
        task.result = {
            "context": context,
            "message": "Context provided for task execution",
        }
        return task.result


class TaskOrchestrator:
    """Orchestrates the workflow between HR Manager, Experience Manager, and executors."""

    def __init__(self):
        """Create the two manager agents and empty executor/task registries."""
        self.hr_manager = HRManagerAgent("hr-001")
        self.experience_manager = ExperienceManagerAgent("exp-001")
        self.executor_agents: Dict[str, Agent] = {}
        self.tasks: Dict[str, AtomicTask] = {}

    def process_high_level_task(self, description: str) -> Dict[str, Any]:
        """Process a high-level task through the orchestrator.

        Steps: the HR manager decomposes the request, then each atomic task is
        given context by the experience manager and run by a freshly created
        executor. Every step is recorded in the experience manager's log.

        Returns:
            A dict with the original description, the HR task, the atomic
            tasks, per-task execution results, and the log entries produced
            while handling this request.
        """
        print(f"\n=== Processing High-Level Task: {description} ===")

        # Remember where this request's log entries begin, so the returned
        # slice covers exactly this request (including the HR entry) and is
        # correct even when decomposition yields no tasks.
        log_start = len(self.experience_manager.execution_log)

        # Step 1: HR Manager decomposes the task
        hr_task = AtomicTask(
            task_type=TaskType.ANALYSIS,  # HR uses analysis to decompose
            description=description,
            input_data={},
            expected_output_schema={"atomic_tasks": []},
        )
        decomposition_result = self.hr_manager.execute(hr_task)
        self.experience_manager.record_execution(
            self.hr_manager.agent_id, hr_task, decomposition_result
        )

        atomic_tasks = self._rebuild_tasks(decomposition_result.get("atomic_tasks", []))
        print(f"HR Manager decomposed into {len(atomic_tasks)} atomic tasks")

        # Step 2: For each atomic task, create/execute agent
        results = [self._run_atomic_task(task) for task in atomic_tasks]

        return {
            "high_level_task": description,
            "hr_manager_task": hr_task.to_dict(),
            "atomic_tasks": [t.to_dict() for t in atomic_tasks],
            "execution_results": results,
            "experience_manager_log": self.experience_manager.execution_log[log_start:],
        }

    def _rebuild_tasks(self, tasks_data: List[Dict[str, Any]]) -> List[AtomicTask]:
        """Reconstruct AtomicTask objects from dicts and register them in ``self.tasks``."""
        rebuilt = []
        for task_data in tasks_data:
            task = AtomicTask(
                task_type=TaskType(task_data["task_type"]),
                description=task_data["description"],
                input_data=task_data["input_data"],
                expected_output_schema=task_data["expected_output_schema"],
            )
            # Preserve the ID assigned during decomposition.
            task.id = task_data["id"]
            rebuilt.append(task)
            self.tasks[task.id] = task
        return rebuilt

    def _run_atomic_task(self, task: AtomicTask) -> Dict[str, Any]:
        """Fetch context, create an executor, run ``task`` and log the outcome."""
        print(f"\n--- Executing Task: {task.description} ---")

        # Get context from Experience Manager
        context_task = AtomicTask(
            task_type=TaskType.DATA_OPERATION,
            description="Get context for task execution",
            input_data={"task_type": task.task_type.value, "keywords": task.description},
            expected_output_schema={"context": {}},
        )
        context_result = self.experience_manager.execute(context_task)
        self.experience_manager.record_execution(
            self.experience_manager.agent_id, context_task, context_result
        )

        # Create executor agent for this task type; it already carries a
        # unique ID, so no second UUID is generated here.
        executor = self.hr_manager.create_executor_agent(task.task_type)
        executor_id = executor.agent_id
        self.executor_agents[executor_id] = executor

        # Simulate task execution (in PoC, we just generate a mock result)
        # In reality, this would call the actual agent/LLM
        mock_result = self._simulate_execution(executor, task, context_result["context"])

        # Record execution
        task.agent_id = executor_id
        task.status = TaskStatus.COMPLETED
        task.result = mock_result
        task.completed_at = datetime.now()
        self.experience_manager.record_execution(executor_id, task, mock_result)

        print(f"Task completed by executor {executor_id[:8]}")
        return {
            "task": task.to_dict(),
            "executor_id": executor_id,
            "result": mock_result,
        }

    def _simulate_execution(
        self, executor: Agent, task: AtomicTask, context: Dict
    ) -> Dict[str, Any]:
        """Simulate agent execution. In real system, this would call LLM.

        ``context`` is accepted but not used by the mock.
        """
        # Simple mock based on task type
        if task.task_type == TaskType.DOCUMENT_GENERATION:
            return {
                "generated_document": f"# Sample Document\n\nThis is a generated document for: {task.description}\n\nSections: {list(task.expected_output_schema.keys())}",
                "model_used": executor.model,
                "tokens_used": 1500,
            }
        if task.task_type == TaskType.ANALYSIS:
            return {
                "analysis_result": f"Analysis of: {task.description}\n\nKey findings: \n- Item 1: ...\n- Item 2: ...\n- Recommendation: ...",
                "model_used": executor.model,
                "tokens_used": 800,
            }
        return {
            "output": f"Executed {task.task_type.value} for: {task.description}",
            "model_used": executor.model,
            "tokens_used": 500,
        }


def main():
    """Run the PoC demonstration and write the results to poc_results.json."""
    print("FlowPilot Agent Orchestration PoC")
    print("=" * 50)

    orchestrator = TaskOrchestrator()

    # Example high-level tasks from the project management handbook
    tasks_to_process = [
        "创建项目章程,目标是开发一个新的客户管理系统,范围包括需求、开发、测试和上线",
        "识别软件开发项目的主要风险",
        "制定项目进度计划",
    ]

    all_results = []
    for task_desc in tasks_to_process:
        all_results.append(orchestrator.process_high_level_task(task_desc))
        print("\n" + "=" * 50)

    # Summary
    print("\n=== PoC Execution Summary ===")
    print(f"High-level tasks processed: {len(all_results)}")
    total_atomic = sum(len(r["atomic_tasks"]) for r in all_results)
    print(f"Total atomic tasks executed: {total_atomic}")
    print(f"HR Manager agent ID: {orchestrator.hr_manager.agent_id}")
    print(f"Experience Manager agent ID: {orchestrator.experience_manager.agent_id}")
    print(f"Executor agents created: {len(orchestrator.executor_agents)}")

    # Save results to file for inspection. The output contains non-ASCII text
    # (ensure_ascii=False), so the encoding must be explicit rather than
    # locale-dependent.
    with open("poc_results.json", "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2, ensure_ascii=False)
    print("\nDetailed results saved to poc_results.json")


if __name__ == "__main__":
    main()