#!/usr/bin/env python3
"""
Agent Orchestration PoC for FlowPilot

Demonstrates:
- HR Manager creating agents for tasks
- Experience Manager logging and context sharing
- Task decomposition into atomic types
"""

import hashlib
import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, List, Any, Optional


class TaskType(Enum):
    """Categories of atomic work; the string values are used in serialized output."""

    DOCUMENT_GENERATION = "document_generation"
    ANALYSIS = "analysis"
    COORDINATION = "coordination"
    DATA_OPERATION = "data_operation"


class AgentRole(Enum):
    """Roles an agent can be created with."""

    HR_MANAGER = "hr_manager"
    EXPERIENCE_MANAGER = "experience_manager"
    EXECUTOR = "executor"


class TaskStatus(Enum):
    """Lifecycle states of a task; new tasks start as PENDING."""

    PENDING = "pending"
    ASSIGNED = "assigned"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"


class AtomicTask:
    """Represents a task that can be completed by a single agent in one session.

    A new task gets a random UUID string as ``id``, starts in
    ``TaskStatus.PENDING`` with no agent, result or completion time, and
    records its creation time with ``datetime.now()``.
    """

    def __init__(self,
                 task_type: TaskType,
                 description: str,
                 input_data: Dict[str, Any],
                 expected_output_schema: Dict[str, Any]):
        """Create a pending task.

        Args:
            task_type: Category of the task.
            description: Human-readable description of the work.
            input_data: Input payload; stored by reference, not copied.
            expected_output_schema: Shape the result is expected to have;
                stored by reference, not copied.
        """
        self.id = str(uuid.uuid4())
        self.task_type = task_type
        self.description = description
        self.input_data = input_data
        self.expected_output_schema = expected_output_schema
        self.status = TaskStatus.PENDING
        self.agent_id: Optional[str] = None
        self.result: Optional[Dict[str, Any]] = None
        self.created_at = datetime.now()
        self.completed_at: Optional[datetime] = None

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(id={self.id!r}, "
            f"task_type={self.task_type.value!r}, "
            f"status={self.status.value!r})"
        )

    def to_dict(self) -> Dict[str, Any]:
        """Return the task as a plain dict.

        Enum members are replaced by their string values and timestamps by
        ISO-8601 strings; ``completed_at`` is ``None`` while unset.
        ``input_data``, ``expected_output_schema`` and ``result`` are included
        by reference.
        """
        completed = self.completed_at.isoformat() if self.completed_at else None
        return {
            "id": self.id,
            "task_type": self.task_type.value,
            "description": self.description,
            "input_data": self.input_data,
            "expected_output_schema": self.expected_output_schema,
            "status": self.status.value,
            "agent_id": self.agent_id,
            "result": self.result,
            "created_at": self.created_at.isoformat(),
            "completed_at": completed,
        }


class Agent:
    """Base agent class.

    Holds an identifier, a role, a model name and the list of task types the
    agent declares it can handle. ``execute`` is left to subclasses.
    """

    def __init__(self, agent_id: str, role: AgentRole, model: str = "default"):
        """Create an agent with no declared capabilities.

        Args:
            agent_id: Identifier for this agent.
            role: Role of the agent.
            model: Name of the model associated with the agent.
        """
        self.agent_id = agent_id
        self.role = role
        self.model = model
        self.capabilities: List[TaskType] = []

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(agent_id={self.agent_id!r}, "
            f"role={self.role.value!r}, model={self.model!r})"
        )

    def execute(self, task: AtomicTask) -> Dict[str, Any]:
        """Execute a task and return result.

        Raises:
            NotImplementedError: Always, on the base class.
        """
        raise NotImplementedError(
            f"{type(self).__name__} does not implement execute()"
        )

    def can_handle(self, task_type: TaskType) -> bool:
        """Return True if ``task_type`` is listed in ``capabilities``."""
        return task_type in self.capabilities


