🤖 多 Agent 协同工作流引擎
与消息队列方案选型报告

基于 OpenClaw + Claude Code 的端到端研发自动化系统 —— AI Agent 编排架构与事件驱动通信基础设施完整方案

调研日期:2026 年 3 月 12 日 文档类型:架构选型报告 适用对象:架构师/Tech Lead 应用场景:AI Agent 编排 + 微服务通信

1. 需求分析与选型标准

1.1 八大研发阶段的编排需求

端到端研发自动化系统的核心挑战:
  • 长周期流程:从需求到上线可能持续数天甚至数周,需要持久化状态管理
  • 人机协同:关键节点(PRD 评审、架构评审、上线审批)需要人工介入
  • 异步执行:AI Agent 任务(代码生成、测试执行)是长时间运行的异步操作
  • 错误恢复:网络故障、API 限流、LLM 超时等异常需要自动重试和补偿
  • 可观测性:需要追踪每个 Agent 的执行状态、耗时、Token 消耗
  • 并发控制:多个项目并行研发,需要资源隔离和优先级调度

1.2 工作流引擎选型标准

评估维度 权重 具体要求
持久化能力 25% 支持长时间运行(小时/天/周),故障后自动恢复状态
人机协同 20% 支持人工审批节点、暂停/恢复、信号交互
错误处理 20% 内置重试策略、超时控制、补偿事务、死信队列
可观测性 15% 可视化 Dashboard、执行历史、指标监控、日志追踪
开发体验 10% 代码即工作流、本地调试、版本管理、测试框架
扩展性 10% 水平伸缩、多语言 SDK、插件生态、云原生部署

1.3 消息队列选型标准

评估维度 权重 具体要求
吞吐量 20% 支持百万级 TPS,满足高并发 Agent 通信
延迟 20% 毫秒级延迟,保证实时响应
可靠性 20% 消息不丢失、Exactly-Once 语义、事务支持
模式支持 15% P2P、Pub/Sub、Event Streaming、Request/Reply
运维复杂度 15% 部署简单、监控完善、故障自愈
生态集成 10% K8S 友好、主流语言客户端、云服务商支持

2. 工作流引擎方案对比

2.1 主流方案总览

Temporal
分布式持久化工作流
⭐ 推荐方案
AI Agent 编排首选
Airflow
数据管道编排
适合 ETL/批处理
不适合长周期
Prefect
Python 原生编排
开发体验优秀
AI 基础设施新兴
LangGraph
AI Agent 专用编排
LLM Agent 专用
状态图模型

2.2 综合能力对比矩阵

特性 Temporal Airflow Prefect LangGraph
持久化执行 ✅ 原生支持(年) ❌ DAG 有向无环 ⚠️ 有限支持 ⚠️ 会话级持久化
人机协同 ✅ Signal/Query ❌ 不支持 ⚠️ 手动触发 ⚠️ 需自定义
错误恢复 ✅ 自动重试 + 补偿 ⚠️ 基础重试 ✅ 智能重试 ⚠️ 基础异常处理
可观测性 ✅ Web UI + History ✅ Web UI + Logs ✅ Cloud Dashboard ⚠️ 需集成 LangSmith
学习曲线 中等 陡峭 平缓 中等
AI Agent 适配 ✅ 完美匹配 ❌ 不适合 ✅ 良好支持 ✅ 专为 LLM 设计
多语言 SDK Go/Java/TS/Python/PHP Python Python Python/JS
云原生部署 ✅ K8S Operator ✅ Helm Chart ✅ Docker/K8S ⚠️ 自托管复杂
🏆 综合推荐:Temporal + LangGraph 组合方案
  • Temporal:负责宏观业务流程编排(需求→PRD→设计→开发→测试→部署→验收)
  • LangGraph:负责微观 AI Agent 协作(单个阶段内的多 Agent 对话与工具调用)
  • 优势:结合 Temporal 的持久化可靠性与 LangGraph 的 LLM 原生能力

3. Temporal 深度解析

3.1 Temporal 核心概念

📜 Workflow(工作流)

定义业务逻辑的代码,可以运行数年而不丢失状态。

  • 确定性执行(Deterministic)
  • 自动状态持久化
  • 故障后精确恢复
  • 支持子工作流嵌套

⚡ Activity(活动)

工作流中的具体任务单元,执行实际工作。

  • 幂等性设计
  • 自动重试(指数退避)
  • 超时控制(启动/执行)
  • 心跳检测(长任务)

