Files
pmp-tool/poc/agent_poc.py

382 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Agent Orchestration PoC for FlowPilot
Demonstrates:
- HR Manager creating agents for tasks
- Experience Manager logging and context sharing
- Task decomposition into atomic types
"""
import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, List, Any, Optional
class TaskType(Enum):
DOCUMENT_GENERATION = "document_generation"
ANALYSIS = "analysis"
COORDINATION = "coordination"
DATA_OPERATION = "data_operation"
class AgentRole(Enum):
HR_MANAGER = "hr_manager"
EXPERIENCE_MANAGER = "experience_manager"
EXECUTOR = "executor"
class TaskStatus(Enum):
PENDING = "pending"
ASSIGNED = "assigned"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
class AtomicTask:
"""Represents a task that can be completed by a single agent in one session."""
def __init__(self,
task_type: TaskType,
description: str,
input_data: Dict[str, Any],
expected_output_schema: Dict[str, Any]):
self.id = str(uuid.uuid4())
self.task_type = task_type
self.description = description
self.input_data = input_data
self.expected_output_schema = expected_output_schema
self.status = TaskStatus.PENDING
self.agent_id: Optional[str] = None
self.result: Optional[Dict[str, Any]] = None
self.created_at = datetime.now()
self.completed_at: Optional[datetime] = None
def to_dict(self):
return {
"id": self.id,
"task_type": self.task_type.value,
"description": self.description,
"input_data": self.input_data,
"expected_output_schema": self.expected_output_schema,
"status": self.status.value,
"agent_id": self.agent_id,
"result": self.result,
"created_at": self.created_at.isoformat(),
"completed_at": self.completed_at.isoformat() if self.completed_at else None
}
class Agent:
"""Base agent class."""
def __init__(self, agent_id: str, role: AgentRole, model: str = "default"):
self.agent_id = agent_id
self.role = role
self.model = model
self.capabilities: List[TaskType] = []
def execute(self, task: AtomicTask) -> Dict[str, Any]:
"""Execute a task and return result."""
raise NotImplementedError
def can_handle(self, task_type: TaskType) -> bool:
return task_type in self.capabilities
class HRManagerAgent(Agent):
"""HR Manager: creates and manages executor agents."""
def __init__(self, agent_id: str):
super().__init__(agent_id, AgentRole.HR_MANAGER)
# HR manager can handle task decomposition and agent creation
self.capabilities = [TaskType.DOCUMENT_GENERATION, TaskType.ANALYSIS]
self.available_models = {
TaskType.DOCUMENT_GENERATION: "claude-3-sonnet",
TaskType.ANALYSIS: "gpt-4-turbo",
TaskType.COORDINATION: "claude-3-haiku",
TaskType.DATA_OPERATION: "gpt-3.5-turbo"
}
def decompose_task(self, high_level_task: str) -> List[AtomicTask]:
"""Decompose a high-level task into atomic tasks.
In PoC, we'll use simple heuristics.
"""
# Simple decomposition logic for demonstration
if "章程" in high_level_task or "charter" in high_level_task.lower():
return [
AtomicTask(
task_type=TaskType.DOCUMENT_GENERATION,
description="Generate project charter based on user input",
input_data={"template": "project_charter", "user_input": high_level_task},
expected_output_schema={"sections": ["目标", "范围", "里程碑", "干系人"]}
),
AtomicTask(
task_type=TaskType.ANALYSIS,
description="Analyze stakeholders and create stakeholder register",
input_data={"charter_placeholder": "will be filled"},
expected_output_schema={"stakeholders": [{"name": "", "role": "", "power": 0, "interest": 0}]}
)
]
elif "风险" in high_level_task or "risk" in high_level_task.lower():
return [
AtomicTask(
task_type=TaskType.ANALYSIS,
description="Identify project risks",
input_data={"scope": high_level_task},
expected_output_schema={"risks": [{"description": "", "probability": 0, "impact": 0}]}
),
AtomicTask(
task_type=TaskType.DOCUMENT_GENERATION,
description="Generate risk mitigation plan",
input_data={"risks_placeholder": "will be filled"},
expected_output_schema={"mitigations": []}
)
]
else:
# Default: treat as documentation task
return [
AtomicTask(
task_type=TaskType.DOCUMENT_GENERATION,
description=f"Generate document for: {high_level_task}",
input_data={"request": high_level_task},
expected_output_schema={"content": ""}
)
]
def create_executor_agent(self, task_type: TaskType) -> Agent:
"""Create an executor agent for a specific task type."""
agent_id = str(uuid.uuid4())
# In reality, this would configure the agent with specific prompts and model
executor = Agent(agent_id, AgentRole.EXECUTOR,
model=self.available_models.get(task_type, "default"))
executor.capabilities = [task_type]
return executor
def execute(self, task: AtomicTask) -> Dict[str, Any]:
"""HR Manager handles task decomposition and agent creation."""
# For PoC, we simulate the decomposition
atomic_tasks = self.decompose_task(task.description)
task.agent_id = self.agent_id
task.status = TaskStatus.COMPLETED
task.result = {
"atomic_tasks": [at.to_dict() for at in atomic_tasks]
}
return task.result
class ExperienceManagerAgent(Agent):
"""Experience Manager: logs executions, provides context, manages knowledge."""
def __init__(self, agent_id: str):
super().__init__(agent_id, AgentRole.EXPERIENCE_MANAGER)
self.capabilities = [TaskType.DATA_OPERATION, TaskType.ANALYSIS]
self.execution_log: List[Dict] = []
self.knowledge_base: Dict[str, Any] = {}
def record_execution(self, agent_id: str, task: AtomicTask, result: Dict[str, Any]):
"""Record an agent's task execution."""
log_entry = {
"timestamp": datetime.now().isoformat(),
"agent_id": agent_id,
"task_id": task.id,
"task_type": task.task_type.value,
"input": task.input_data,
"output": result,
"status": task.status.value
}
self.execution_log.append(log_entry)
# Update knowledge base with patterns
self._update_knowledge_base(task, result)
def _update_knowledge_base(self, task: AtomicTask, result: Dict[str, Any]):
"""Simple knowledge base update: store successful patterns."""
if task.status == TaskStatus.COMPLETED:
key = f"{task.task_type.value}_{hash(task.description) % 1000}"
self.knowledge_base[key] = {
"task_description": task.description,
"successful_input_pattern": task.input_data,
"successful_output_pattern": result,
"count": self.knowledge_base.get(key, {}).get("count", 0) + 1
}
def get_context_for_task(self, task_type: TaskType, keywords: str = "") -> Dict[str, Any]:
"""Retrieve relevant context for a task."""
# Simple context retrieval: return recent similar executions
relevant_logs = [
log for log in self.execution_log[-20:] # last 20
if log["task_type"] == task_type.value
]
return {
"recent_executions": relevant_logs,
"knowledge_base_snapshot": dict(list(self.knowledge_base.items())[:5]) # first 5
}
def execute(self, task: AtomicTask) -> Dict[str, Any]:
"""Experience Manager handles data operations and context queries."""
# For PoC, we simulate context provision
context = self.get_context_for_task(task.task_type,
str(task.input_data))
task.agent_id = self.agent_id
task.status = TaskStatus.COMPLETED
task.result = {
"context": context,
"message": "Context provided for task execution"
}
return task.result
class TaskOrchestrator:
"""Orchestrates the workflow between HR Manager, Experience Manager, and executors."""
def __init__(self):
self.hr_manager = HRManagerAgent("hr-001")
self.experience_manager = ExperienceManagerAgent("exp-001")
self.executor_agents: Dict[str, Agent] = {}
self.tasks: Dict[str, AtomicTask] = {}
def process_high_level_task(self, description: str) -> Dict[str, Any]:
"""Process a high-level task through the orchestrator."""
print(f"\n=== Processing High-Level Task: {description} ===")
# Step 1: HR Manager decomposes the task
hr_task = AtomicTask(
task_type=TaskType.ANALYSIS, # HR uses analysis to decompose
description=description,
input_data={},
expected_output_schema={"atomic_tasks": []}
)
decomposition_result = self.hr_manager.execute(hr_task)
self.experience_manager.record_execution(self.hr_manager.agent_id, hr_task, decomposition_result)
atomic_tasks_data = decomposition_result.get("atomic_tasks", [])
atomic_tasks = []
# Reconstruct AtomicTask objects from decomposition result
for task_data in atomic_tasks_data:
task = AtomicTask(
task_type=TaskType(task_data["task_type"]),
description=task_data["description"],
input_data=task_data["input_data"],
expected_output_schema=task_data["expected_output_schema"]
)
task.id = task_data["id"]
atomic_tasks.append(task)
self.tasks[task.id] = task
print(f"HR Manager decomposed into {len(atomic_tasks)} atomic tasks")
# Step 2: For each atomic task, create/execute agent
results = []
for task in atomic_tasks:
print(f"\n--- Executing Task: {task.description} ---")
# Get context from Experience Manager
context_task = AtomicTask(
task_type=TaskType.DATA_OPERATION,
description="Get context for task execution",
input_data={"task_type": task.task_type.value, "keywords": task.description},
expected_output_schema={"context": {}}
)
context_result = self.experience_manager.execute(context_task)
self.experience_manager.record_execution(self.experience_manager.agent_id, context_task, context_result)
# Create executor agent for this task type
executor_id = str(uuid.uuid4())
executor = self.hr_manager.create_executor_agent(task.task_type)
executor.agent_id = executor_id # Override with generated ID
self.executor_agents[executor_id] = executor
# Simulate task execution (in PoC, we just generate a mock result)
# In reality, this would call the actual agent/LLM
mock_result = self._simulate_execution(executor, task, context_result["context"])
# Record execution
task.agent_id = executor_id
task.status = TaskStatus.COMPLETED
task.result = mock_result
task.completed_at = datetime.now()
self.experience_manager.record_execution(executor_id, task, mock_result)
results.append({
"task": task.to_dict(),
"executor_id": executor_id,
"result": mock_result
})
print(f"Task completed by executor {executor_id[:8]}")
return {
"high_level_task": description,
"hr_manager_task": hr_task.to_dict(),
"atomic_tasks": [t.to_dict() for t in atomic_tasks],
"execution_results": results,
"experience_manager_log": self.experience_manager.execution_log[-len(atomic_tasks)*2:] # recent logs
}
def _simulate_execution(self, executor: Agent, task: AtomicTask, context: Dict) -> Dict[str, Any]:
"""Simulate agent execution. In real system, this would call LLM."""
# Simple mock based on task type
if task.task_type == TaskType.DOCUMENT_GENERATION:
return {
"generated_document": f"# Sample Document\n\nThis is a generated document for: {task.description}\n\nSections: {list(task.expected_output_schema.keys())}",
"model_used": executor.model,
"tokens_used": 1500
}
elif task.task_type == TaskType.ANALYSIS:
return {
"analysis_result": f"Analysis of: {task.description}\n\nKey findings: \n- Item 1: ...\n- Item 2: ...\n- Recommendation: ...",
"model_used": executor.model,
"tokens_used": 800
}
else:
return {
"output": f"Executed {task.task_type.value} for: {task.description}",
"model_used": executor.model,
"tokens_used": 500
}
def main():
"""Run the PoC demonstration."""
print("FlowPilot Agent Orchestration PoC")
print("=" * 50)
orchestrator = TaskOrchestrator()
# Example high-level tasks from the project management handbook
tasks_to_process = [
"创建项目章程,目标是开发一个新的客户管理系统,范围包括需求、开发、测试和上线",
"识别软件开发项目的主要风险",
"制定项目进度计划"
]
all_results = []
for task_desc in tasks_to_process:
result = orchestrator.process_high_level_task(task_desc)
all_results.append(result)
print("\n" + "="*50)
# Summary
print("\n=== PoC Execution Summary ===")
print(f"High-level tasks processed: len {len(all_results)}")
total_atomic = sum(len(r["atomic_tasks"]) for r in all_results)
print(f"Total atomic tasks executed: {total_atomic}")
print(f"HR Manager agent ID: {orchestrator.hr_manager.agent_id}")
print(f"Experience Manager agent ID: {orchestrator.experience_manager.agent_id}")
print(f"Executor agents created: {len(orchestrator.executor_agents)}")
# Save results to file for inspection
import json
with open("poc_results.json", "w") as f:
json.dump(all_results, f, indent=2, ensure_ascii=False)
print("\nDetailed results saved to poc_results.json")
if __name__ == "__main__":
main()