class HRManagerAgent(Agent):
    """HR Manager: creates and manages executor agents."""

    def __init__(self, agent_id: str):
        """Create the HR manager and its task-type -> model-name table."""
        super().__init__(agent_id, AgentRole.HR_MANAGER)
        # HR manager can handle task decomposition and agent creation
        self.capabilities = [TaskType.DOCUMENT_GENERATION, TaskType.ANALYSIS]
        self.available_models = {
            TaskType.DOCUMENT_GENERATION: "claude-3-sonnet",
            TaskType.ANALYSIS: "gpt-4-turbo",
            TaskType.COORDINATION: "claude-3-haiku",
            TaskType.DATA_OPERATION: "gpt-3.5-turbo",
        }

    def decompose_task(self, high_level_task: str) -> List[AtomicTask]:
        """Decompose a high-level task into atomic tasks.

        In PoC, we'll use simple heuristics: keyword matching on the task
        text selects one of three fixed decompositions.

        Args:
            high_level_task: Free-text description of the requested work.

        Returns:
            A list of new pending ``AtomicTask`` objects: two for charter
            requests, two for risk requests, otherwise one generic
            document-generation task.
        """
        # Lower-case once; the English keywords are matched case-insensitively.
        lowered = high_level_task.lower()

        if "章程" in high_level_task or "charter" in lowered:
            return [
                AtomicTask(
                    task_type=TaskType.DOCUMENT_GENERATION,
                    description="Generate project charter based on user input",
                    input_data={"template": "project_charter", "user_input": high_level_task},
                    expected_output_schema={"sections": ["目标", "范围", "里程碑", "干系人"]},
                ),
                AtomicTask(
                    task_type=TaskType.ANALYSIS,
                    description="Analyze stakeholders and create stakeholder register",
                    input_data={"charter_placeholder": "will be filled"},
                    expected_output_schema={"stakeholders": [{"name": "", "role": "", "power": 0, "interest": 0}]},
                ),
            ]

        if "风险" in high_level_task or "risk" in lowered:
            return [
                AtomicTask(
                    task_type=TaskType.ANALYSIS,
                    description="Identify project risks",
                    input_data={"scope": high_level_task},
                    expected_output_schema={"risks": [{"description": "", "probability": 0, "impact": 0}]},
                ),
                AtomicTask(
                    task_type=TaskType.DOCUMENT_GENERATION,
                    description="Generate risk mitigation plan",
                    input_data={"risks_placeholder": "will be filled"},
                    expected_output_schema={"mitigations": []},
                ),
            ]

        # Default: treat as documentation task
        return [
            AtomicTask(
                task_type=TaskType.DOCUMENT_GENERATION,
                description=f"Generate document for: {high_level_task}",
                input_data={"request": high_level_task},
                expected_output_schema={"content": ""},
            )
        ]

    def create_executor_agent(self, task_type: TaskType) -> Agent:
        """Create an executor agent for a specific task type.

        The executor is a plain ``Agent`` with a fresh UUID, the model mapped
        to ``task_type`` (``"default"`` when unmapped) and ``task_type`` as
        its only capability.
        """
        # In reality, this would configure the agent with specific prompts and model
        executor = Agent(
            str(uuid.uuid4()),
            AgentRole.EXECUTOR,
            model=self.available_models.get(task_type, "default"),
        )
        executor.capabilities = [task_type]
        return executor

    def execute(self, task: AtomicTask) -> Dict[str, Any]:
        """HR Manager handles task decomposition and agent creation.

        Decomposes ``task.description`` and marks ``task`` as completed by
        this agent, including its completion time.

        Returns:
            ``{"atomic_tasks": [...]}`` where each element is the
            ``to_dict()`` form of one atomic task; the same dict is stored in
            ``task.result``.
        """
        # For PoC, we simulate the decomposition
        atomic_tasks = self.decompose_task(task.description)
        task.agent_id = self.agent_id
        task.status = TaskStatus.COMPLETED
        # Keep the timestamp consistent with the COMPLETED status so that
        # to_dict() does not report a finished task without a completion time.
        task.completed_at = datetime.now()
        task.result = {"atomic_tasks": [at.to_dict() for at in atomic_tasks]}
        return task.result