📡 Signal(信号)

外部向运行中的工作流发送事件的机制。

  • 人机协同审批
  • 动态参数调整
  • 取消/终止通知
  • 外部事件触发

🔍 Query(查询)

查询运行中或已完成工作流的状态。

  • 只读不修改
  • 实时状态获取
  • 自定义查询逻辑
  • 用于 Dashboard 展示

3.2 Temporal 工作流示例(研发自动化场景)

// TypeScript 示例:端到端研发自动化工作流 import { proxyActivities, sleep, signal } from '@temporalio/workflow'; import type * as activities from './activities'; const { generatePRD, reviewPRD, designArchitecture, reviewArchitecture, generateAPISpec, codingBackend, codingFrontend, unitTest, integrationTest, deployToDev, deployToStaging, deployToProduction, uiAutomationTest, notifySlack } = proxyActivities({ startToCloseTimeout: '30 minutes', retry: { initialInterval: '1 minute', backoffCoefficient: 2, maximumAttempts: 5, }, }); // 人工审批信号 export const prdApproval = signal('prdApproval'); export const architectureApproval = signal('architectureApproval'); export const productionDeploymentApproval = signal('productionDeploymentApproval'); export async function研发自动化工作流(projectId: string, requirements: string) { let approved = false; // ========== 阶段 1: 需求分析 → PRD ========== const prd = await generatePRD(projectId, requirements); // 等待人工审批 PRD approved = await prdApproval.first(); if (!approved) { await notifySlack(`❌ PRD 被拒绝:${projectId}`); throw new Error('PRD Approval Rejected'); } // ========== 阶段 2: 架构设计 ========== const architecture = await designArchitecture(projectId, prd); // 等待架构委员会审批 approved = await architectureApproval.first(); if (!approved) { await notifySlack(`❌ 架构设计被拒绝:${projectId}`); throw new Error('Architecture Approval Rejected'); } // ========== 阶段 3: API 规范生成 ========== const apiSpec = await generateAPISpec(projectId, architecture); // ========== 阶段 4: 并行编码 ========== // 后端和前端开发可以并行 const [backendCode, frontendCode] = await Promise.all([ codingBackend(projectId, apiSpec), codingFrontend(projectId, apiSpec), ]); // ========== 阶段 5: 单元测试 ========== const unitTestReport = await unitTest(projectId, backendCode, frontendCode); if (unitTestReport.coverage < 80) { throw new Error(`单元测试覆盖率不足:${unitTestReport.coverage}%`); } // ========== 阶段 6: 集成测试 ========== const integrationTestReport = await integrationTest(projectId); if (!integrationTestReport.passed) { throw new Error('集成测试失败'); } // ========== 阶段 7: 部署到开发环境 ========== await deployToDev(projectId); await notifySlack(`✅ 已部署到开发环境:${projectId}`); // ========== 阶段 8: 部署到预发环境 ========== await deployToStaging(projectId); await notifySlack(`✅ 已部署到预发环境:${projectId}`); // ========== 阶段 9: UI 自动化测试 ========== const uiTestReport = await uiAutomationTest(projectId); if (!uiTestReport.passed) { throw new Error('UI 自动化测试失败'); } // ========== 阶段 10: 生产部署审批 ========== approved = await productionDeploymentApproval.first(); if (!approved) { await notifySlack(`❌ 生产部署被拒绝:${projectId}`); return { status: 'CANCELLED', reason: 'Production deployment rejected' }; } // ========== 阶段 11: 灰度发布到生产 ========== await deployToProduction(projectId, { strategy: 'canary', percentage: 10 }); await sleep('10 minutes'); // 观察期 // 如果没有告警,全量发布 await deployToProduction(projectId, { strategy: 'rolling', percentage: 100 }); await notifySlack(`🎉 项目 ${projectId} 已成功上线!`); return { status: 'COMPLETED', prd, architecture, apiSpec, testReports: { unitTestReport, integrationTestReport, uiTestReport }, }; }

3.3 Temporal 架构优势

