Initial commit: PRD v0.2, PROGRESS, knowledge base, PoC

This commit is contained in:
2026-04-11 17:49:30 +08:00
commit ed616a8de9
8 changed files with 8987 additions and 0 deletions

28
HEARTBEAT-CHECK.md Normal file
View File

@@ -0,0 +1,28 @@
# FlowPilot 开发心跳检查配置
> 供OpenClaw cron使用每30分钟检查一次项目进度
## 检查逻辑
1. 读 products/pmp-tool/PROGRESS.md
2. 找到"当前Issue"和"状态"
3. 如果状态是"进行中"
- 检查上次更新时间
- 如果超过30分钟未更新 → 检查Claude Code进程
- 进程已死 → 根据断点信息重新启动
- 进程还活着 → 不干预
4. 如果状态是"已完成"但下一个Issue未启动 → 自动启动下一个
5. 更新检查时间戳
## 恢复规则
- **代码未提交** → 先stash然后从断点重新执行
- **代码已提交但验证未通过** → 重新验证
- **验证通过但未更新PROGRESS** → 更新PROGRESS启动下一个Issue
- **连续3次失败** → 暂停,通知老板
## Cron配置建议
```
openclaw cron add --schedule "*/30 * * * *" --task "读 products/pmp-tool/PROGRESS.md检查项目进度。如果任务卡住超过30分钟根据断点信息恢复。如果一切正常回复 HEARTBEAT_OK。"
```

364
PRD.md Normal file
View File