class ExperienceManagerAgent(Agent):
    """Experience Manager: logs executions, provides context, manages knowledge."""

    def __init__(self, agent_id: str):
        """Create the manager with an empty execution log and knowledge base."""
        super().__init__(agent_id, AgentRole.EXPERIENCE_MANAGER)
        self.capabilities = [TaskType.DATA_OPERATION, TaskType.ANALYSIS]
        self.execution_log: List[Dict] = []
        self.knowledge_base: Dict[str, Any] = {}

    def record_execution(self, agent_id: str, task: AtomicTask, result: Dict[str, Any]) -> None:
        """Record an agent's task execution.

        Appends one entry to ``execution_log`` and, for completed tasks,
        updates the knowledge base. ``task.input_data`` and ``result`` are
        stored by reference.
        """
        self.execution_log.append({
            "timestamp": datetime.now().isoformat(),
            "agent_id": agent_id,
            "task_id": task.id,
            "task_type": task.task_type.value,
            "input": task.input_data,
            "output": result,
            "status": task.status.value,
        })

        # Update knowledge base with patterns
        self._update_knowledge_base(task, result)

    def _update_knowledge_base(self, task: AtomicTask, result: Dict[str, Any]) -> None:
        """Simple knowledge base update: store successful patterns.

        Only tasks in ``TaskStatus.COMPLETED`` are stored. Entries are keyed
        by task type plus a digest of the description; re-recording the same
        key replaces the entry and increments its ``count``.
        """
        if task.status != TaskStatus.COMPLETED:
            return

        # Built-in hash() of a str is randomized per process, and reducing it
        # modulo 1000 makes unrelated descriptions collide; use a stable digest.
        digest = hashlib.sha256(task.description.encode("utf-8")).hexdigest()[:12]
        key = f"{task.task_type.value}_{digest}"
        seen_before = self.knowledge_base.get(key, {}).get("count", 0)
        self.knowledge_base[key] = {
            "task_description": task.description,
            "successful_input_pattern": task.input_data,
            "successful_output_pattern": result,
            "count": seen_before + 1,
        }

    def get_context_for_task(self, task_type: TaskType, keywords: str = "") -> Dict[str, Any]:
        """Retrieve relevant context for a task.

        Args:
            task_type: Only log entries of this type are returned.
            keywords: Accepted for callers but currently not used for
                filtering.

        Returns:
            A dict with ``recent_executions`` (entries of ``task_type`` among
            the last 20 log entries) and ``knowledge_base_snapshot`` (the
            first 5 knowledge-base entries in insertion order).
        """
        # Simple context retrieval: return recent similar executions
        wanted = task_type.value
        recent_window = self.execution_log[-20:]  # last 20
        relevant_logs = [entry for entry in recent_window if entry["task_type"] == wanted]

        return {
            "recent_executions": relevant_logs,
            "knowledge_base_snapshot": dict(list(self.knowledge_base.items())[:5]),  # first 5
        }

    def execute(self, task: AtomicTask) -> Dict[str, Any]:
        """Experience Manager handles data operations and context queries.

        The task type to look up is read from ``task.input_data["task_type"]``
        when present and valid; otherwise the lookup task's own type is used.
        Marks ``task`` as completed by this agent.

        Returns:
            ``{"context": ..., "message": ...}``; the same dict is stored in
            ``task.result``.
        """
        # For PoC, we simulate context provision.
        # The lookup task itself is a DATA_OPERATION; the type the caller
        # wants context FOR travels in input_data. Using the lookup task's own
        # type would only ever return earlier context lookups, each nesting
        # the previous ones.
        target_type = task.task_type
        requested = task.input_data.get("task_type")
        if requested is not None:
            try:
                target_type = TaskType(requested)
            except ValueError:
                # Unknown value: fall back to the lookup task's own type.
                pass
        keywords = str(task.input_data.get("keywords", ""))

        context = self.get_context_for_task(target_type, keywords)
        task.agent_id = self.agent_id
        task.status = TaskStatus.COMPLETED
        # Keep the timestamp consistent with the COMPLETED status.
        task.completed_at = datetime.now()
        task.result = {
            "context": context,
            "message": "Context provided for task execution",
        }
        return task.result