为什么 Temporal 最适合 AI Agent 编排?
  • 持久化状态机:Workflow 执行状态自动持久化,即使服务器宕机也能恢复
  • 内建重试机制:Activity 失败自动重试,支持指数退避、最大尝试次数
  • 超时控制:支持 Schedule-to-Start、Start-to-Close、Heartbeat 超时
  • Signal/Query:完美支持人机协同(审批、暂停、恢复、查询状态)
  • Timer/睡眠:支持长时间睡眠(小时/天/周),不占用资源
  • 子工作流:支持嵌套编排,每个研发阶段可以是独立子工作流
  • 多语言 SDK:TypeScript/Python/Go/Java/PHP,适配不同技术栈
  • 可观测性:Web UI 查看执行历史、事件时间线、重试记录

3.4 Temporal 部署方案

部署方式 适用场景 优点 缺点
Temporal Cloud 生产环境(推荐) 零运维、99.99% SLA、自动备份 按成本计费
K8S Helm Chart 生产环境(自建) 完全控制、数据私有化 运维复杂度高
Docker Compose 开发/测试环境 一键启动、本地调试 不适合生产

4. Airflow vs Prefect 对比

4.1 Apache Airflow 分析

✅ 优点
  • 成熟的 DAG 调度系统,社区活跃
  • 丰富的 Operator 生态(200+ 内置)
  • 强大的 UI 界面(Gantt 图、任务日志)
  • 支持 Cron 表达式调度
  • 与大数据生态深度集成(Spark、Hive、Presto)
⚠️ 缺点
  • DAG 必须是有向无环图,不支持循环/长周期
  • 状态持久化弱,任务失败后难以恢复上下文
  • 不支持人机协同(无 Signal 机制)
  • 动态工作流能力弱(DAG 在解析时固定)
  • 运维复杂度高(需要维护 Web Server、Scheduler、Worker、DB)
结论:Airflow 不适合 AI Agent 编排场景

Airflow 设计初衷是数据 ETL 管道调度,强调定时批量任务的 DAG 编排。而 AI Agent 工作流具有长周期、状态依赖、人机协同、动态分支等特点,Airflow 的 DAG 模型无法优雅表达这些需求。

4.2 Prefect 分析

✅ 优点
  • Python 原生,装饰器语法简洁(@task、@flow)
  • 混合架构(Prefect Cloud + 本地 Worker)
  • 动态工作流支持(运行时决定任务图)
  • 优秀的错误处理(重试、超时、缓存)
  • Prefect Horizon 专门针对 AI/MCP 场景优化
  • 开发体验优秀,本地调试方便
⚠️ 缺点
  • 持久化能力不如 Temporal(不支持年级别运行)
  • 人机协同需要自定义实现
  • 多语言支持弱(主要 Python)
  • 社区规模小于 Airflow
Prefect 定位:Python 团队的轻量级编排选择

如果团队技术栈以 Python 为主,且工作流时长在小时级别以内,Prefect 是比 Airflow 更好的选择。Prefect 2.0+ 的混合架构和 Horizon AI 基础设施使其成为 Temporal 的轻量级替代方案。

4.3 三者对比总结

维度 Temporal Prefect Airflow
最佳场景 长周期、状态敏感、人机协同 Python 数据管道、AI 任务 定时 ETL、批处理
最长运行时间 数年(理论上无限) 数小时 ~ 数天 数分钟 ~ 数小时
状态恢复 ✅ 精确恢复到故障点 ⚠️ 任务级重试 ❌ 从头重试
人机协同 ✅ Signal/Query 原生支持 ⚠️ 需自定义 ❌ 不支持
学习曲线 中等 平缓 陡峭
推荐指数 ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐

5. AI Agent 编排框架选型

5.1 LangGraph:LLM Agent 专用编排

LangGraph 核心特性:
  • 状态图模型:将 Agent 协作建模为状态机(State Graph),支持循环和条件分支
  • 持久化内存:Checkpointer 保存对话历史,支持长短期记忆
  • 工具调用:原生支持 Function Calling、Tool Use
  • 多 Agent 协作:Supervisor、Hierarchical、Network 等拓扑模式
  • 人机协同:Human-in-the-loop 中断/审批/编辑状态
  • 流式输出:支持 Token 级别的流式响应