@@ -0,0 +1,364 @@
# FlowPilot - AI驱动的项目管理流程引擎 PRD
> 创建时间2026-04-11
> 当前版本v0.2
> 状态:📋 规划中
## 1. 产品概述
**一句话:** AI Agent作为项目成员基于PMBOK第8版自动执行项目管理全流程。
**目标用户:** 3-20人的软件开发团队PM/技术负责人为主
**核心价值:** 用户创建项目后系统自动引导走完启动→规划→执行→监控→收尾全流程。AI Agent按需动态创建、分配任务、执行交付、跟踪进度关键节点与用户沟通决策。全过程记录可查。
**产品名:** FlowPilot流程领航
## 2. 核心创新双Agent管理架构
### 2.1 HR管理员AI Agents HR Manager
**职责:** 根据项目需求动态创建和管理执行任务的AI Agent
**能力:**
- 分析项目阶段和任务需求确定需要什么能力的Agent
- 选择合适的大模型(了解不同模型的能力边界)
- 编写和优化Agent的提示词Prompt Engineering
- 创建Agent实例并分配任务
- 监控Agent执行过程评分和质量把关
- 根据执行反馈迭代优化提示词和模型选择
- 终止或替换表现不佳的Agent
**工作流程:**
```
收到任务需求
判断需要什么能力(文档/分析/代码/协调)
选择模型 + 编写提示词
创建Agent → 分配任务(原子粒度)
监控执行 → 评分
├─ 通过 → 交付结果
└─ 不通过 → 优化提示词 → 重试或换模型
```
### 2.2 经验管理员AI Agents Experience Manager
**职责:** 项目知识管理 + 跨Agent协调 + 全局把控
**能力:**
- 记录所有Agent的输入输出形成项目知识库
- 总结和归档项目执行经验
- 为Agent补充项目上下文解决单个Agent信息孤岛问题
- 统筹跨Agent职能沟通和依赖协调
- 从项目全局监控进度,识别瓶颈
- 关键节点和遇到瓶颈时与用户沟通决策
- 根据项目状态动态调整Agent任务和方向
**工作流程:**
```
持续监听所有Agent的输入输出
记录并结构化存储
检测到:依赖缺失 → 补充上下文给相关Agent
进度偏差 → 触发调整通知HR管理员换Agent/改策略)
关键决策点 → 暂停,推送给用户决策
瓶颈/异常 → 升级给用户
定期总结归档 → 更新项目知识库
```
### 2.3 三方协作模型
```
┌─────────────┐
│ 用户(老板) │ ← 关键决策点
└──────┬──────┘
│ 指令/确认
┌──────▼──────┐
│ 经验管理员 │ ← 全局把控、知识管理
└──────┬──────┘
│ 任务分派/上下文补充
┌───────────┼───────────┐
▼ ▼ ▼
Agent-A Agent-B Agent-C ← 执行层(动态创建/销毁)
(文档) (分析) (协调)
▲ ▲ ▲
└───────────┼───────────┘
│ 创建/监控/评分/优化
┌──────┴──────┐
│ HR管理员 │ ← Agent生命周期管理
└─────────────┘
```
## 3. 任务拆解引擎
### 3.1 核心难题如何拆到单Agent单次可完成
**答案:递归分解 + 原子任务类型库**
### 3.2 原子任务的判定标准
一个任务可交给单Agent单次完成必须满足
| 条件 | 含义 |
|------|------|
| **输入确定** | 所需信息完整,不需要追问 |
| **输出确定** | 有明确格式和验收标准 |
| **无外部依赖** | 不需要等别的Agent结果或依赖已作为输入传入 |
| **时间可控** | 单次API调用在合理token内可完成 |
### 3.3 原子任务类型库
预设一套标准化的任务类型,每种类型有固定的输入/输出Schema
**文档生成类(单次 2000-4000 token**
- `fill_template` — 给定模板+数据 → 输出填充后文档
- `summarize` — 给定长文 → 输出摘要
- `format_convert` — 给定内容 → 按目标格式输出
**分析判断类(单次 1000-2000 token**
- `evaluate` — 给定标准+对象 → 输出评分+理由
- `risk_identify` — 给定范围 → 输出风险列表
- `prioritize` — 给定列表+标准 → 输出排序结果
**协调沟通类(单次 1000-1500 token**
- `generate_notification` — 给定事件 → 输出消息内容
- `aggregate_report` — 给定多条数据 → 输出汇总
**数据操作类(单次 1000-1500 token**
- `status_update` — 给定条件 → 查询并更新
- `extract` — 给定源 → 提取指定字段
### 3.4 拆解流程
```
用户创建项目
HR管理员分析项目 → 生成阶段任务清单
每个任务递归判断:是否满足原子任务标准?
├─ 满足 → 创建Agent执行传入完整输入
└─ 不满足 → 按原子任务类型库拆解为子任务列表
子任务再判断(递归),直到全部原子化
经验管理员记录拆解结果 → 生成任务依赖图
按依赖顺序:无依赖的并行,有依赖的串行
HR管理员为每个原子任务创建Agent执行
```
**原则:宁可拆细,不要贪大。** 一个任务失败只重试那一个,不连锁崩溃。
## 4. 功能列表
### P0 - 必须有MVPv0.2
| # | 功能 | 描述 | 状态 |
|---|------|------|------|
| P0-1 | 项目创建向导 | 引导用户完成项目章程(目标、范围、干系人、里程碑、资源),每步有说明和模板 | ⬜ |
| P0-2 | HR管理员 | 核心Agent管理器分析任务→选模型→写提示词→创建Agent→监控评分→迭代优化 | ⬜ |
| P0-3 | 经验管理员 | 核心协调器记录Agent输入输出→构建项目知识库→跨Agent协调→关键决策推送给用户 | ⬜ |
| P0-4 | 任务拆解引擎 | 递归分解+原子任务类型库把复杂任务拆到单Agent单次可完成的粒度 | ⬜ |
| P0-5 | 看板视图 | 待办/进行中/已完成/待确认 四列看板区分AI执行和人工任务 | ⬜ |
| P0-6 | 执行记录 | 全过程记录每个Agent的输入、输出、评分、耗时可查询可回溯 | ⬜ |
| P0-7 | 决策交互 | 关键节点推送决策给用户飞书消息卡片用户确认后AI继续执行 | ⬜ |
| P0-8 | 检查清单引擎 | 每个环节自动推送PMBOK检查清单AI执行+人工确认 | ⬜ |
### P1 - 应该有v1.0
| # | 功能 | 描述 | 状态 |
|---|------|------|------|
| P1-1 | 干系人管理 | 权力-利益矩阵可视化AI自动推荐参与策略 | ⬜ |
| P1-2 | WBS任务拆解 | 树形结构可视化AI辅助拆解+人工调整 | ⬜ |
| P1-3 | 风险管理 | 风险登记册AI自动识别风险+概率影响评估+应对策略建议 | ⬜ |
| P1-4 | 需求池管理 | MoSCoW优先级用户故事模板AI辅助需求分析和冲突检测 | ⬜ |
| P1-5 | 变更管理 | 变更请求→AI评估影响→用户审批→自动执行全流程记录 | ⬜ |
| P1-6 | 项目健康度报告 | AI自动汇总生成周报红黄绿灯+趋势分析 | ⬜ |
| P1-7 | 复盘与知识沉淀 | 项目收尾AI自动复盘经验归入知识库反哺未来项目 | ⬜ |
| P1-8 | 多模型支持 | HR管理员可根据任务类型选择不同模型GPT/Claude/GLM/Qwen等 | ⬜ |
### P2 - 可以有v2.0+
| # | 功能 | 描述 | 状态 |
|---|------|------|------|
| P2-1 | RACI矩阵 | 任务-角色矩阵可视化,人+AI混合角色 | ⬜ |
| P2-2 | 里程碑时间线 | 甘特图/时间线视图AI预测延误风险 | ⬜ |
| P2-3 | 多项目组合 | 跨项目状态总览Agent资源调度 | ⬜ |
| P2-4 | 模板市场 | 不同行业的项目模板+Agent提示词模板 | ⬜ |
| P2-5 | 预算管理 | AI估算成本+跟踪实际+超支预警 | ⬜ |
| P2-6 | Agent能力市场 | 社区共享优化好的Agent提示词配置 | ⬜ |
| P2-7 | 挣值分析 | PV/EV/AC自动计算SPI/CPI图表 | ⬜ |
## 5. 技术方案
### 5.1 平台
**飞书应用Web App/H5**
飞书作为用户入口和消息通道FlowPilot后端独立部署。
### 5.2 技术栈
```
前端React 18 + TypeScript + Arco Design
后端Node.js + Hono
数据库PostgreSQL主库+ Redis缓存/队列/会话)
AI编排层自研轻量Agent框架
- Agent生命周期管理
- 任务拆解引擎
- 多模型路由OpenAI/Claude/GLM/Qwen API
- 提示词版本管理
消息通道:飞书开放平台 SDK
部署Docker Compose初期单机→ K8s规模化后
```
### 5.3 核心架构
```
┌──────────────────────────────────────────────────┐
│ 飞书客户端 │
│ (项目管理界面 / 消息卡片 / 通知) │
└──────────────────┬───────────────────────────────┘
│ 飞书开放平台
┌──────────────────▼───────────────────────────────┐
│ FlowPilot 后端 │
│ ┌────────────┐ ┌─────────────┐ ┌───────────┐ │
│ │ HR管理员 │ │ 经验管理员 │ │ 任务拆解 │ │
│ │ Agent │ │ Agent │ │ 引擎 │ │
│ └─────┬──────┘ └──────┬──────┘ └─────┬─────┘ │
│ │ │ │ │
│ ┌─────▼────────────────▼───────────────▼─────┐ │
│ │ Agent 编排层 │ │
│ │ - 生命周期管理 - 多模型路由 - 提示词管理 │ │
│ │ - 执行记录 - 依赖调度 - 失败重试 │ │
│ └──────────────────┬────────────────────────┘ │
│ │ │
│ ┌──────────────────▼────────────────────────┐ │
│ │ 执行Agent池动态创建/销毁) │ │
│ │ Agent-1(doc) Agent-2(analysis) ... │ │
│ └───────────────────────────────────────────┘ │
└──────────────────┬───────────────────────────────┘
┌──────────────────▼───────────────────────────────┐
│ PostgreSQL + Redis │
│ - 项目数据 - Agent记录 - 知识库 - 提示词版本 │
└──────────────────────────────────────────────────┘
```
### 5.4 数据模型
```
Project项目
├── Charter章程
├── Stakeholder[](干系人)
├── Milestone[](里程碑)
├── Phase[](阶段:启动/规划/执行/监控/收尾)
│ └── TaskNode[](任务树,递归拆解后的原子任务)
│ ├── agent_id执行Agent
│ ├── agent_config模型+提示词版本)
│ ├── input完整输入
│ ├── output执行输出
│ ├── scoreHR管理员评分
│ └── status待分配/执行中/已完成/待确认/失败)
├── Risk[](风险)
├── ChangeRequest[](变更请求)
├── KnowledgeEntry[](项目知识库)
├── HealthReport[](健康度报告)
└── ExecutionLog[](全量执行记录)
AgentConfigAgent配置库
├── agent_type原子任务类型
├── model大模型标识
├── prompt_version提示词版本
├── prompt_template提示词模板
├── avg_score历史平均评分
└── usage_count使用次数
DecisionLog决策记录
├── task_id关联任务
├── decision_typeAI自主/人工确认/人工决策)
├── context决策上下文
├── decision决策结果
├── decideruser / hr_manager / experience_manager
└── timestamp
```
## 6. 里程碑
| 版本 | 目标 | 预计完成 | 实际完成 | 状态 |
|------|------|---------|---------|------|
| v0.1 | PRD初稿 | 2026-04-13 | — | ✅ |
| v0.2 | PRD升级版AI Agent架构 | 2026-04-15 | — | 🔨 |
| v0.3 | 技术验证Agent编排原型 | 2026-05-15 | — | ⬜ |
| v0.5 | MVP内测P0功能3个种子项目 | 2026-06-30 | — | ⬜ |
| v1.0 | 正式版P0+P1 | 2026-08-31 | — | ⬜ |
| v2.0 | 高级功能P2+Agent市场 | 2026-Q4 | — | ⬜ |
## 7. 商业化策略
**核心卖点AI Agent团队不只是AI辅助**
**定价:**
- 免费版1个项目基础流程引导AI辅助文档生成
- 专业版¥199/月/团队完整Agent团队+知识库+全流程自动化
- 企业版¥499/月,多项目+自定义Agent+私有部署
**为什么能收费:** 用户买的是一个**AI项目团队**不是工具。相当于用1/10的人力成本获得一个标准化的项目管理团队。
**获客路径:**
1. 种子用户CIO协会人脉免费试用收集案例
2. 飞书应用市场("AI项目管理"是热门搜索词)
3. 内容营销AI Agent做项目管理的实战案例
4. 口碑:推荐奖励+Agent模板分享
## 8. 关键技术挑战与应对
| 挑战 | 难度 | 应对策略 |
|------|------|---------|
| 任务拆解粒度 | ⭐⭐⭐ | 原子任务类型库 + 递归分解,宁可拆细 |
| 多Agent上下文共享 | ⭐⭐⭐⭐ | 经验管理员统一管理知识库Agent间不直接通信 |
| 提示词工程自动化 | ⭐⭐⭐⭐ | HR管理员评分驱动迭代积累优质提示词库 |
| 单次任务失败重试 | ⭐⭐ | 自动重试+换模型+人工兜底,三层保障 |
| AI执行成本控制 | ⭐⭐⭐ | 小模型做常规任务,大模型做复杂任务,分级路由 |
| 飞书平台审核 | ⭐⭐ | 研究审核规范,预留调整时间 |
## 9. 差异化定位
| 维度 | 传统PM工具 | AI辅助PM | **FlowPilot** |
|------|-----------|---------|-------------|
| AI角色 | 无 | 写作助手/聊天机器人 | **项目团队成员** |
| 流程执行 | 人驱动 | 人驱动+AI建议 | **AI执行+人决策** |
| Agent管理 | 无 | 固定助手 | **HR管理员动态创建优化** |
| 知识积累 | 手动归档 | 手动归档 | **经验管理员自动沉淀** |
| 团队构成 | 纯人类 | 人类+工具 | **人类+AI Agents混合** |
## 10. 迭代日志
### 2026-04-11 - PRD v0.2 升级
- 引入双Agent管理架构HR管理员+经验管理员)
- 新增任务拆解引擎设计(递归分解+原子任务类型库)
- 功能列表按AI Agent能力重新组织
- 技术架构升级为多Agent编排层
- 新增数据模型AgentConfig、DecisionLog
- 商业化策略调整为"AI团队"而非"AI工具"
- 里程碑重新规划
### 2026-04-11 - PRD v0.1 初稿
- 基于项目管理执行手册生成PRD
- 确定飞书应用为MVP平台
- 定义P0/P1/P2功能优先级
## 11. 待讨论 / 风险
- [ ] 技术验证先做一个Agent编排的PoC验证可行性
- [ ] 多模型API成本测算一个完整项目跑下来大概多少API费用
- [ ] 飞书应用审核AI生成内容的审核要求
- [ ] 竞品动态Microsoft Copilot在项目管理领域的进展
- [ ] 用户信任如何让用户信任AI的执行结果评分机制是否足够
- [ ] 提示词安全如何防止Agent被注入攻击

53
PROGRESS.md Normal file
View File

@@ -0,0 +1,53 @@
# FlowPilot 开发进度追踪
> 本文件是项目连续性的核心保障。任何新会话启动时,先读本文件。
## 当前状态
- **当前Issue:** 准备创建Gitea仓库并导入第一批Issues
- **状态:** 📋 规划完成,待启动开发
- **上次更新:** 2026-04-11 17:30
- **PRD版本:** v0.2
## Issue流水线
| # | Issue | 描述 | 状态 | 断点备注 |
|---|-------|------|------|---------|
| 0 | 项目初始化 | 创建Gitea仓库、目录结构、基础配置 | 🔨 进行中 | 待创建仓库 |
| 1 | 项目创建向导页 | React+Arco Design向导框架 | ⬜ | |
| 2 | 章程模板填充 | 基于用户输入生成章程Markdown | ⬜ | |
| 3 | 飞书消息发送 | Node.js+Hono调用飞书Open API | ⬜ | |
| 4 | 看板基础组件 | 三列看板+拖拽(dnd-kit) | ⬜ | |
| 5 | 任务数据模型 | PostgreSQL建表+CRUD | ⬜ | |
| 6 | HR管理员原型 | 任务拆解逻辑 | ⬜ | |
| 7 | 经验管理员原型 | 执行记录存储和查询 | ⬜ | |
| 8 | 决策交互卡片 | 飞书消息卡片+回调 | ⬜ | |
| 9 | 检查清单引擎 | PMBOK检查清单展示+勾选 | ⬜ | |
| 10 | 执行记录API | REST接口查询Agent日志 | ⬜ | |
| 11 | 创建流程串联 | 向导→章程→任务→看板→通知全流程 | ⬜ | |
| 12 | 基础CI/CD | GitHub Actions lint+单测 | ⬜ | |
| 13 | 内测反馈表单 | 飞书表单收集用户反馈 | ⬜ | |
| 14 | 打包部署脚本 | Docker-compose一键启动 | ⬜ | |
## 断点续传信息
(每次任务中断时更新此区域,新会话从这里恢复)
```
当前任务: 项目初始化
Claude Code执行状态: 未启动
已交付文件: poc/agent_poc.py, poc/README.md, PRD.md
Git状态: poc目录已init未push
下一步: 创建Gitea仓库 → 导入Issues → 启动Issue #1
```
## 里程碑
| 里程碑 | 目标日期 | 状态 |
|--------|---------|------|
| 仓库建立+Issue导入 | 2026-04-12 | 🔨 |
| 前端框架+基础页面 | 2026-04-25 | ⬜ |
| 后端API+数据库 | 2026-05-10 | ⬜ |
| Agent编排核心 | 2026-05-25 | ⬜ |
| 全流程串联 | 2026-06-10 | ⬜ |
| 内测版发布 | 2026-06-30 | ⬜ |

View File

@@ -0,0 +1,63 @@
# FlowPilot 开发知识库
> 记录开发过程中的经验、有效的Prompt模式、遇到的坑。供后续任务复用。
## Prompt模式库
### 模式1React组件开发
```
在 src/components/ 目录下创建 [组件名] 组件。
技术栈React 18 + TypeScript + Arco Design
要求:
1. 使用函数式组件 + Hooks
2. Props接口明确定义
3. 包含基本样式
4. 导出组件
参考已有组件风格:[列出已有文件]
```
### 模式2后端API开发
```
在 src/routes/ 目录下创建 [路由名] 路由。
框架Hono (Node.js)
要求:
1. RESTful风格
2. 输入验证
3. 错误处理
4. TypeScript类型
数据库操作参考src/db/schema.ts
```
### 模式3飞书集成
```
实现飞书 [功能] 集成。
SDK@larksuiteoapi/node-sdk
参考文档:[链接]
要求:
1. 使用已有飞书客户端配置src/config/feishu.ts
2. 错误处理和重试
3. 日志记录
```
## 经验记录
### 2026-04-11 - PoC阶段
- **PoC验证通过**HR管理员+经验管理员+任务拆解的核心链路可行
- **Python PoC**用模拟执行验证了架构实际开发用Node.js
- **Claude Code调用**:使用 `--print --permission-mode bypassPermissions` 模式
- **Codex不可用**本机未安装codex CLI改用claude code
## 技术决策记录
| 决策 | 原因 | 日期 |
|------|------|------|
| Node.js后端 | 与飞书SDK生态一致团队技能匹配 | 2026-04-11 |
| Arco Design | 字节出品与飞书UI风格统一 | 2026-04-11 |
| PostgreSQL | 关系型数据适合项目管理结构化数据 | 2026-04-11 |
| 先串行后并行 | Agent编排初期用串行降低复杂度 | 2026-04-11 |
## 遇到的坑
(开发过程中持续更新)

4
poc/.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
node_modules/
dist/
.env
*.log

66
poc/README.md Normal file
View File

@@ -0,0 +1,66 @@
# FlowPilot Agent Orchestration PoC
This proof-of-concept demonstrates the core agent orchestration ideas from the FlowPilot PRD:
## Core Concepts Demonstrated
1. **HR Manager Agent**: Decomposes high-level tasks into atomic tasks and creates executor agents
2. **Experience Manager Agent**: Logs executions, provides context, and maintains a knowledge base
3. **Atomic Task Decomposition**: Breaking down complex tasks into single-agent-completable units
4. **Agent Lifecycle Management**: Dynamic creation of executor agents for specific task types
## How to Run
```bash
python3 agent_poc.py
```
## Output
The PoC processes three sample high-level tasks from the project management handbook:
- Creating a project charter
- Identifying software development risks
- Creating a project schedule
For each task, it shows:
- How the HR Manager decomposes the task into atomic tasks
- How executor agents are created for each atomic task type
- How the Experience Manager logs executions and provides context
- The final results and execution logs
## Next Steps for Development
1. **Replace simulation with actual LLM calls** - Integrate with OpenAI/Anthropic/etc APIs
2. **Add real task execution** - Instead of mock results, have agents actually perform tasks
3. **Implement persistence** - Store agent configurations, execution logs, and knowledge base in database
4. **Add feedback loops** - HR Manager improves based on execution scores
5. **Integrate with Feishu** - Build the actual frontend and Feishu bot interfaces
## Files
- `agent_poc.py`: Main PoC implementation
- `poc_results.json`: Detailed execution results from the last run
- `.gitignore`: Git ignore file
## Design Notes
This PoC focuses on demonstrating the orchestration logic rather than actual AI execution.
In a real implementation:
- The `_simulate_execution` method would call actual LLM APIs
- Agents would have specific prompt templates for their task types
- The Experience Manager would use vector embeddings for context retrieval
- Task decomposition would be more sophisticated (possibly using LLMs themselves)
## Relation to PRD
This PoC validates the core architectural concepts from PRD v0.2:
- Dual agent management (HR Manager + Experience Manager)
- Atomic task decomposition
- Agent lifecycle management
- Context sharing and knowledge base
It does not yet cover:
- Feishu integration
- Full PMBOK process coverage
- Production-level error handling and scaling
- Actual LLM provider integration

381
poc/agent_poc.py Normal file
View File

@@ -0,0 +1,381 @@
#!/usr/bin/env python3
"""
Agent Orchestration PoC for FlowPilot
Demonstrates:
- HR Manager creating agents for tasks
- Experience Manager logging and context sharing
- Task decomposition into atomic types
"""
import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, List, Any, Optional
class TaskType(Enum):
DOCUMENT_GENERATION = "document_generation"
ANALYSIS = "analysis"
COORDINATION = "coordination"
DATA_OPERATION = "data_operation"
class AgentRole(Enum):
HR_MANAGER = "hr_manager"
EXPERIENCE_MANAGER = "experience_manager"
EXECUTOR = "executor"
class TaskStatus(Enum):
PENDING = "pending"
ASSIGNED = "assigned"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
class AtomicTask:
"""Represents a task that can be completed by a single agent in one session."""
def __init__(self,
task_type: TaskType,
description: str,
input_data: Dict[str, Any],
expected_output_schema: Dict[str, Any]):
self.id = str(uuid.uuid4())
self.task_type = task_type
self.description = description
self.input_data = input_data
self.expected_output_schema = expected_output_schema
self.status = TaskStatus.PENDING
self.agent_id: Optional[str] = None
self.result: Optional[Dict[str, Any]] = None
self.created_at = datetime.now()
self.completed_at: Optional[datetime] = None
def to_dict(self):
return {
"id": self.id,
"task_type": self.task_type.value,
"description": self.description,
"input_data": self.input_data,
"expected_output_schema": self.expected_output_schema,
"status": self.status.value,
"agent_id": self.agent_id,
"result": self.result,
"created_at": self.created_at.isoformat(),
"completed_at": self.completed_at.isoformat() if self.completed_at else None
}
class Agent:
"""Base agent class."""
def __init__(self, agent_id: str, role: AgentRole, model: str = "default"):
self.agent_id = agent_id
self.role = role
self.model = model
self.capabilities: List[TaskType] = []
def execute(self, task: AtomicTask) -> Dict[str, Any]:
"""Execute a task and return result."""
raise NotImplementedError
def can_handle(self, task_type: TaskType) -> bool:
return task_type in self.capabilities
class HRManagerAgent(Agent):
"""HR Manager: creates and manages executor agents."""
def __init__(self, agent_id: str):
super().__init__(agent_id, AgentRole.HR_MANAGER)
# HR manager can handle task decomposition and agent creation
self.capabilities = [TaskType.DOCUMENT_GENERATION, TaskType.ANALYSIS]
self.available_models = {
TaskType.DOCUMENT_GENERATION: "claude-3-sonnet",
TaskType.ANALYSIS: "gpt-4-turbo",
TaskType.COORDINATION: "claude-3-haiku",
TaskType.DATA_OPERATION: "gpt-3.5-turbo"
}
def decompose_task(self, high_level_task: str) -> List[AtomicTask]:
"""Decompose a high-level task into atomic tasks.
In PoC, we'll use simple heuristics.
"""
# Simple decomposition logic for demonstration
if "章程" in high_level_task or "charter" in high_level_task.lower():
return [
AtomicTask(
task_type=TaskType.DOCUMENT_GENERATION,
description="Generate project charter based on user input",
input_data={"template": "project_charter", "user_input": high_level_task},
expected_output_schema={"sections": ["目标", "范围", "里程碑", "干系人"]}
),
AtomicTask(
task_type=TaskType.ANALYSIS,
description="Analyze stakeholders and create stakeholder register",
input_data={"charter_placeholder": "will be filled"},
expected_output_schema={"stakeholders": [{"name": "", "role": "", "power": 0, "interest": 0}]}
)
]
elif "风险" in high_level_task or "risk" in high_level_task.lower():
return [
AtomicTask(
task_type=TaskType.ANALYSIS,
description="Identify project risks",
input_data={"scope": high_level_task},
expected_output_schema={"risks": [{"description": "", "probability": 0, "impact": 0}]}
),
AtomicTask(
task_type=TaskType.DOCUMENT_GENERATION,
description="Generate risk mitigation plan",
input_data={"risks_placeholder": "will be filled"},
expected_output_schema={"mitigations": []}
)
]
else:
# Default: treat as documentation task
return [
AtomicTask(
task_type=TaskType.DOCUMENT_GENERATION,
description=f"Generate document for: {high_level_task}",
input_data={"request": high_level_task},
expected_output_schema={"content": ""}
)
]
def create_executor_agent(self, task_type: TaskType) -> Agent:
"""Create an executor agent for a specific task type."""
agent_id = str(uuid.uuid4())
# In reality, this would configure the agent with specific prompts and model
executor = Agent(agent_id, AgentRole.EXECUTOR,
model=self.available_models.get(task_type, "default"))
executor.capabilities = [task_type]
return executor
def execute(self, task: AtomicTask) -> Dict[str, Any]:
"""HR Manager handles task decomposition and agent creation."""
# For PoC, we simulate the decomposition
atomic_tasks = self.decompose_task(task.description)
task.agent_id = self.agent_id
task.status = TaskStatus.COMPLETED
task.result = {
"atomic_tasks": [at.to_dict() for at in atomic_tasks]
}
return task.result
class ExperienceManagerAgent(Agent):
"""Experience Manager: logs executions, provides context, manages knowledge."""
def __init__(self, agent_id: str):
super().__init__(agent_id, AgentRole.EXPERIENCE_MANAGER)
self.capabilities = [TaskType.DATA_OPERATION, TaskType.ANALYSIS]
self.execution_log: List[Dict] = []
self.knowledge_base: Dict[str, Any] = {}
def record_execution(self, agent_id: str, task: AtomicTask, result: Dict[str, Any]):
"""Record an agent's task execution."""
log_entry = {
"timestamp": datetime.now().isoformat(),
"agent_id": agent_id,
"task_id": task.id,
"task_type": task.task_type.value,
"input": task.input_data,
"output": result,
"status": task.status.value
}
self.execution_log.append(log_entry)
# Update knowledge base with patterns
self._update_knowledge_base(task, result)
def _update_knowledge_base(self, task: AtomicTask, result: Dict[str, Any]):
"""Simple knowledge base update: store successful patterns."""
if task.status == TaskStatus.COMPLETED:
key = f"{task.task_type.value}_{hash(task.description) % 1000}"
self.knowledge_base[key] = {
"task_description": task.description,
"successful_input_pattern": task.input_data,
"successful_output_pattern": result,
"count": self.knowledge_base.get(key, {}).get("count", 0) + 1
}
def get_context_for_task(self, task_type: TaskType, keywords: str = "") -> Dict[str, Any]:
"""Retrieve relevant context for a task."""
# Simple context retrieval: return recent similar executions
relevant_logs = [
log for log in self.execution_log[-20:] # last 20
if log["task_type"] == task_type.value
]
return {
"recent_executions": relevant_logs,
"knowledge_base_snapshot": dict(list(self.knowledge_base.items())[:5]) # first 5
}
def execute(self, task: AtomicTask) -> Dict[str, Any]:
"""Experience Manager handles data operations and context queries."""
# For PoC, we simulate context provision
context = self.get_context_for_task(task.task_type,
str(task.input_data))
task.agent_id = self.agent_id
task.status = TaskStatus.COMPLETED
task.result = {
"context": context,
"message": "Context provided for task execution"
}
return task.result
class TaskOrchestrator:
"""Orchestrates the workflow between HR Manager, Experience Manager, and executors."""
def __init__(self):
self.hr_manager = HRManagerAgent("hr-001")
self.experience_manager = ExperienceManagerAgent("exp-001")
self.executor_agents: Dict[str, Agent] = {}
self.tasks: Dict[str, AtomicTask] = {}
def process_high_level_task(self, description: str) -> Dict[str, Any]:
"""Process a high-level task through the orchestrator."""
print(f"\n=== Processing High-Level Task: {description} ===")
# Step 1: HR Manager decomposes the task
hr_task = AtomicTask(
task_type=TaskType.ANALYSIS, # HR uses analysis to decompose
description=description,
input_data={},
expected_output_schema={"atomic_tasks": []}
)
decomposition_result = self.hr_manager.execute(hr_task)
self.experience_manager.record_execution(self.hr_manager.agent_id, hr_task, decomposition_result)
atomic_tasks_data = decomposition_result.get("atomic_tasks", [])
atomic_tasks = []
# Reconstruct AtomicTask objects from decomposition result
for task_data in atomic_tasks_data:
task = AtomicTask(
task_type=TaskType(task_data["task_type"]),
description=task_data["description"],
input_data=task_data["input_data"],
expected_output_schema=task_data["expected_output_schema"]
)
task.id = task_data["id"]
atomic_tasks.append(task)
self.tasks[task.id] = task
print(f"HR Manager decomposed into {len(atomic_tasks)} atomic tasks")
# Step 2: For each atomic task, create/execute agent
results = []
for task in atomic_tasks:
print(f"\n--- Executing Task: {task.description} ---")
# Get context from Experience Manager
context_task = AtomicTask(
task_type=TaskType.DATA_OPERATION,
description="Get context for task execution",
input_data={"task_type": task.task_type.value, "keywords": task.description},
expected_output_schema={"context": {}}
)
context_result = self.experience_manager.execute(context_task)
self.experience_manager.record_execution(self.experience_manager.agent_id, context_task, context_result)
# Create executor agent for this task type
executor_id = str(uuid.uuid4())
executor = self.hr_manager.create_executor_agent(task.task_type)
executor.agent_id = executor_id # Override with generated ID
self.executor_agents[executor_id] = executor
# Simulate task execution (in PoC, we just generate a mock result)
# In reality, this would call the actual agent/LLM
mock_result = self._simulate_execution(executor, task, context_result["context"])
# Record execution
task.agent_id = executor_id
task.status = TaskStatus.COMPLETED
task.result = mock_result
task.completed_at = datetime.now()
self.experience_manager.record_execution(executor_id, task, mock_result)
results.append({
"task": task.to_dict(),
"executor_id": executor_id,
"result": mock_result
})
print(f"Task completed by executor {executor_id[:8]}")
return {
"high_level_task": description,
"hr_manager_task": hr_task.to_dict(),
"atomic_tasks": [t.to_dict() for t in atomic_tasks],
"execution_results": results,
"experience_manager_log": self.experience_manager.execution_log[-len(atomic_tasks)*2:] # recent logs
}
def _simulate_execution(self, executor: Agent, task: AtomicTask, context: Dict) -> Dict[str, Any]:
"""Simulate agent execution. In real system, this would call LLM."""
# Simple mock based on task type
if task.task_type == TaskType.DOCUMENT_GENERATION:
return {
"generated_document": f"# Sample Document\n\nThis is a generated document for: {task.description}\n\nSections: {list(task.expected_output_schema.keys())}",
"model_used": executor.model,
"tokens_used": 1500
}
elif task.task_type == TaskType.ANALYSIS:
return {
"analysis_result": f"Analysis of: {task.description}\n\nKey findings: \n- Item 1: ...\n- Item 2: ...\n- Recommendation: ...",
"model_used": executor.model,
"tokens_used": 800
}
else:
return {
"output": f"Executed {task.task_type.value} for: {task.description}",
"model_used": executor.model,
"tokens_used": 500
}
def main():
"""Run the PoC demonstration."""
print("FlowPilot Agent Orchestration PoC")
print("=" * 50)
orchestrator = TaskOrchestrator()
# Example high-level tasks from the project management handbook
tasks_to_process = [
"创建项目章程,目标是开发一个新的客户管理系统,范围包括需求、开发、测试和上线",
"识别软件开发项目的主要风险",
"制定项目进度计划"
]
all_results = []
for task_desc in tasks_to_process:
result = orchestrator.process_high_level_task(task_desc)
all_results.append(result)
print("\n" + "="*50)
# Summary
print("\n=== PoC Execution Summary ===")
print(f"High-level tasks processed: len {len(all_results)}")
total_atomic = sum(len(r["atomic_tasks"]) for r in all_results)
print(f"Total atomic tasks executed: {total_atomic}")
print(f"HR Manager agent ID: {orchestrator.hr_manager.agent_id}")
print(f"Experience Manager agent ID: {orchestrator.experience_manager.agent_id}")
print(f"Executor agents created: {len(orchestrator.executor_agents)}")
# Save results to file for inspection
import json
with open("poc_results.json", "w") as f:
json.dump(all_results, f, indent=2, ensure_ascii=False)
print("\nDetailed results saved to poc_results.json")
if __name__ == "__main__":
main()

8028
poc/poc_results.json Normal file

File diff suppressed because it is too large Load Diff