class TaskOrchestrator:
    """Orchestrates the workflow between HR Manager, Experience Manager, and executors."""

    def __init__(self):
        """Create the two manager agents and empty executor/task registries."""
        self.hr_manager = HRManagerAgent("hr-001")
        self.experience_manager = ExperienceManagerAgent("exp-001")
        self.executor_agents: Dict[str, Agent] = {}
        self.tasks: Dict[str, AtomicTask] = {}

    def process_high_level_task(self, description: str) -> Dict[str, Any]:
        """Process a high-level task through the orchestrator.

        The HR manager decomposes ``description`` into atomic tasks; each
        atomic task then gets a context lookup, a fresh executor agent and a
        simulated execution. Every step is recorded by the experience manager.

        Returns:
            A dict with ``high_level_task``, ``hr_manager_task``,
            ``atomic_tasks``, ``execution_results`` and
            ``experience_manager_log``. The last holds exactly the log entries
            recorded during this call (decomposition, then one context lookup
            and one execution per atomic task).
        """
        print(f"\n=== Processing High-Level Task: {description} ===")

        # Remember where this call's entries start. A negative slice such as
        # log[-2 * n:] returns the whole log when n == 0 and miscounts the
        # decomposition entry otherwise.
        execution_log = self.experience_manager.execution_log
        log_start = len(execution_log)

        # Step 1: HR Manager decomposes the task
        hr_task = AtomicTask(
            task_type=TaskType.ANALYSIS,  # HR uses analysis to decompose
            description=description,
            input_data={},
            expected_output_schema={"atomic_tasks": []},
        )
        decomposition_result = self.hr_manager.execute(hr_task)
        self.experience_manager.record_execution(
            self.hr_manager.agent_id, hr_task, decomposition_result
        )

        # Reconstruct AtomicTask objects from decomposition result
        atomic_tasks = [
            self._rebuild_task(task_data)
            for task_data in decomposition_result.get("atomic_tasks", [])
        ]
        for task in atomic_tasks:
            self.tasks[task.id] = task

        print(f"HR Manager decomposed into {len(atomic_tasks)} atomic tasks")

        # Step 2: For each atomic task, create/execute agent
        results = []
        for task in atomic_tasks:
            results.append(self._run_atomic_task(task))

        return {
            "high_level_task": description,
            "hr_manager_task": hr_task.to_dict(),
            "atomic_tasks": [t.to_dict() for t in atomic_tasks],
            "execution_results": results,
            "experience_manager_log": execution_log[log_start:],
        }

    @staticmethod
    def _rebuild_task(task_data: Dict[str, Any]) -> AtomicTask:
        """Rebuild an AtomicTask from its ``to_dict()`` form, keeping id and creation time."""
        task = AtomicTask(
            task_type=TaskType(task_data["task_type"]),
            description=task_data["description"],
            input_data=task_data["input_data"],
            expected_output_schema=task_data["expected_output_schema"],
        )
        task.id = task_data["id"]
        # The constructor stamps a new creation time; restore the original one
        # when the serialized form carries it.
        created_at = task_data.get("created_at")
        if created_at:
            task.created_at = datetime.fromisoformat(created_at)
        return task

    def _run_atomic_task(self, task: AtomicTask) -> Dict[str, Any]:
        """Fetch context, create an executor, simulate and record one atomic task."""
        print(f"\n--- Executing Task: {task.description} ---")

        # Get context from Experience Manager
        context_task = AtomicTask(
            task_type=TaskType.DATA_OPERATION,
            description="Get context for task execution",
            input_data={"task_type": task.task_type.value, "keywords": task.description},
            expected_output_schema={"context": {}},
        )
        context_result = self.experience_manager.execute(context_task)
        self.experience_manager.record_execution(
            self.experience_manager.agent_id, context_task, context_result
        )

        # Create executor agent for this task type; it already carries a
        # fresh UUID, so that id is used as the registry key.
        executor = self.hr_manager.create_executor_agent(task.task_type)
        executor_id = executor.agent_id
        self.executor_agents[executor_id] = executor

        # Simulate task execution (in PoC, we just generate a mock result)
        # In reality, this would call the actual agent/LLM
        mock_result = self._simulate_execution(executor, task, context_result["context"])

        # Record execution
        task.agent_id = executor_id
        task.status = TaskStatus.COMPLETED
        task.result = mock_result
        task.completed_at = datetime.now()
        self.experience_manager.record_execution(executor_id, task, mock_result)

        print(f"Task completed by executor {executor_id[:8]}")

        return {
            "task": task.to_dict(),
            "executor_id": executor_id,
            "result": mock_result,
        }

    def _simulate_execution(self, executor: Agent, task: AtomicTask, context: Dict) -> Dict[str, Any]:
        """Simulate agent execution. In real system, this would call LLM.

        Returns a canned payload chosen by ``task.task_type``; ``context`` is
        accepted but not used by the mock.
        """
        # Simple mock based on task type
        if task.task_type == TaskType.DOCUMENT_GENERATION:
            return {
                "generated_document": f"# Sample Document\n\nThis is a generated document for: {task.description}\n\nSections: {list(task.expected_output_schema.keys())}",
                "model_used": executor.model,
                "tokens_used": 1500,
            }

        if task.task_type == TaskType.ANALYSIS:
            return {
                "analysis_result": f"Analysis of: {task.description}\n\nKey findings: \n- Item 1: ...\n- Item 2: ...\n- Recommendation: ...",
                "model_used": executor.model,
                "tokens_used": 800,
            }

        return {
            "output": f"Executed {task.task_type.value} for: {task.description}",
            "model_used": executor.model,
            "tokens_used": 500,
        }