// LangGraph 示例:多 Agent 协作(产品 + 架构 + 开发) from langgraph.graph import StateGraph, END from typing import TypedDict, Annotated from langchain_core.messages import BaseMessage import operator # 定义状态 class AgentState(TypedDict): messages: Annotated[list[BaseMessage], operator.add] current_stage: str prd_draft: str architecture_draft: str code_review_feedback: str # 定义节点(Agent) def product_agent(state: AgentState): """Product Agent: 根据需求生成 PRD""" requirements = state['messages'][-1].content prd = llm.invoke(f"根据以下需求生成 PRD: {requirements}") return {"prd_draft": prd.content, "current_stage": "PRD_Review"} def architect_agent(state: AgentState): """Architect Agent: 根据 PRD 设计技术方案""" prd = state['prd_draft'] architecture = llm.invoke(f"根据以下 PRD 设计技术方案:{prd}") return {"architecture_draft": architecture.content, "current_stage": "Architecture_Review"} def developer_agent(state: AgentState): """Developer Agent: 根据架构编写代码""" architecture = state['architecture_draft'] code = llm.invoke(f"根据以下架构编写代码:{architecture}") return {"messages": [("assistant", code.content)], "current_stage": "Code_Review"} def human_reviewer(state: AgentState): """人工评审节点(暂停等待审批)""" # 这里会暂停,等待人类通过 API 发送审批信号 pass # 构建状态图 workflow = StateGraph(AgentState) # 添加节点 workflow.add_node("product_agent", product_agent) workflow.add_node("architect_agent", architect_agent) workflow.add_node("developer_agent", developer_agent) workflow.add_node("human_reviewer", human_reviewer) # 设置入口 workflow.set_entry_point("product_agent") # 添加边(条件路由) workflow.add_conditional_edges( "product_agent", lambda x: x["current_stage"], { "PRD_Review": "human_reviewer", } ) workflow.add_conditional_edges( "human_reviewer", lambda x: "approved" if check_approval() else "rejected", { "approved": "architect_agent", "rejected": END, } ) workflow.add_edge("architect_agent", "developer_agent") workflow.add_edge("developer_agent", END) # 编译并运行 app = workflow.compile() result = app.invoke({"messages": [("user", "开发一个电商网站")], "current_stage": "Start"})

5.2 其他 AI Agent 框架对比

框架 特点 适用场景 推荐度
LangGraph 状态图模型、持久化、人机协同 复杂多 Agent 协作 ⭐⭐⭐⭐⭐
AutoGen 微软出品、对话驱动、Group Chat 研究实验、快速原型 ⭐⭐⭐⭐
CrewAI 角色分工、任务链、易用性强 简单 Agent 团队 ⭐⭐⭐⭐
LlamaIndex Workflows RAG 优化、数据索引、查询引擎 知识库问答场景 ⭐⭐⭐

5.3 分层编排架构

┌─────────────────────────────────────────────────────────────────────────┐
│                        分层编排架构设计                                  │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                         │
│  ┌───────────────────────────────────────────────────────────────────┐ │
│  │ Level 1: 业务流程层 (Temporal)                                     │ │
│  │                                                                    │ │
│  │  需求 → PRD → 架构 → API → Coding → Test → Deploy → Acceptance   │ │
│  │   ↓      ↓      ↓       ↓        ↓       ↓        ↓         ↓     │ │
│  │  [Stage1][Stage2][Stage3] [Stage4]  [Stage5] [Stage6]  [Stage7]   │ │
│  │                                                                    │ │
│  │  • 长周期持久化 (天/周)                                            │ │
│  │  • 人机协同审批                                                    │ │
│  │  • 跨阶段状态管理                                                  │ │
│  │  • 错误恢复与补偿                                                  │ │
│  └───────────────────────────────────────────────────────────────────┘ │
│                                    ↓                                    │
│  ┌───────────────────────────────────────────────────────────────────┐ │
│  │ Level 2: Agent 协作层 (LangGraph)                                   │ │
│  │                                                                    │ │
│  │  每个 Stage 内部 = 一个 LangGraph StateGraph                      │ │
│  │                                                                    │ │
│  │  例如 PRD 阶段:                                                   │ │
│  │  Research Agent → Writer Agent → Reviewer Agent → Human Approval │ │
│  │                                                                    │ │
│  │  • 多 Agent 对话协作                                                │ │
│  │  • 工具调用(搜索、代码执行、API)                                 │ │
│  │  • 短期记忆(对话历史)                                            │ │
│  │  • 流式输出                                                        │ │
│  └───────────────────────────────────────────────────────────────────┘ │
│                                    ↓                                    │
│  ┌───────────────────────────────────────────────────────────────────┐ │
│  │ Level 3: 通信层 (Kafka / NATS)                                     │ │
│  │                                                                    │ │
│  │  • Agent 间异步消息传递                                            │ │
│  │  • 事件驱动架构(Event Sourcing)                                  │ │
│  │  • 解耦 Temporal Worker 与 LangGraph Agent                       │ │
│  │  • 削峰填谷、缓冲队列                                              │ │
│  └───────────────────────────────────────────────────────────────────┘ │
│                                                                         │
└─────────────────────────────────────────────────────────────────────────┘
                    

