1. 需求分析与选型标准
1.1 八大研发阶段的编排需求
端到端研发自动化系统的核心挑战:
- 长周期流程:从需求到上线可能持续数天甚至数周,需要持久化状态管理
- 人机协同:关键节点(PRD 评审、架构评审、上线审批)需要人工介入
- 异步执行:AI Agent 任务(代码生成、测试执行)是长时间运行的异步操作
- 错误恢复:网络故障、API 限流、LLM 超时等异常需要自动重试和补偿
- 可观测性:需要追踪每个 Agent 的执行状态、耗时、Token 消耗
- 并发控制:多个项目并行研发,需要资源隔离和优先级调度
1.2 工作流引擎选型标准
| 评估维度 | 权重 | 具体要求 |
|---|---|---|
| 持久化能力 | 25% | 支持长时间运行(小时/天/周),故障后自动恢复状态 |
| 人机协同 | 20% | 支持人工审批节点、暂停/恢复、信号交互 |
| 错误处理 | 20% | 内置重试策略、超时控制、补偿事务、死信队列 |
| 可观测性 | 15% | 可视化 Dashboard、执行历史、指标监控、日志追踪 |
| 开发体验 | 10% | 代码即工作流、本地调试、版本管理、测试框架 |
| 扩展性 | 10% | 水平伸缩、多语言 SDK、插件生态、云原生部署 |
1.3 消息队列选型标准
| 评估维度 | 权重 | 具体要求 |
|---|---|---|
| 吞吐量 | 20% | 支持百万级 TPS,满足高并发 Agent 通信 |
| 延迟 | 20% | 毫秒级延迟,保证实时响应 |
| 可靠性 | 20% | 消息不丢失、Exactly-Once 语义、事务支持 |
| 模式支持 | 15% | P2P、Pub/Sub、Event Streaming、Request/Reply |
| 运维复杂度 | 15% | 部署简单、监控完善、故障自愈 |
| 生态集成 | 10% | K8S 友好、主流语言客户端、云服务商支持 |
2. 工作流引擎方案对比
2.1 主流方案总览
Temporal
分布式持久化工作流
⭐ 推荐方案
AI Agent 编排首选
AI Agent 编排首选
Airflow
数据管道编排
适合 ETL/批处理
不适合长周期
不适合长周期
Prefect
Python 原生编排
开发体验优秀
AI 基础设施新兴
AI 基础设施新兴
LangGraph
AI Agent 专用编排
LLM Agent 专用
状态图模型
状态图模型
2.2 综合能力对比矩阵
| 特性 | Temporal | Airflow | Prefect | LangGraph |
|---|---|---|---|---|
| 持久化执行 | ✅ 原生支持(年) | ❌ DAG 有向无环 | ⚠️ 有限支持 | ⚠️ 会话级持久化 |
| 人机协同 | ✅ Signal/Query | ❌ 不支持 | ⚠️ 手动触发 | ⚠️ 需自定义 |
| 错误恢复 | ✅ 自动重试 + 补偿 | ⚠️ 基础重试 | ✅ 智能重试 | ⚠️ 基础异常处理 |
| 可观测性 | ✅ Web UI + History | ✅ Web UI + Logs | ✅ Cloud Dashboard | ⚠️ 需集成 LangSmith |
| 学习曲线 | 中等 | 陡峭 | 平缓 | 中等 |
| AI Agent 适配 | ✅ 完美匹配 | ❌ 不适合 | ✅ 良好支持 | ✅ 专为 LLM 设计 |
| 多语言 SDK | Go/Java/TS/Python/PHP | Python | Python | Python/JS |
| 云原生部署 | ✅ K8S Operator | ✅ Helm Chart | ✅ Docker/K8S | ⚠️ 自托管复杂 |
🏆 综合推荐:Temporal + LangGraph 组合方案
- Temporal:负责宏观业务流程编排(需求→PRD→设计→开发→测试→部署→验收)
- LangGraph:负责微观 AI Agent 协作(单个阶段内的多 Agent 对话与工具调用)
- 优势:结合 Temporal 的持久化可靠性与 LangGraph 的 LLM 原生能力
3. Temporal 深度解析
3.1 Temporal 核心概念
📜 Workflow(工作流)
定义业务逻辑的代码,可以运行数年而不丢失状态。
- 确定性执行(Deterministic)
- 自动状态持久化
- 故障后精确恢复
- 支持子工作流嵌套
⚡ Activity(活动)
工作流中的具体任务单元,执行实际工作。
- 幂等性设计
- 自动重试(指数退避)
- 超时控制(启动/执行)
- 心跳检测(长任务)
📡 Signal(信号)
外部向运行中的工作流发送事件的机制。
- 人机协同审批
- 动态参数调整
- 取消/终止通知
- 外部事件触发
🔍 Query(查询)
查询运行中或已完成工作流的状态。
- 只读不修改
- 实时状态获取
- 自定义查询逻辑
- 用于 Dashboard 展示
3.2 Temporal 工作流示例(研发自动化场景)
// TypeScript 示例:端到端研发自动化工作流
import { proxyActivities, sleep, signal } from '@temporalio/workflow';
import type * as activities from './activities';
const {
generatePRD,
reviewPRD,
designArchitecture,
reviewArchitecture,
generateAPISpec,
codingBackend,
codingFrontend,
unitTest,
integrationTest,
deployToDev,
deployToStaging,
deployToProduction,
uiAutomationTest,
notifySlack
} = proxyActivities({
startToCloseTimeout: '30 minutes',
retry: {
initialInterval: '1 minute',
backoffCoefficient: 2,
maximumAttempts: 5,
},
});
// 人工审批信号
export const prdApproval = signal('prdApproval');
export const architectureApproval = signal('architectureApproval');
export const productionDeploymentApproval = signal('productionDeploymentApproval');
export async function研发自动化工作流(projectId: string, requirements: string) {
let approved = false;
// ========== 阶段 1: 需求分析 → PRD ==========
const prd = await generatePRD(projectId, requirements);
// 等待人工审批 PRD
approved = await prdApproval.first();
if (!approved) {
await notifySlack(`❌ PRD 被拒绝:${projectId}`);
throw new Error('PRD Approval Rejected');
}
// ========== 阶段 2: 架构设计 ==========
const architecture = await designArchitecture(projectId, prd);
// 等待架构委员会审批
approved = await architectureApproval.first();
if (!approved) {
await notifySlack(`❌ 架构设计被拒绝:${projectId}`);
throw new Error('Architecture Approval Rejected');
}
// ========== 阶段 3: API 规范生成 ==========
const apiSpec = await generateAPISpec(projectId, architecture);
// ========== 阶段 4: 并行编码 ==========
// 后端和前端开发可以并行
const [backendCode, frontendCode] = await Promise.all([
codingBackend(projectId, apiSpec),
codingFrontend(projectId, apiSpec),
]);
// ========== 阶段 5: 单元测试 ==========
const unitTestReport = await unitTest(projectId, backendCode, frontendCode);
if (unitTestReport.coverage < 80) {
throw new Error(`单元测试覆盖率不足:${unitTestReport.coverage}%`);
}
// ========== 阶段 6: 集成测试 ==========
const integrationTestReport = await integrationTest(projectId);
if (!integrationTestReport.passed) {
throw new Error('集成测试失败');
}
// ========== 阶段 7: 部署到开发环境 ==========
await deployToDev(projectId);
await notifySlack(`✅ 已部署到开发环境:${projectId}`);
// ========== 阶段 8: 部署到预发环境 ==========
await deployToStaging(projectId);
await notifySlack(`✅ 已部署到预发环境:${projectId}`);
// ========== 阶段 9: UI 自动化测试 ==========
const uiTestReport = await uiAutomationTest(projectId);
if (!uiTestReport.passed) {
throw new Error('UI 自动化测试失败');
}
// ========== 阶段 10: 生产部署审批 ==========
approved = await productionDeploymentApproval.first();
if (!approved) {
await notifySlack(`❌ 生产部署被拒绝:${projectId}`);
return { status: 'CANCELLED', reason: 'Production deployment rejected' };
}
// ========== 阶段 11: 灰度发布到生产 ==========
await deployToProduction(projectId, { strategy: 'canary', percentage: 10 });
await sleep('10 minutes'); // 观察期
// 如果没有告警,全量发布
await deployToProduction(projectId, { strategy: 'rolling', percentage: 100 });
await notifySlack(`🎉 项目 ${projectId} 已成功上线!`);
return {
status: 'COMPLETED',
prd,
architecture,
apiSpec,
testReports: { unitTestReport, integrationTestReport, uiTestReport },
};
}
3.3 Temporal 架构优势
为什么 Temporal 最适合 AI Agent 编排?
- 持久化状态机:Workflow 执行状态自动持久化,即使服务器宕机也能恢复
- 内建重试机制:Activity 失败自动重试,支持指数退避、最大尝试次数
- 超时控制:支持 Schedule-to-Start、Start-to-Close、Heartbeat 超时
- Signal/Query:完美支持人机协同(审批、暂停、恢复、查询状态)
- Timer/睡眠:支持长时间睡眠(小时/天/周),不占用资源
- 子工作流:支持嵌套编排,每个研发阶段可以是独立子工作流
- 多语言 SDK:TypeScript/Python/Go/Java/PHP,适配不同技术栈
- 可观测性:Web UI 查看执行历史、事件时间线、重试记录
3.4 Temporal 部署方案
| 部署方式 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| Temporal Cloud | 生产环境(推荐) | 零运维、99.99% SLA、自动备份 | 按成本计费 |
| K8S Helm Chart | 生产环境(自建) | 完全控制、数据私有化 | 运维复杂度高 |
| Docker Compose | 开发/测试环境 | 一键启动、本地调试 | 不适合生产 |
4. Airflow vs Prefect 对比
4.1 Apache Airflow 分析
✅ 优点
- 成熟的 DAG 调度系统,社区活跃
- 丰富的 Operator 生态(200+ 内置)
- 强大的 UI 界面(Gantt 图、任务日志)
- 支持 Cron 表达式调度
- 与大数据生态深度集成(Spark、Hive、Presto)
⚠️ 缺点
- DAG 必须是有向无环图,不支持循环/长周期
- 状态持久化弱,任务失败后难以恢复上下文
- 不支持人机协同(无 Signal 机制)
- 动态工作流能力弱(DAG 在解析时固定)
- 运维复杂度高(需要维护 Web Server、Scheduler、Worker、DB)
结论:Airflow 不适合 AI Agent 编排场景
Airflow 设计初衷是数据 ETL 管道调度,强调定时批量任务的 DAG 编排。而 AI Agent 工作流具有长周期、状态依赖、人机协同、动态分支等特点,Airflow 的 DAG 模型无法优雅表达这些需求。
4.2 Prefect 分析
✅ 优点
- Python 原生,装饰器语法简洁(@task、@flow)
- 混合架构(Prefect Cloud + 本地 Worker)
- 动态工作流支持(运行时决定任务图)
- 优秀的错误处理(重试、超时、缓存)
- Prefect Horizon 专门针对 AI/MCP 场景优化
- 开发体验优秀,本地调试方便
⚠️ 缺点
- 持久化能力不如 Temporal(不支持年级别运行)
- 人机协同需要自定义实现
- 多语言支持弱(主要 Python)
- 社区规模小于 Airflow
Prefect 定位:Python 团队的轻量级编排选择
如果团队技术栈以 Python 为主,且工作流时长在小时级别以内,Prefect 是比 Airflow 更好的选择。Prefect 2.0+ 的混合架构和 Horizon AI 基础设施使其成为 Temporal 的轻量级替代方案。
4.3 三者对比总结
| 维度 | Temporal | Prefect | Airflow |
|---|---|---|---|
| 最佳场景 | 长周期、状态敏感、人机协同 | Python 数据管道、AI 任务 | 定时 ETL、批处理 |
| 最长运行时间 | 数年(理论上无限) | 数小时 ~ 数天 | 数分钟 ~ 数小时 |
| 状态恢复 | ✅ 精确恢复到故障点 | ⚠️ 任务级重试 | ❌ 从头重试 |
| 人机协同 | ✅ Signal/Query 原生支持 | ⚠️ 需自定义 | ❌ 不支持 |
| 学习曲线 | 中等 | 平缓 | 陡峭 |
| 推荐指数 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ |
5. AI Agent 编排框架选型
5.1 LangGraph:LLM Agent 专用编排
LangGraph 核心特性:
- 状态图模型:将 Agent 协作建模为状态机(State Graph),支持循环和条件分支
- 持久化内存:Checkpointer 保存对话历史,支持长短期记忆
- 工具调用:原生支持 Function Calling、Tool Use
- 多 Agent 协作:Supervisor、Hierarchical、Network 等拓扑模式
- 人机协同:Human-in-the-loop 中断/审批/编辑状态
- 流式输出:支持 Token 级别的流式响应
// LangGraph 示例:多 Agent 协作(产品 + 架构 + 开发)
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from langchain_core.messages import BaseMessage
import operator
# 定义状态
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], operator.add]
current_stage: str
prd_draft: str
architecture_draft: str
code_review_feedback: str
# 定义节点(Agent)
def product_agent(state: AgentState):
"""Product Agent: 根据需求生成 PRD"""
requirements = state['messages'][-1].content
prd = llm.invoke(f"根据以下需求生成 PRD: {requirements}")
return {"prd_draft": prd.content, "current_stage": "PRD_Review"}
def architect_agent(state: AgentState):
"""Architect Agent: 根据 PRD 设计技术方案"""
prd = state['prd_draft']
architecture = llm.invoke(f"根据以下 PRD 设计技术方案:{prd}")
return {"architecture_draft": architecture.content, "current_stage": "Architecture_Review"}
def developer_agent(state: AgentState):
"""Developer Agent: 根据架构编写代码"""
architecture = state['architecture_draft']
code = llm.invoke(f"根据以下架构编写代码:{architecture}")
return {"messages": [("assistant", code.content)], "current_stage": "Code_Review"}
def human_reviewer(state: AgentState):
"""人工评审节点(暂停等待审批)"""
# 这里会暂停,等待人类通过 API 发送审批信号
pass
# 构建状态图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("product_agent", product_agent)
workflow.add_node("architect_agent", architect_agent)
workflow.add_node("developer_agent", developer_agent)
workflow.add_node("human_reviewer", human_reviewer)
# 设置入口
workflow.set_entry_point("product_agent")
# 添加边(条件路由)
workflow.add_conditional_edges(
"product_agent",
lambda x: x["current_stage"],
{
"PRD_Review": "human_reviewer",
}
)
workflow.add_conditional_edges(
"human_reviewer",
lambda x: "approved" if check_approval() else "rejected",
{
"approved": "architect_agent",
"rejected": END,
}
)
workflow.add_edge("architect_agent", "developer_agent")
workflow.add_edge("developer_agent", END)
# 编译并运行
app = workflow.compile()
result = app.invoke({"messages": [("user", "开发一个电商网站")], "current_stage": "Start"})
5.2 其他 AI Agent 框架对比
| 框架 | 特点 | 适用场景 | 推荐度 |
|---|---|---|---|
| LangGraph | 状态图模型、持久化、人机协同 | 复杂多 Agent 协作 | ⭐⭐⭐⭐⭐ |
| AutoGen | 微软出品、对话驱动、Group Chat | 研究实验、快速原型 | ⭐⭐⭐⭐ |
| CrewAI | 角色分工、任务链、易用性强 | 简单 Agent 团队 | ⭐⭐⭐⭐ |
| LlamaIndex Workflows | RAG 优化、数据索引、查询引擎 | 知识库问答场景 | ⭐⭐⭐ |
5.3 分层编排架构
┌─────────────────────────────────────────────────────────────────────────┐
│ 分层编排架构设计 │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ Level 1: 业务流程层 (Temporal) │ │
│ │ │ │
│ │ 需求 → PRD → 架构 → API → Coding → Test → Deploy → Acceptance │ │
│ │ ↓ ↓ ↓ ↓ ↓ ↓ ↓ ↓ │ │
│ │ [Stage1][Stage2][Stage3] [Stage4] [Stage5] [Stage6] [Stage7] │ │
│ │ │ │
│ │ • 长周期持久化 (天/周) │ │
│ │ • 人机协同审批 │ │
│ │ • 跨阶段状态管理 │ │
│ │ • 错误恢复与补偿 │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ Level 2: Agent 协作层 (LangGraph) │ │
│ │ │ │
│ │ 每个 Stage 内部 = 一个 LangGraph StateGraph │ │
│ │ │ │
│ │ 例如 PRD 阶段: │ │
│ │ Research Agent → Writer Agent → Reviewer Agent → Human Approval │ │
│ │ │ │
│ │ • 多 Agent 对话协作 │ │
│ │ • 工具调用(搜索、代码执行、API) │ │
│ │ • 短期记忆(对话历史) │ │
│ │ • 流式输出 │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌───────────────────────────────────────────────────────────────────┐ │
│ │ Level 3: 通信层 (Kafka / NATS) │ │
│ │ │ │
│ │ • Agent 间异步消息传递 │ │
│ │ • 事件驱动架构(Event Sourcing) │ │
│ │ • 解耦 Temporal Worker 与 LangGraph Agent │ │
│ │ • 削峰填谷、缓冲队列 │ │
│ └───────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
6. 消息队列方案对比
6.1 主流消息队列总览
Kafka
分布式事件流平台
高吞吐 Event Streaming
日志聚合/数据管道
日志聚合/数据管道
NATS JetStream
云原生消息系统
低延迟 Pub/Sub
微服务通信首选
微服务通信首选
RabbitMQ
传统消息代理
AMQP 协议成熟稳定
复杂路由场景
复杂路由场景
Redis Streams
轻量级流处理
简单场景/已有 Redis
不推荐核心使用
不推荐核心使用
6.2 性能对比基准
| 指标 | Kafka | NATS JetStream | RabbitMQ | Redis Streams |
|---|---|---|---|---|
| 吞吐量 | 100 万+/秒 | 50 万+/秒 | 5 万~10 万/秒 | 10 万~20 万/秒 |
| 延迟 (P99) | 10~50ms | 1~5ms ⭐ | 5~20ms | 1~3ms ⭐ |
| 消息持久化 | ✅ 磁盘持久化 | ✅ 内存 + 磁盘 | ✅ 可选持久化 | ⚠️ 内存(AOF 可选) |
| Exactly-Once | ✅ 支持(幂等 Producer) | ✅ 支持 | ✅ 支持(Confirm) | ❌ At-Least-Once |
| 消息回溯 | ✅ 按 Offset 重放 | ✅ Stream Consumer | ❌ 消费即删除 | ⚠️ 有限支持 |
| 集群扩展 | ✅ 水平扩展(Partition) | ✅ 水平扩展 | ⚠️ 镜像队列复杂 | ❌ 单主限制 |
| 运维复杂度 | 高(依赖 ZooKeeper/KRaft) | 低(单二进制)⭐ | 中(Erlang 运维) | 低(已有 Redis)⭐ |
6.3 消息模式支持
| 模式 | 说明 | Kafka | NATS | RabbitMQ |
|---|---|---|---|---|
| P2P(点对点) | 一条消息只被一个消费者处理 | ✅ Consumer Group | ✅ Queue Group | ✅ Direct Exchange |
| Pub/Sub(发布订阅) | 一条消息被多个消费者处理 | ✅ 多 Consumer Group | ✅ 多 Subscription | ✅ Fanout Exchange |
| Request/Reply | 同步请求 - 响应模式 | ⚠️ 需自定义 | ✅ 原生支持 ⭐ | ✅ RPC 插件 |
| Event Streaming | 事件溯源、日志聚合 | ✅ 核心能力 ⭐ | ✅ JetStream | ❌ 不适合 |
| 延迟队列 | 定时/延迟投递 | ⚠️ 需自定义 | ✅ 原生支持 | ✅ 插件支持 |
7. Kafka vs NATS vs RabbitMQ 深度对比
7.1 Apache Kafka
✅ 优点
- 超高吞吐量(百万级 TPS)
- 持久化存储(可配置保留策略)
- 支持消息回溯重放
- 成熟的生态系统(Connect、Streams、KSQL)
- 适合 Event Sourcing、CQRS 架构
- 强一致性保证
⚠️ 缺点
- 运维复杂度高(ZooKeeper/KRaft、Broker、Topic 管理)
- 延迟相对较高(10ms+)
- 小消息场景效率低
- Request/Reply 模式不友好
- 资源占用大(JVM、PageCache)
推荐场景:日志聚合、用户行为追踪、数据管道、Event Sourcing、审计日志等需要高吞吐和消息回溯的场景。
7.2 NATS / NATS JetStream
✅ 优点
- 极低延迟(1~5ms,Go 语言高性能实现)
- 极简运维(单二进制文件,无外部依赖)
- 云原生友好(K8S Operator、Helm Chart)
- 支持多种消息模式(Pub/Sub、Queue、Request/Reply、JetStream)
- 多语言客户端(30+ 官方 SDK)
- 资源占用小(内存高效)
⚠️ 缺点
- JetStream 相对年轻(2021 年 GA)
- 生态系统不如 Kafka 成熟
- 超大规模部署案例较少
- 消息回溯能力弱于 Kafka
🏆 推荐方案:NATS JetStream 作为微服务通信骨干
对于 AI Agent 协同场景,NATS 的低延迟、Request/Reply 支持、极简运维使其成为微服务间通信的理想选择。JetStream 提供了持久化和流处理能力,弥补了传统 NATS 的不足。
7.3 RabbitMQ
✅ 优点
- AMQP 协议标准实现,成熟稳定
- 灵活的路由(Exchange、Binding、Routing Key)
- 完善的确认机制(Publisher Confirm、Consumer Ack)
- 丰富的插件生态(延迟队列、Sharding、Federation)
- 管理界面友好
⚠️ 缺点
- 吞吐量相对较低(5 万~10 万 TPS)
- Erlang 运维门槛高
- 集群扩展复杂(镜像队列同步开销大)
- 不支持消息回溯
- 云原生支持较弱
7.4 最终推荐方案
🎯 推荐架构:NATS JetStream + Kafka 混合方案
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| Agent 间实时通信 | NATS Core(Pub/Sub + Request/Reply) | 低延迟、支持同步调用、运维简单 |
| 任务队列 | NATS JetStream(Stream + Consumer) | 持久化、ACK 机制、重试、背压控制 |
| 事件溯源/审计日志 | Kafka | 高吞吐、消息回溯、长期存储 |
| 监控指标聚合 | Kafka | 适合时序数据、对接 Flink/Spark |
8. 推荐架构设计
8.1 整体架构图
┌─────────────────────────────────────────────────────────────────────────────────┐
│ 多 Agent 协同工作流与消息队列架构(Temporal + LangGraph + NATS) │
├─────────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ API Gateway Layer │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ REST API │ │ GraphQL │ │ WebSocket │ │ │
│ │ │ (FastAPI) │ │ (Apollo) │ │ (Socket.IO)│ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └───────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ Orchestration Layer (Temporal) │ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────────────────────┐ │ │
│ │ │ Temporal Cluster (3 Nodes HA) │ │ │
│ │ │ │ │ │
│ │ │ Frontend ──► History Service ──► Matching Service ──► Worker │ │ │
│ │ │ ▲ │ │ │ │ │
│ │ │ │ ▼ ▼ │ │ │
│ │ │ │ Cassandra/PostgreSQL Elasticsearch │ │ │
│ │ │ │ (状态持久化) (执行历史搜索) │ │ │
│ │ │ │ │ │ │
│ │ │ Web UI / CLI / tctl │ │ │
│ │ └────────────────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ Workflows: │ │
│ │ • 研发自动化主流程 (End-to-End Pipeline) │ │
│ │ • PRD 设计子流程 (PRD Design Sub-Workflow) │ │
│ │ • 架构设计子流程 (Architecture Design Sub-Workflow) │ │
│ │ • 代码生成子流程 (Code Generation Sub-Workflow) │ │
│ │ • 测试执行子流程 (Test Execution Sub-Workflow) │ │
│ │ • 部署发布子流程 (Deployment Sub-Workflow) │ │
│ └───────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────┼─────────────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ Temporal Workers │ │ Temporal Workers │ │ Temporal Workers │ │
│ │ (Activity Executors)│ │ (Activity Executors)│ │ (Activity Executors)│ │
│ │ │ │ │ │ │ │
│ │ • PRD Generator │ │ • Code Generator │ │ • Test Executor │ │
│ │ • Architecture │ │ • Unit Test │ │ • Deployer │ │
│ │ Designer │ │ • Integration Test │ │ • UI Automation │ │
│ │ • API Spec Gen │ │ • Code Reviewer │ │ • Notifier │ │
│ └──────────┬──────────┘ └──────────┬──────────┘ └──────────┬──────────┘ │
│ │ │ │ │
│ └───────────────────────┼───────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ Message Bus Layer (NATS JetStream) │ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────────────────────┐ │ │
│ │ │ NATS Cluster (3 Nodes RAFT) │ │ │
│ │ │ │ │ │
│ │ │ Streams: │ │ │
│ │ │ • agent-events (Agent 事件流) │ │ │
│ │ │ • task-queue (任务队列) │ │ │
│ │ │ • notifications (通知广播) │ │ │
│ │ │ • audit-log (审计日志 → Kafka 镜像) │ │ │
│ │ │ │ │ │
│ │ │ Consumers: │ │ │
│ │ │ • langgraph-agents (LangGraph Agent 组) │ │ │
│ │ │ • notification-service (通知服务) │ │ │
│ │ │ • monitoring-collector (监控采集) │ │ │
│ │ └────────────────────────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ AI Agent Layer (LangGraph) │ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────────────────────┐ │ │
│ │ │ LangGraph StateGraphs (per Stage) │ │ │
│ │ │ │ │ │
│ │ │ PRD Stage Graph: │ │ │
│ │ │ Research Agent → Writer Agent → Reviewer Agent → Human Approval │ │ │
│ │ │ │ │ │
│ │ │ Architecture Stage Graph: │ │ │
│ │ │ System Architect → DB Architect → Security Reviewer → Approval │ │ │
│ │ │ │ │ │
│ │ │ Coding Stage Graph: │ │ │
│ │ │ Backend Dev → Frontend Dev → Code Reviewer → Merge │ │ │
│ │ │ │ │ │
│ │ │ Each Agent: │ │ │
│ │ │ • LLM (Claude Code / GPT-4 / DeepSeek) │ │ │
│ │ │ • Tools (Search, Code Interpreter, API Caller) │ │ │
│ │ │ • Memory (Short-term + Long-term via Checkpointer) │ │ │
│ │ └────────────────────────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────────────────────┐ │
│ │ Supporting Services │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Harbor │ │ SonarQube │ │ Prometheus │ │ Grafana │ │ │
│ │ │ (镜像仓库) │ │ (代码质量) │ │ (监控) │ │ (可视化) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Kafka │ │ Elastic │ │ Jaeger │ │ MinIO/S3 │ │ │
│ │ │ (事件归档) │ │ (日志) │ │ (链路追踪) │ │ (对象存储) │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └───────────────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────────────┘
8.2 部署配置建议
| 组件 | 节点数 | 配置 | 存储 |
|---|---|---|---|
| Temporal Cluster | 3 | 8 核 16GB | Cassandra 3 节点 / PostgreSQL HA |
| Temporal Workers | 5-10(弹性伸缩) | 16 核 32GB | 临时存储 50GB |
| NATS Cluster | 3 | 8 核 16GB | JetStream 100GB SSD |
| LangGraph Agents | 10-20(按需伸缩) | 8 核 16GB + GPU 可选 | Redis 持久化 20GB |
| Kafka Cluster | 3 | 16 核 32GB | 500GB NVMe × 3 |
9. 在研发自动化系统中的集成
9.1 八大阶段的工具链映射
| 研发阶段 | Temporal Workflow | LangGraph Agents | NATS Topics |
|---|---|---|---|
| 1. 需求分析 | RequirementAnalysisWF | Research Agent, Analyst Agent | agent-events.requirement |
| 2. PRD 设计 | PRDDesignWF | Writer Agent, Reviewer Agent | agent-events.prd |
| 3. 架构设计 | ArchitectureDesignWF | System Architect, DB Architect | agent-events.architecture |
| 4. API 定义 | APISpecWF | API Designer Agent | agent-events.api-spec |
| 5. AI Coding | CodingWF | Backend Dev, Frontend Dev | task-queue.coding |
| 6. 测试执行 | TestingWF | Unit Test Agent, QA Agent | task-queue.testing |
| 7. CI/CD 部署 | DeploymentWF | DevOps Agent | agent-events.deployment |
| 8. UI 验收 | UATWF | UI Test Agent | agent-events.uat |
9.2 典型执行流程
# 端到端执行示例
# 1. 用户提交需求
POST /api/v1/projects
{
"name": "电商平台",
"requirements": "开发一个 B2C 电商网站,包含商品管理、购物车、订单、支付功能",
"team": ["product", "backend", "frontend", "qa"]
}
# 2. Temporal 启动主工作流
client.start_workflow(
"研发自动化工作流",
project_id="proj-001",
requirements="...",
task_queue="rd-automation",
execution_timeout="7 days" # 整个流程最多 7 天
)
# 3. Temporal 调用 Activity → 发布事件到 NATS
# NATS Topic: agent-events.requirement
{
"event_type": "requirement_submitted",
"project_id": "proj-001",
"payload": {...},
"timestamp": "2026-03-12T10:00:00Z"
}
# 4. LangGraph Agent 订阅 NATS 事件并执行
# Research Agent 开始需求分析
app = langgraph_app.compile()
result = app.invoke({
"messages": [("user", requirements)],
"stage": "requirement_analysis"
})
# 5. Agent 完成后发布结果到 NATS
# NATS Topic: agent-events.prd-draft
nats.publish("agent-events.prd-draft", prd_json)
# 6. Temporal 检测到 PRD 完成 → 触发人工审批
# 发送 Signal 给工作流
wf.signal("prdApproval", approved=True)
# 7. 工作流继续执行下一阶段...
# 8. 所有阶段完成后,工作流结束
result = wf.result()
print(f"项目 {project_id} 完成!")
9.3 监控与可观测性
📊 Temporal Web UI
- 查看工作流执行历史
- 事件时间线可视化
- 重试记录与错误详情
- 手动发送 Signal/Query
📈 Prometheus + Grafana
- 工作流成功率/失败率
- Activity 执行耗时 P99
- NATS 消息吞吐量
- Agent Token 消耗统计
🔍 Jaeger 链路追踪
- 跨服务调用追踪
- Agent 执行链路
- 消息传递延迟分析
- 瓶颈定位
📝 ELK 日志聚合
- 集中式日志收集
- 错误日志告警
- 审计日志归档
- 合规性报告
9.4 告警规则示例
# Prometheus Alert Rules 示例
groups:
- name: temporal_alerts
rules:
- alert: WorkflowFailureRateHigh
expr: rate(temporal_workflow_failed_total[5m]) > 0.1
for: 5m
labels:
severity: critical
annotations:
summary: "工作流失败率过高 ({{ $value }})"
- alert: ActivityTaskBacklog
expr: temporal_activity_scheduled_count - temporal_activity_completed_count > 100
for: 10m
labels:
severity: warning
annotations:
summary: "Activity 任务积压 ({{ $value }})"
- name: nats_alerts
rules:
- alert: NATSMessagesPerSecLow
expr: rate(nats_server_sent_msgs_total[5m]) < 100
for: 5m
labels:
severity: warning
annotations:
summary: "NATS 消息速率异常低"
- alert: NATSConsumerLagHigh
expr: nats_jetstream_consumer_pending_msgs > 10000
for: 5m
labels:
severity: critical
annotations:
summary: "NATS Consumer 积压严重"
- name: agent_alerts
rules:
- alert: AgentLLMAPIError
expr: rate(agent_llm_api_errors_total[5m]) > 0.05
for: 5m
labels:
severity: warning
annotations:
summary: "Agent LLM API 错误率高"
- alert: AgentTokenUsageSpike
expr: rate(agent_token_usage_total[1h]) > 1000000
for: 15m
labels:
severity: info
annotations:
summary: "Agent Token 使用量激增"