def main():
    """Run the PoC demonstration.

    Processes three sample high-level tasks, prints a summary and writes the
    collected results to ``poc_results.json`` in the current directory.
    """
    import json

    print("FlowPilot Agent Orchestration PoC")
    print("=" * 50)

    orchestrator = TaskOrchestrator()

    # Example high-level tasks from the project management handbook
    tasks_to_process = [
        "创建项目章程,目标是开发一个新的客户管理系统,范围包括需求、开发、测试和上线",
        "识别软件开发项目的主要风险",
        "制定项目进度计划",
    ]

    all_results = []
    for task_desc in tasks_to_process:
        all_results.append(orchestrator.process_high_level_task(task_desc))
        print("\n" + "=" * 50)

    # Summary
    print("\n=== PoC Execution Summary ===")
    print(f"High-level tasks processed: {len(all_results)}")
    total_atomic = sum(len(r["atomic_tasks"]) for r in all_results)
    print(f"Total atomic tasks executed: {total_atomic}")
    print(f"HR Manager agent ID: {orchestrator.hr_manager.agent_id}")
    print(f"Experience Manager agent ID: {orchestrator.experience_manager.agent_id}")
    print(f"Executor agents created: {len(orchestrator.executor_agents)}")

    # Save results to file for inspection.
    # ensure_ascii=False writes the Chinese text verbatim, so the file must be
    # opened as UTF-8 rather than with the locale's default encoding.
    with open("poc_results.json", "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2, ensure_ascii=False)
    print("\nDetailed results saved to poc_results.json")


if __name__ == "__main__":
    main()