6. 消息队列方案对比

6.1 主流消息队列总览

Kafka
分布式事件流平台
高吞吐 Event Streaming
日志聚合/数据管道
NATS JetStream
云原生消息系统
低延迟 Pub/Sub
微服务通信首选
RabbitMQ
传统消息代理
AMQP 协议成熟稳定
复杂路由场景
Redis Streams
轻量级流处理
简单场景/已有 Redis
不推荐核心使用

6.2 性能对比基准

指标 Kafka NATS JetStream RabbitMQ Redis Streams
吞吐量 100 万+/秒 50 万+/秒 5 万~10 万/秒 10 万~20 万/秒
延迟 (P99) 10~50ms 1~5ms ⭐ 5~20ms 1~3ms ⭐
消息持久化 ✅ 磁盘持久化 ✅ 内存 + 磁盘 ✅ 可选持久化 ⚠️ 内存(AOF 可选)
Exactly-Once ✅ 支持(幂等 Producer) ✅ 支持 ✅ 支持(Confirm) ❌ At-Least-Once
消息回溯 ✅ 按 Offset 重放 ✅ Stream Consumer ❌ 消费即删除 ⚠️ 有限支持
集群扩展 ✅ 水平扩展(Partition) ✅ 水平扩展 ⚠️ 镜像队列复杂 ❌ 单主限制
运维复杂度 高(依赖 ZooKeeper/KRaft) 低(单二进制)⭐ 中(Erlang 运维) 低(已有 Redis)⭐

6.3 消息模式支持

模式 说明 Kafka NATS RabbitMQ
P2P(点对点) 一条消息只被一个消费者处理 ✅ Consumer Group ✅ Queue Group ✅ Direct Exchange
Pub/Sub(发布订阅) 一条消息被多个消费者处理 ✅ 多 Consumer Group ✅ 多 Subscription ✅ Fanout Exchange
Request/Reply 同步请求 - 响应模式 ⚠️ 需自定义 ✅ 原生支持 ⭐ ✅ RPC 插件
Event Streaming 事件溯源、日志聚合 ✅ 核心能力 ⭐ ✅ JetStream ❌ 不适合
延迟队列 定时/延迟投递 ⚠️ 需自定义 ✅ 原生支持 ✅ 插件支持

7. Kafka vs NATS vs RabbitMQ 深度对比

7.1 Apache Kafka

✅ 优点
  • 超高吞吐量(百万级 TPS)
  • 持久化存储(可配置保留策略)
  • 支持消息回溯重放
  • 成熟的生态系统(Connect、Streams、KSQL)
  • 适合 Event Sourcing、CQRS 架构
  • 强一致性保证
⚠️ 缺点
  • 运维复杂度高(ZooKeeper/KRaft、Broker、Topic 管理)
  • 延迟相对较高(10ms+)
  • 小消息场景效率低
  • Request/Reply 模式不友好
  • 资源占用大(JVM、PageCache)
推荐场景:日志聚合、用户行为追踪、数据管道、Event Sourcing、审计日志等需要高吞吐和消息回溯的场景。

7.2 NATS / NATS JetStream

✅ 优点
  • 极低延迟(1~5ms,Go 语言高性能实现)
  • 极简运维(单二进制文件,无外部依赖)
  • 云原生友好(K8S Operator、Helm Chart)
  • 支持多种消息模式(Pub/Sub、Queue、Request/Reply、JetStream)
  • 多语言客户端(30+ 官方 SDK)
  • 资源占用小(内存高效)
⚠️ 缺点
  • JetStream 相对年轻(2021 年 GA)
  • 生态系统不如 Kafka 成熟
  • 超大规模部署案例较少
  • 消息回溯能力弱于 Kafka
🏆 推荐方案:NATS JetStream 作为微服务通信骨干

对于 AI Agent 协同场景,NATS 的低延迟、Request/Reply 支持、极简运维使其成为微服务间通信的理想选择。JetStream 提供了持久化和流处理能力,弥补了传统 NATS 的不足。

7.3 RabbitMQ

✅ 优点
  • AMQP 协议标准实现,成熟稳定
  • 灵活的路由(Exchange、Binding、Routing Key)
  • 完善的确认机制(Publisher Confirm、Consumer Ack)
  • 丰富的插件生态(延迟队列、Sharding、Federation)
  • 管理界面友好
⚠️ 缺点
  • 吞吐量相对较低(5 万~10 万 TPS)
  • Erlang 运维门槛高
  • 集群扩展复杂(镜像队列同步开销大)
  • 不支持消息回溯
  • 云原生支持较弱

7.4 最终推荐方案

🎯 推荐架构:NATS JetStream + Kafka 混合方案

场景 推荐方案 理由
Agent 间实时通信 NATS Core(Pub/Sub + Request/Reply) 低延迟、支持同步调用、运维简单
任务队列 NATS JetStream(Stream + Consumer) 持久化、ACK 机制、重试、背压控制
事件溯源/审计日志 Kafka 高吞吐、消息回溯、长期存储
监控指标聚合 Kafka 适合时序数据、对接 Flink/Spark

8. 推荐架构设计

8.1 整体架构图

┌─────────────────────────────────────────────────────────────────────────────────┐
│              多 Agent 协同工作流与消息队列架构(Temporal + LangGraph + NATS)     │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│  ┌───────────────────────────────────────────────────────────────────────────┐ │
│  │                          API Gateway Layer                                 │ │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                       │ │
│  │  │   REST API  │  │  GraphQL    │  │  WebSocket  │                       │ │
│  │  │  (FastAPI)  │  │  (Apollo)   │  │  (Socket.IO)│                       │ │
│  │  └─────────────┘  └─────────────┘  └─────────────┘                       │ │
│  └───────────────────────────────────────────────────────────────────────────┘ │
│                                      │                                          │
│                                      ▼                                          │
│  ┌───────────────────────────────────────────────────────────────────────────┐ │
│  │                     Orchestration Layer (Temporal)                         │ │
│  │                                                                            │ │
│  │  ┌────────────────────────────────────────────────────────────────────┐   │ │
│  │  │  Temporal Cluster (3 Nodes HA)                                     │   │ │
│  │  │                                                                    │   │ │
│  │  │  Frontend ──► History Service ──► Matching Service ──► Worker     │   │ │
│  │  │       ▲              │                    │                         │   │ │
│  │  │       │              ▼                    ▼                         │   │ │
│  │  │       │         Cassandra/PostgreSQL    Elasticsearch               │   │ │
│  │  │       │          (状态持久化)           (执行历史搜索)               │   │ │
│  │  │       │                                                            │   │ │
│  │  │  Web UI / CLI / tctl                                               │   │ │
│  │  └────────────────────────────────────────────────────────────────────┘   │ │
│  │                                                                            │ │
│  │  Workflows:                                                                │ │
│  │  • 研发自动化主流程 (End-to-End Pipeline)                                 │ │
│  │  • PRD 设计子流程 (PRD Design Sub-Workflow)                               │ │
│  │  • 架构设计子流程 (Architecture Design Sub-Workflow)                      │ │
│  │  • 代码生成子流程 (Code Generation Sub-Workflow)                          │ │
│  │  • 测试执行子流程 (Test Execution Sub-Workflow)                           │ │
│  │  • 部署发布子流程 (Deployment Sub-Workflow)                               │ │
│  └───────────────────────────────────────────────────────────────────────────┘ │
│                                      │                                          │
│                    ┌─────────────────┼─────────────────┐                       │
│                    │                 │                 │                       │
│                    ▼                 ▼                 ▼                       │
│  ┌─────────────────────┐ ┌─────────────────────┐ ┌─────────────────────┐      │
│  │  Temporal Workers   │ │  Temporal Workers   │ │  Temporal Workers   │      │
│  │  (Activity Executors)│ │  (Activity Executors)│ │  (Activity Executors)│      │
│  │                     │ │                     │ │                     │      │
│  │  • PRD Generator    │ │  • Code Generator   │ │  • Test Executor    │      │
│  │  • Architecture     │ │  • Unit Test        │ │  • Deployer         │      │
│  │    Designer         │ │  • Integration Test │ │  • UI Automation    │      │
│  │  • API Spec Gen     │ │  • Code Reviewer    │ │  • Notifier         │      │
│  └──────────┬──────────┘ └──────────┬──────────┘ └──────────┬──────────┘      │
│             │                       │                       │                  │
│             └───────────────────────┼───────────────────────┘                  │
│                                     │                                          │
│                                     ▼                                          │
│  ┌───────────────────────────────────────────────────────────────────────────┐ │
│  │                    Message Bus Layer (NATS JetStream)                      │ │
│  │                                                                            │ │
│  │  ┌────────────────────────────────────────────────────────────────────┐   │ │
│  │  │  NATS Cluster (3 Nodes RAFT)                                       │   │ │
│  │  │                                                                    │   │ │
│  │  │  Streams:                                                          │   │ │
│  │  │  • agent-events (Agent 事件流)                                     │   │ │
│  │  │  • task-queue (任务队列)                                           │   │ │
│  │  │  • notifications (通知广播)                                        │   │ │
│  │  │  • audit-log (审计日志 → Kafka 镜像)                              │   │ │
│  │  │                                                                    │   │ │
│  │  │  Consumers:                                                        │   │ │
│  │  │  • langgraph-agents (LangGraph Agent 组)                          │   │ │
│  │  │  • notification-service (通知服务)                                │   │ │
│  │  │  • monitoring-collector (监控采集)                                │   │ │
│  │  └────────────────────────────────────────────────────────────────────┘   │ │
│  └───────────────────────────────────────────────────────────────────────────┘ │
│                                      │                                          │
│                                      ▼                                          │
│  ┌───────────────────────────────────────────────────────────────────────────┐ │
│  │                   AI Agent Layer (LangGraph)                               │ │
│  │                                                                            │ │
│  │  ┌────────────────────────────────────────────────────────────────────┐   │ │
│  │  │  LangGraph StateGraphs (per Stage)                                 │   │ │
│  │  │                                                                    │   │ │
│  │  │  PRD Stage Graph:                                                  │   │ │
│  │  │  Research Agent → Writer Agent → Reviewer Agent → Human Approval  │   │ │
│  │  │                                                                    │   │ │
│  │  │  Architecture Stage Graph:                                         │   │ │
│  │  │  System Architect → DB Architect → Security Reviewer → Approval   │   │ │
│  │  │                                                                    │   │ │
│  │  │  Coding Stage Graph:                                               │   │ │
│  │  │  Backend Dev → Frontend Dev → Code Reviewer → Merge               │   │ │
│  │  │                                                                    │   │ │
│  │  │  Each Agent:                                                       │   │ │
│  │  │  • LLM (Claude Code / GPT-4 / DeepSeek)                           │   │ │
│  │  │  • Tools (Search, Code Interpreter, API Caller)                   │   │ │
│  │  │  • Memory (Short-term + Long-term via Checkpointer)               │   │ │
│  │  └────────────────────────────────────────────────────────────────────┘   │ │
│  └───────────────────────────────────────────────────────────────────────────┘ │
│                                      │                                          │
│                                      ▼                                          │
│  ┌───────────────────────────────────────────────────────────────────────────┐ │
│  │                      Supporting Services                                   │ │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐         │ │
│  │  │   Harbor    │ │  SonarQube  │ │ Prometheus  │ │  Grafana    │         │ │
│  │  │ (镜像仓库)  │ │ (代码质量)  │ │  (监控)     │ │ (可视化)    │         │ │
│  │  └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘         │ │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐         │ │
│  │  │    Kafka    │ │ Elastic     │ │   Jaeger    │ │  MinIO/S3   │         │ │
│  │  │ (事件归档)  │ │  (日志)     │ │  (链路追踪) │ │ (对象存储)  │         │ │
│  │  └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘         │ │
│  └───────────────────────────────────────────────────────────────────────────┘ │
│                                                                                 │
└─────────────────────────────────────────────────────────────────────────────────┘
                    

8.2 部署配置建议

组件 节点数 配置 存储
Temporal Cluster 3 8 核 16GB Cassandra 3 节点 / PostgreSQL HA
Temporal Workers 5-10(弹性伸缩) 16 核 32GB 临时存储 50GB
NATS Cluster 3 8 核 16GB JetStream 100GB SSD
LangGraph Agents 10-20(按需伸缩) 8 核 16GB + GPU 可选 Redis 持久化 20GB
Kafka Cluster 3 16 核 32GB 500GB NVMe × 3

9. 在研发自动化系统中的集成

9.1 八大阶段的工具链映射

研发阶段 Temporal Workflow LangGraph Agents NATS Topics
1. 需求分析 RequirementAnalysisWF Research Agent, Analyst Agent agent-events.requirement
2. PRD 设计 PRDDesignWF Writer Agent, Reviewer Agent agent-events.prd
3. 架构设计 ArchitectureDesignWF System Architect, DB Architect agent-events.architecture
4. API 定义 APISpecWF API Designer Agent agent-events.api-spec
5. AI Coding CodingWF Backend Dev, Frontend Dev task-queue.coding
6. 测试执行 TestingWF Unit Test Agent, QA Agent task-queue.testing
7. CI/CD 部署 DeploymentWF DevOps Agent agent-events.deployment
8. UI 验收 UATWF UI Test Agent agent-events.uat

9.2 典型执行流程

# 端到端执行示例 # 1. 用户提交需求 POST /api/v1/projects { "name": "电商平台", "requirements": "开发一个 B2C 电商网站,包含商品管理、购物车、订单、支付功能", "team": ["product", "backend", "frontend", "qa"] } # 2. Temporal 启动主工作流 client.start_workflow( "研发自动化工作流", project_id="proj-001", requirements="...", task_queue="rd-automation", execution_timeout="7 days" # 整个流程最多 7 天 ) # 3. Temporal 调用 Activity → 发布事件到 NATS # NATS Topic: agent-events.requirement { "event_type": "requirement_submitted", "project_id": "proj-001", "payload": {...}, "timestamp": "2026-03-12T10:00:00Z" } # 4. LangGraph Agent 订阅 NATS 事件并执行 # Research Agent 开始需求分析 app = langgraph_app.compile() result = app.invoke({ "messages": [("user", requirements)], "stage": "requirement_analysis" }) # 5. Agent 完成后发布结果到 NATS # NATS Topic: agent-events.prd-draft nats.publish("agent-events.prd-draft", prd_json) # 6. Temporal 检测到 PRD 完成 → 触发人工审批 # 发送 Signal 给工作流 wf.signal("prdApproval", approved=True) # 7. 工作流继续执行下一阶段... # 8. 所有阶段完成后,工作流结束 result = wf.result() print(f"项目 {project_id} 完成!")

9.3 监控与可观测性

📊 Temporal Web UI

  • 查看工作流执行历史
  • 事件时间线可视化
  • 重试记录与错误详情
  • 手动发送 Signal/Query

📈 Prometheus + Grafana

  • 工作流成功率/失败率
  • Activity 执行耗时 P99
  • NATS 消息吞吐量
  • Agent Token 消耗统计

🔍 Jaeger 链路追踪

  • 跨服务调用追踪
  • Agent 执行链路
  • 消息传递延迟分析
  • 瓶颈定位

📝 ELK 日志聚合

  • 集中式日志收集
  • 错误日志告警
  • 审计日志归档
  • 合规性报告

9.4 告警规则示例

# Prometheus Alert Rules 示例 groups: - name: temporal_alerts rules: - alert: WorkflowFailureRateHigh expr: rate(temporal_workflow_failed_total[5m]) > 0.1 for: 5m labels: severity: critical annotations: summary: "工作流失败率过高 ({{ $value }})" - alert: ActivityTaskBacklog expr: temporal_activity_scheduled_count - temporal_activity_completed_count > 100 for: 10m labels: severity: warning annotations: summary: "Activity 任务积压 ({{ $value }})" - name: nats_alerts rules: - alert: NATSMessagesPerSecLow expr: rate(nats_server_sent_msgs_total[5m]) < 100 for: 5m labels: severity: warning annotations: summary: "NATS 消息速率异常低" - alert: NATSConsumerLagHigh expr: nats_jetstream_consumer_pending_msgs > 10000 for: 5m labels: severity: critical annotations: summary: "NATS Consumer 积压严重" - name: agent_alerts rules: - alert: AgentLLMAPIError expr: rate(agent_llm_api_errors_total[5m]) > 0.05 for: 5m labels: severity: warning annotations: summary: "Agent LLM API 错误率高" - alert: AgentTokenUsageSpike expr: rate(agent_token_usage_total[1h]) > 1000000 for: 15m labels: severity: info annotations: summary: "Agent Token 使用量激增"