📋
执行摘要
本报告提供了一份完整的企业级 AI Agent 系统架构与技术方案设计,基于 FastAPI + LangGraph(后端) 和 React + TypeScript(前端) 技术栈,采用 WebSocket 流式通信协议 实现实时双向通信。
核心功能模块:
• Bug 反馈处理:智能识别国内/国外 Bug,自动路由到对应区域处理团队
• 建议处理:基于 Log 分析自动生成优化建议,主动发现问题
• 完整工作流:反馈 → 分类 → 分析 → 方案制定 → 任务分派 → 人员反馈 → 修复验证 → 关闭工单
技术亮点:LangGraph 状态图编排 5+ 专业 Agent 协作、WebSocket 实时流式推送、PostgreSQL + Redis 数据存储、Elasticsearch 日志分析、完整的 API 接口设计、详细的数据库 Schema、前端 React 组件架构、安全与合规设计、监控与可观测性方案。
🏗️
系统整体架构设计
📐 四层架构设计图 - 完整技术栈
🖥️ 表现层 (Presentation Layer) - React 前端
React 18 + TypeScript
WebSocket 客户端
Zustand 状态管理
Ant Design / MUI
Recharts 图表库
实时状态可视化
流式输出组件
交互式工作流界面
🔌 通信层 (Communication Layer) - FastAPI
WebSocket 服务器
RESTful API (FastAPI)
Redis Pub/Sub
消息队列 (Celery)
JWT 认证中间件
CORS 跨域处理
请求限流
事件总线
🧠 业务逻辑层 (Business Logic Layer) - LangGraph
LangGraph 状态图
协调 Agent (Coordinator)
分类 Agent (Classifier)
分析 Agent (Analyzer)
分派 Agent (Dispatcher)
跟踪 Agent (Tracker)
工作流引擎
规则引擎
💾 数据层 (Data Layer) - 多存储引擎
PostgreSQL 15 (主数据库)
Redis 7 (缓存/会话)
Elasticsearch 8 (日志)
pgvector (向量搜索)
MinIO (文件存储)
数据备份系统
数据归档策略
读写分离
⚡
FastAPI 0.109+
高性能异步 Web 框架,支持 WebSocket,自动 OpenAPI 文档
🕸️
LangGraph 0.0.20+
状态图编排框架,多 Agent 协作,持久化执行,人机协同
⚛️
React 18 + TypeScript 5.3+
现代化前端框架,类型安全,组件化开发
🔌
WebSocket
实时双向通信,流式响应,低延迟推送
🐘
PostgreSQL 15 + pgvector
关系型数据库,事务支持,向量相似度搜索
🔴
Redis 7
缓存、会话管理、消息队列、Pub/Sub
🔍
Elasticsearch 8
日志存储与分析,全文检索,聚合查询
🤖
LangChain + LLM
大语言模型集成,工具调用,记忆管理
🤖
多 Agent 协作网络详细设计
🎯
协调 Agent (Coordinator)
职责:系统总控中心
• 任务接收与验证
• 全局状态管理
• Agent 调度与协调
• 异常处理与升级
• 维护状态图执行
工具:状态存储、事件总线、通知服务
LLM 模型:GPT-4 / Claude 3
🏷️
分类 Agent (Classifier)
职责:智能识别与分类
• Bug 类型识别(国内/国外)
• 优先级评估(高/中/低)
• 紧急程度判断
• 影响范围分析
• 语言识别(中文/英文)
工具:NLP 模型、IP 地理位置库、规则引擎
LLM 模型:GPT-3.5-Turbo
🔍
分析 Agent (Analyzer)
职责:深度问题分析
• Log 数据解析与聚合
• 根因分析 (RCA)
• 复现步骤生成
• 影响范围评估
• 技术报告生成
工具:Elasticsearch、代码分析器、日志解析器
LLM 模型:GPT-4 / Claude 3
📝
方案 Agent (Planner)
职责:处理方案制定
• 修复策略推荐
• 工作量估算
• 风险评估
• 实施步骤规划
• 回滚方案设计
工具:知识库、历史案例库、代码库
LLM 模型:GPT-4 / Claude 3
📤
分派 Agent (Dispatcher)
职责:智能任务分派
• 技能匹配算法
• 负载均衡
• 时区匹配(国内/国外)
• 通知发送
• SLA 计时启动
工具:用户技能库、排班系统、通知服务
LLM 模型:GPT-3.5-Turbo
📊
跟踪 Agent (Tracker)
职责:全程进度跟踪
• 任务状态监控
• SLA 时效监控
• 自动催办提醒
• 进度报告生成
• 超时升级处理
工具:定时器、通知服务、报告生成器
LLM 模型:GPT-3.5-Turbo
💡 Agent 协作机制详解:
1. 状态共享:所有 Agent 通过 LangGraph 的 StateGraph 共享统一的 TicketState 状态对象,包含工单 ID、类型、优先级、当前状态、描述、日志、分析结果、分派信息、解决方案、时间线等字段。
2. 执行流程:协调 Agent 作为入口节点,接收任务后写入状态,触发分类 Agent 执行;分类完成后更新状态,触发分析 Agent;依次类推,形成链式反应。
3. 并行执行:支持多个分析 Agent 并行执行不同维度的分析(如日志分析、代码分析、性能分析),结果合并后进入下一节点。
4. 条件分支:根据问题类型(国内 Bug/国外 Bug/建议)和优先级(高/中/低),动态选择不同的处理路径和分派策略。
5. 人工介入:在关键节点(如方案审批、分派确认)支持人工审核和干预,通过 WebSocket 实时推送给人工审核员。
6. 状态持久化:每个状态变更都持久化到 PostgreSQL 数据库,支持断点续传、审计追溯、历史回放。
🔄
核心工作流程详细设计
📋 Bug 反馈与建议处理完整流程 - 7 步闭环
1
反馈提交
用户通过前端提交 Bug 或建议,系统自动收集环境信息(浏览器、OS、IP、时区、日志片段)
→
2
智能分类
分类 Agent 识别类型(国内 Bug/国外 Bug/功能建议),确定优先级(高/中/低)和紧急程度
→
3
深度分析
分析 Agent 解析 Log、生成复现步骤、根因分析(RCA)、影响范围评估、生成技术报告
→
4
方案制定
方案 Agent 生成处理方案、估算工作量(人天)、推荐修复策略、评估风险、设计回滚方案
→
5
任务分派
分派 Agent 根据技能匹配、负载均衡、时区匹配,分派给指定开发人员,发送通知(邮件/IM/短信)
→
6
人员反馈
开发人员接收任务、确认方案、反馈预计完成时间、更新任务状态为"进行中"
→
7
修复验证
开发修复 → 测试验证(自动化 + 人工)→ 用户确认 → 关闭工单 → 生成复盘报告
🔴 高优先级流程(P0)
适用场景:
- 生产环境严重 Bug(系统崩溃、数据丢失)
- 安全漏洞(SQL 注入、XSS、权限绕过)
- 核心功能不可用
- 大规模用户受影响
SLA 要求:
- 15 分钟内响应
- 30 分钟内制定方案
- 2 小时内修复上线
处理流程:自动升级 → 电话通知 → 应急小组 → 热修复 → 事后复盘
通知渠道:短信 + 电话 + 邮件 + IM + 钉钉/企业微信
🟠 中优先级流程(P1)
适用场景:
- 功能缺陷(非核心功能)
- 性能下降(响应时间增加 50%+)
- 用户体验问题
- 部分用户受影响
SLA 要求:
- 4 小时内响应
- 24 小时内制定方案
- 3 个工作日内修复
处理流程:标准流程 → 开发排期 → 测试验证 → 灰度发布
通知渠道:邮件 + IM(企业微信/Slack)
🔵 低优先级流程(P2)
适用场景:
- 功能建议
- 优化建议(UI/UX、性能微调)
- 文档问题
- 个别用户反馈
SLA 要求:
处理流程:需求评审 → 版本规划 → 迭代开发 → 测试发布
通知渠道:邮件
🌍
国内/国外 Bug 区分处理详细机制
🇨🇳 国内 Bug 处理流程
识别特征(多维度):
- 语言分析:中文描述内容(NLP 识别置信度>80%)
- IP 地理位置:中国大陆 IP 地址段(IP2Location 数据库)
- 时区分析:UTC+8 北京时间
- 部署环境:国内机房标识(cn-beijing、cn-shanghai 等)
- 用户账户:账户注册地区为中国
处理团队:国内研发团队(北京/上海/深圳)
工单系统:国内 Jira 实例(jira.company.cn)
通知渠道:企业微信 + 钉钉 + 短信 + 电话
合规要求:符合中国网络安全法、数据安全法、个人信息保护法
数据存储:数据本地化存储(国内数据中心)
工作时间:北京时间 9:00-18:00(支持 7x24 应急)
🌏 国外 Bug 处理流程
识别特征(多维度):
- 语言分析:英文/多语言描述(NLP 识别)
- IP 地理位置:海外 IP 地址段(非中国大陆)
- 时区分析:UTC-5 至 UTC+2(美洲/欧洲/东南亚)
- 部署环境:海外机房标识(us-east、eu-west 等)
- 用户账户:账户注册地区为海外
处理团队:海外研发团队(美国/欧洲/新加坡)
工单系统:海外 Jira 实例(jira.company.com)
通知渠道:Slack + Email + PagerDuty + 电话
合规要求:符合 GDPR、CCPA、SOX 等国际法规
数据存储:数据区域化存储(对应区域数据中心)
工作时间:当地工作时间(支持 7x24 Follow-the-Sun)
🎯 智能路由算法详解:
步骤 1:特征提取
• 语言特征:使用 langdetect 库识别描述语言,返回语言代码和置信度
• IP 特征:查询 IP2Location 或 MaxMind GeoIP2 数据库,获取国家、地区、城市
• 时区特征:根据 IP 或用户设置推断时区,计算与 UTC 的偏移
• 环境特征:解析日志中的部署区域标签(aws:cn-north-1、gcp:us-central1 等)
• 用户特征:查询用户账户表,获取注册地区、偏好语言
步骤 2:权重计算
• 语言权重:40%(中文→国内,英文→国外)
• IP 权重:30%(中国大陆 IP→国内,海外 IP→国外)
• 时区权重:15%(UTC+8→国内,其他→国外)
• 环境权重:10%(国内机房→国内,海外机房→国外)
• 用户权重:5%(中国用户→国内,海外用户→国外)
步骤 3:决策逻辑
• 国内得分 = 语言权重 (中文) + IP 权重 (大陆) + 时区权重 (UTC+8) + 环境权重 (国内) + 用户权重 (中国)
• 国外得分 = 语言权重 (非中文) + IP 权重 (海外) + 时区权重 (非 UTC+8) + 环境权重 (海外) + 用户权重 (海外)
• 如果 国内得分 >= 60% → 路由到国内团队
• 如果 国外得分 >= 60% → 路由到国外团队
• 否则 → 标记为"待人工确认",推送给人工审核
步骤 4:跨区域升级
• 当区域团队在 SLA 时间内未响应 → 自动升级到全球专家团队
• 当问题影响多个区域 → 启动全球应急流程
• 当区域团队请求支援 → 协调其他区域团队协助
📝
基于 Log 的智能建议处理技术方案
from typing import List, Dict, Optional
from pydantic import BaseModel
from langchain_core.messages import HumanMessage, AIMessage
class LogEntry(BaseModel):
timestamp: str
level: str
service: str
message: str
trace_id: Optional[str]
span_id: Optional[str]
metadata: Dict
class AnalysisResult(BaseModel):
patterns: List[Dict]
bottlenecks: List[Dict]
root_causes: List[Dict]
recommendations: List[Dict]
priority: str
confidence: float
class LogAnalysisAgent:
def __init__(self, llm, es_client):
self.llm = llm
self.es_client = es_client
async def analyze_logs(self, logs: List[LogEntry]) -> AnalysisResult:
patterns = await self.detect_anomaly_patterns(logs)
bottlenecks = await self.identify_bottlenecks(logs)
root_causes = await self.perform_rca(logs)
recommendations = await self.generate_recommendations(
patterns, bottlenecks, root_causes
)
priority = self.calculate_priority(recommendations)
confidence = self.calculate_confidence(patterns, root_causes)
return AnalysisResult(
patterns=patterns,
bottlenecks=bottlenecks,
root_causes=root_causes,
recommendations=recommendations,
priority=priority,
confidence=confidence
)
async def detect_anomaly_patterns(self, logs: List[LogEntry]) -> List[Dict]:
pass
async def identify_bottlenecks(self, logs: List[LogEntry]) -> List[Dict]:
pass
async def perform_rca(self, logs: List[LogEntry]) -> List[Dict]:
pass
async def generate_recommendations(self, patterns, bottlenecks, root_causes) -> List[Dict]:
prompt = f"""
基于以下分析结果,生成具体的优化建议:
- 异常模式:{patterns}
- 性能瓶颈:{bottlenecks}
- 根因分析:{root_causes}
要求:
1. 建议必须具体可执行(包含代码示例或配置修改)
2. 评估每个建议的影响范围和风险
3. 给出优先级排序
4. 估算实施工作量(人天)
"""
response = await self.llm.ainvoke([HumanMessage(content=prompt)])
return self.parse_recommendations(response.content)
📊 日志采集层架构
数据源(多源异构):
- 应用日志:JSON 格式,包含 trace_id、span_id
- 系统日志:syslog、dmesg、kernel log
- 数据库日志:慢查询日志、错误日志、binlog
- API 网关日志:请求/响应、延迟、状态码
- 前端日志:JavaScript 错误、性能指标、用户行为
- 中间件日志:Redis、Kafka、Nginx、RabbitMQ
采集工具:
- Filebeat:轻量级日志采集器,支持多输入源
- Logstash:日志处理管道,过滤、转换、丰富
- Fluentd:云原生日志采集,支持 Kubernetes
- Vector:高性能日志采集,Rust 编写
存储引擎:Elasticsearch 8.x 集群(3 节点起步)
索引策略:按天滚动索引,保留 30 天热数据,1 年冷数据
🔍 智能分析层架构
分析能力(AI 驱动):
- 异常模式检测:错误率突增、延迟 spike、资源耗尽
- 性能趋势分析:响应时间、吞吐量、错误率趋势
- 错误聚类分析:相似错误自动分组,识别共性问题
- 关联规则挖掘:发现日志间的隐藏关联
- 预测性分析:基于历史数据预测未来问题
- 根因定位:自动定位问题根源,减少 MTTR
AI 模型:
- LLM(GPT-4/Claude 3):日志理解、根因推理、建议生成
- 时序预测(Prophet/ARIMA):趋势预测、异常检测
- 聚类算法(DBSCAN/K-Means):错误聚类、模式识别
- 关联规则(Apriori/FP-Growth):日志关联分析
实时更新:流式处理(Flink/Spark Streaming),秒级延迟
💡 建议生成层架构
建议类型(多维度):
- 代码优化建议:算法改进、并发优化、内存管理
- 配置调优建议:JVM 参数、数据库连接池、缓存策略
- 架构改进建议:微服务拆分、读写分离、 CDN 加速
- 容量规划建议:资源扩容、负载均衡、弹性伸缩
- 安全加固建议:权限控制、数据加密、审计日志
- 监控告警建议:指标补充、阈值调整、告警收敛
输出格式:
- 结构化报告:JSON 格式,包含问题描述、根因、建议、优先级、工作量
- 可执行方案:附带代码示例、配置修改、实施步骤
- 风险评估:影响范围、回滚方案、测试建议
自动化工单:高优先级建议(P0/P1)自动创建优化工单,分派给对应团队
🎯 Log 分析完整工作流(7 步):
1. 日志采集:Filebeat/Fluentd 实时采集多源日志,统一格式化为 JSON,添加 timestamp、host、service、trace_id 等元数据。
2. 预处理:Logstash 进行清洗(去除无效日志)、去重(基于指纹)、标准化(统一字段名)、Enrich(丰富上下文信息,如地理位置、服务依赖)。
3. 存储索引:Elasticsearch 按天创建索引,设置合适的分片和副本,配置生命周期管理(ILM):热数据(30 天,SSD)、温数据(90 天,HDD)、冷数据(1 年,归档)。
4. 模式识别:使用 ML 模型(Isolation Forest、LOF)检测异常模式,识别错误率突增、延迟 spike、资源耗尽等异常情况。
5. 根因分析:LLM 结合知识图谱进行关联分析,使用 5 Whys 方法追溯问题根源,生成根因分析报告。
6. 建议生成:LLM 基于分析结果生成具体可执行的优化建议,包含代码示例、配置修改、实施步骤、风险评估、工作量估算。
7. 优先级排序与工单创建:根据影响范围(用户数、收入影响)、紧急程度(SLA 违规风险)、实施难度,计算优先级分数。P0/P1 建议自动创建优化工单,通过 WebSocket 推送给相关团队。
📐
LangGraph 状态图详细设计与代码实现
"""
LangGraph 状态图完整实现
系统级 Agent 工作流编排
"""
from typing import TypedDict, List, Optional, Annotated, Literal
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
import operator
class TicketState(TypedDict):
ticket_id: str
type: Literal["bug_domestic", "bug_international", "suggestion"]
priority: Literal["high", "medium", "low"]
status: Literal["new", "analyzing", "planning", "assigned", "in_progress", "resolved", "closed"]
description: str
logs: List[dict]
analysis_result: Optional[dict]
solution_plan: Optional[dict]
assigned_to: Optional[str]
developer_feedback: Optional[dict]
resolution: Optional[dict]
timeline: Annotated[List[dict], operator.add]
metadata: dict
async def classify_ticket(state: TicketState) -> dict:
return {
"type": "bug_domestic",
"priority": "high",
"timeline": [{"action": "classified", "timestamp": "2026-03-10T10:00:00Z"}]
}
async def analyze_issue(state: TicketState) -> dict:
return {
"analysis_result": {"root_cause": "...", "impact": "..."},
"timeline": [{"action": "analyzed", "timestamp": "2026-03-10T10:05:00Z"}]
}
async def create_solution_plan(state: TicketState) -> dict:
return {
"solution_plan": {"steps": [], "effort": "2 days"},
"timeline": [{"action": "planned", "timestamp": "2026-03-10T10:10:00Z"}]
}
async def assign_to_developer(state: TicketState) -> dict:
return {
"assigned_to": "developer_123",
"status": "assigned",
"timeline": [{"action": "assigned", "timestamp": "2026-03-10T10:15:00Z"}]
}
async def track_progress(state: TicketState) -> dict:
return {
"timeline": [{"action": "progress_check", "timestamp": "2026-03-10T10:20:00Z"}]
}
async def validate_resolution(state: TicketState) -> dict:
return {
"status": "closed",
"timeline": [{"action": "closed", "timestamp": "2026-03-10T10:25:00Z"}]
}
def check_resolution_status(state: TicketState) -> Literal["resolved", "pending"]:
if state.get("resolution"):
return "resolved"
return "pending"
def build_workflow() -> StateGraph:
workflow = StateGraph(TicketState)
workflow.add_node("classifier", classify_ticket)
workflow.add_node("analyzer", analyze_issue)
workflow.add_node("planner", create_solution_plan)
workflow.add_node("dispatcher", assign_to_developer)
workflow.add_node("tracker", track_progress)
workflow.add_node("validator", validate_resolution)
workflow.set_entry_point("classifier")
workflow.add_edge("classifier", "analyzer")
workflow.add_edge("analyzer", "planner")
workflow.add_edge("planner", "dispatcher")
workflow.add_edge("dispatcher", "tracker")
workflow.add_conditional_edges(
"tracker",
check_resolution_status,
{
"resolved": "validator",
"pending": "tracker"
}
)
workflow.add_edge("validator", END)
return workflow
if __name__ == "__main__":
workflow = build_workflow()
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
initial_state = {
"ticket_id": "TICKET-2026-001",
"type": "bug_domestic",
"priority": "high",
"status": "new",
"description": "生产环境 API 响应超时",
"logs": [],
"analysis_result": None,
"solution_plan": None,
"assigned_to": None,
"developer_feedback": None,
"resolution": None,
"timeline": [],
"metadata": {}
}
for event in app.stream(initial_state, stream_mode="values"):
print(f"状态更新:{event['status']}")
🎯 LangGraph 状态图关键特性详解:
1. 状态持久化(Checkpoint):
• 使用 MemorySaver 或 PostgresSaver 保存每个状态快照
• 支持断点续传:系统重启后从最后一个检查点恢复
• 支持时间旅行:查看历史任意时刻的状态
• 支持分支实验:从某个状态创建多个分支测试不同方案
2. 条件分支(Conditional Edges):
• 根据状态动态选择下一个节点(如:根据优先级选择不同审批流程)
• 支持多条件组合:type AND priority AND status
• 支持默认分支:未匹配任何条件时的兜底路径
3. 循环检测(Cycle Detection):
• 自动检测无限循环(如:tracker → tracker → tracker...)
• 配置最大循环次数(默认 25 次),超过则抛出异常
• 支持人工介入:循环超时时通知人工处理
4. 超时处理(Timeout):
• 每个节点配置超时时间(如:分析节点 30 分钟)
• 超时自动触发升级流程(通知上级、转人工)
• 支持动态超时:根据优先级调整超时时间(P0: 5 分钟,P1: 30 分钟,P2: 2 小时)
5. 并行执行(Parallel Execution):
• 支持多个节点并行执行(如:多个分析 Agent 同时分析不同维度)
• 使用 Send API 分发任务给多个子 Agent
• 结果聚合:等待所有并行节点完成后合并结果
6. 人工介入(Human-in-the-Loop):
• 在关键节点(如:方案审批、分派确认)暂停工作流
• 通过 WebSocket 推送给人工审核员
• 人工审核后调用 interrupt_resume API 恢复执行
7. 审计日志(Audit Trail):
• timeline 字段记录所有状态变更(谁、何时、做了什么)
• 支持完整追溯:从工单创建到关闭的全生命周期
• 支持合规审计:满足 SOX、GDPR 等法规要求
🔌
WebSocket 流式通信详细设计
"""
FastAPI WebSocket 流式通信完整实现
支持实时状态推送、Agent 输出流、通知系统
"""
from fastapi import WebSocket, WebSocketDisconnect, APIRouter
from typing import Dict, Set, List
import json
import asyncio
from datetime import datetime
router = APIRouter()
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.ticket_subscriptions: Dict[str, Set[str]] = {}
self.user_subscriptions: Dict[str, Set[str]] = {}
self.heartbeat_timeout = 30
async def connect(self, websocket: WebSocket, client_id: str) -> bool:
try:
await websocket.accept()
self.active_connections[client_id] = websocket
asyncio.create_task(self.heartbeat(client_id))
return True
except Exception as e:
print(f"连接失败:{e}")
return False
def disconnect(self, client_id: str):
if client_id in self.active_connections:
del self.active_connections[client_id]
for ticket_id in self.ticket_subscriptions:
self.ticket_subscriptions[ticket_id].discard(client_id)
for user_id in self.user_subscriptions:
self.user_subscriptions[user_id].discard(client_id)
async def subscribe_ticket(self, client_id: str, ticket_id: str):
if ticket_id not in self.ticket_subscriptions:
self.ticket_subscriptions[ticket_id] = set()
self.ticket_subscriptions[ticket_id].add(client_id)
current_state = await self.get_current_state(ticket_id)
await self.send_personal_message(client_id, {
"type": "initial_state",
"ticket_id": ticket_id,
"state": current_state
})
async def broadcast_state_update(self, ticket_id: str, state: dict):
if ticket_id not in self.ticket_subscriptions:
return
message = {
"type": "state_update",
"ticket_id": ticket_id,
"state": state,
"timestamp": datetime.utcnow().isoformat()
}
await self.broadcast_to_subscribers(self.ticket_subscriptions[ticket_id], message)
async def stream_agent_output(self, client_id: str, agent_name: str, output: str, is_complete: bool = False):
message = {
"type": "agent_stream",
"agent": agent_name,
"content": output,
"is_complete": is_complete,
"timestamp": datetime.utcnow().isoformat()
}
await self.send_personal_message(client_id, message)
async def send_notification(self, user_id: str, notification: dict):
message = {
"type": "notification",
"notification": notification,
"timestamp": datetime.utcnow().isoformat()
}
if user_id in self.user_subscriptions:
await self.broadcast_to_subscribers(self.user_subscriptions[user_id], message)
async def send_personal_message(self, client_id: str, message: dict):
if client_id in self.active_connections:
try:
await self.active_connections[client_id].send_text(json.dumps(message))
except Exception as e:
print(f"发送消息失败:{e}")
self.disconnect(client_id)
async def broadcast_to_subscribers(self, client_ids: Set[str], message: dict):
tasks = [self.send_personal_message(client_id, message) for client_id in client_ids]
await asyncio.gather(*tasks, return_exceptions=True)
async def heartbeat(self, client_id: str):
while client_id in self.active_connections:
try:
await asyncio.sleep(self.heartbeat_timeout)
await self.send_personal_message(client_id, {"type": "heartbeat"})
except:
self.disconnect(client_id)
break
manager = ConnectionManager()
@router.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
user_id = await authenticate_websocket(websocket)
if not user_id:
await websocket.close(code=4001, reason="认证失败")
return
success = await manager.connect(websocket, client_id)
if not success:
return
await manager.user_subscriptions.setdefault(user_id, set()).add(client_id)
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
await handle_client_message(client_id, user_id, message)
except WebSocketDisconnect:
print(f"客户端断开:{client_id}")
manager.disconnect(client_id)
except Exception as e:
print(f"WebSocket 错误:{e}")
manager.disconnect(client_id)
async def handle_client_message(client_id: str, user_id: str, message: dict):
msg_type = message.get("type")
if msg_type == "subscribe_ticket":
ticket_id = message.get("ticket_id")
await manager.subscribe_ticket(client_id, ticket_id)
elif msg_type == "unsubscribe_ticket":
ticket_id = message.get("ticket_id")
if ticket_id in manager.ticket_subscriptions:
manager.ticket_subscriptions[ticket_id].discard(client_id)
elif msg_type == "submit_feedback":
await process_feedback_submission(user_id, message.get("data"))
📡 WebSocket 消息类型设计
state_update(状态更新):
{"type": "state_update", "ticket_id": "TICKET-001", "state": {...}, "timestamp": "2026-03-10T10:00:00Z"}
agent_stream(Agent 输出流):
{"type": "agent_stream", "agent": "analyzer", "content": "正在分析日志...", "is_complete": false}
progress_update(进度更新):
{"type": "progress_update", "ticket_id": "TICKET-001", "progress": 65, "current_step": "分析中"}
notification(通知):
{"type": "notification", "notification": {"title": "新任务", "body": "您有一个新工单", "level": "info"}}
error(错误):
{"type": "error", "code": "TIMEOUT", "message": "操作超时", "details": {...}}
heartbeat(心跳):
{"type": "heartbeat", "timestamp": "2026-03-10T10:00:00Z"}
🔒 WebSocket 安全机制
认证(Authentication):
- JWT Token 验证:从 query param 或 subprotocol 传递
- Token 有效期:2 小时,支持刷新
- 黑名单机制:支持强制下线
授权(Authorization):
- RBAC 基于角色的访问控制
- 工单权限:只能订阅有权限查看的工单
- 区域隔离:国内用户只能订阅国内工单
加密(Encryption):
- WSS (WebSocket Secure):TLS 1.3 加密
- 证书验证:强制 HTTPS 证书
- 消息签名:可选 HMAC 签名防篡改
限流(Rate Limiting):
- 连接数限制:每用户最多 5 个并发连接
- 消息频率:每秒最多 10 条消息
- 带宽限制:每秒最多 100KB
审计(Audit):
- 完整消息日志:记录所有收发消息
- 连接日志:记录连接/断开时间、IP、User-Agent
- 异常监控:实时监控异常连接
⚡ WebSocket 性能优化
连接池(Connection Pooling):
- 复用 WebSocket 连接,避免频繁创建/销毁
- 连接预热:系统启动时预建连接池
- 健康检查:定期检查连接健康状态
消息压缩(Message Compression):
- Gzip 压缩:大消息(>1KB)自动压缩
- 压缩率:通常可减少 60-80% 体积
- 浏览器支持:现代浏览器均支持 permessage-deflate
批量推送(Batching):
- 合并短时间内的多次更新(100ms 窗口)
- 减少网络往返次数
- 适用于高频更新场景(如进度条)
负载均衡(Load Balancing):
- 多实例部署:Nginx/HAProxy 负载均衡
- Redis Pub/Sub:跨实例消息同步
- 粘性会话:确保同一用户的消息路由到同一实例
断线重连(Reconnection):
- 指数退避:1s, 2s, 4s, 8s, 16s, 30s
- 最大重试:10 次后放弃
- 状态恢复:重连后自动订阅之前的工单
🗄️
数据库详细设计 - Schema 与索引策略
📋 tickets(工单主表)
id
UUID
PRIMARY KEY
ticket_number
VARCHAR(50)
UNIQUE, NOT NULL
type
VARCHAR(50)
NOT NULL (bug_domestic/bug_international/suggestion)
priority
VARCHAR(20)
NOT NULL (high/medium/low)
status
VARCHAR(30)
NOT NULL (new/analyzing/planning/assigned/in_progress/resolved/closed)
title
VARCHAR(500)
NOT NULL
description
TEXT
NOT NULL
reporter_id
UUID
FOREIGN KEY REFERENCES users(id)
assigned_to
UUID
FOREIGN KEY REFERENCES users(id)
region
VARCHAR(50)
NOT NULL (cn/na/eu/apac)
created_at
TIMESTAMPTZ
DEFAULT NOW()
updated_at
TIMESTAMPTZ
DEFAULT NOW()
resolved_at
TIMESTAMPTZ
NULLABLE
closed_at
TIMESTAMPTZ
NULLABLE
📋 ticket_states(工单状态历史表)
id
BIGSERIAL
PRIMARY KEY
ticket_id
UUID
FOREIGN KEY REFERENCES tickets(id), INDEX
state_json
JSONB
NOT NULL
agent_name
VARCHAR(100)
NOT NULL
action
VARCHAR(100)
NOT NULL
timestamp
TIMESTAMPTZ
DEFAULT NOW(), INDEX
📋 users(用户表)
id
UUID
PRIMARY KEY
username
VARCHAR(100)
UNIQUE, NOT NULL
email
VARCHAR(255)
UNIQUE, NOT NULL
role
VARCHAR(50)
NOT NULL (admin/developer/tester/manager)
region
VARCHAR(50)
NOT NULL (cn/na/eu/apac)
skills
JSONB
DEFAULT '[]'
timezone
VARCHAR(50)
DEFAULT 'UTC'
is_active
BOOLEAN
DEFAULT TRUE
📋 assignments(任务分派表)
id
BIGSERIAL
PRIMARY KEY
ticket_id
UUID
FOREIGN KEY REFERENCES tickets(id), INDEX
developer_id
UUID
FOREIGN KEY REFERENCES users(id), INDEX
assigned_at
TIMESTAMPTZ
DEFAULT NOW()
deadline
TIMESTAMPTZ
NOT NULL
status
VARCHAR(30)
DEFAULT 'pending'
📋 logs(日志表 - 分区表)
id
BIGSERIAL
PRIMARY KEY
ticket_id
UUID
FOREIGN KEY REFERENCES tickets(id), INDEX
log_level
VARCHAR(20)
NOT NULL, INDEX
message
TEXT
NOT NULL
metadata
JSONB
DEFAULT '{}'
timestamp
TIMESTAMPTZ
DEFAULT NOW(), INDEX
🎯 数据库优化策略详解:
1. 索引策略:
• tickets 表:status(查询待处理工单)、priority(优先级排序)、created_at(时间范围查询)、region(区域过滤)、assigned_to(我的工单)
• ticket_states 表:ticket_id + timestamp(查询工单状态历史)、timestamp(时间范围查询)
• logs 表:ticket_id + timestamp(查询工单日志)、timestamp + log_level(错误日志查询)
• 组合索引:(status, priority, created_at) 用于复杂筛选排序
2. 分区表(Partitioning):
• logs 表按月分区:logs_2026_03, logs_2026_04, ...
• 优势:查询性能提升(只扫描相关分区)、归档方便(直接 DROP 旧分区)、维护简单
• 自动分区:使用 pg_partman 扩展自动创建新分区
3. 读写分离:
• 主库(Primary):处理所有写操作(INSERT/UPDATE/DELETE)
• 从库(Replica):处理读操作(SELECT),可部署多个从库负载均衡
• 复制延迟:通常<1 秒,不适用于强一致性场景
4. 缓存层(Redis):
• 热点工单:缓存最近访问的 1000 个工单状态(TTL: 5 分钟)
• 用户会话:存储用户登录状态、权限信息(TTL: 2 小时)
• 计数器:工单统计(今日新增、待处理、已关闭等)
5. 归档策略:
• 热数据(0-90 天):在线存储,SSD,快速查询
• 温数据(90-365 天):归档到 HDD,查询较慢
• 冷数据(>365 天):导出到 S3/MinIO,从数据库删除
• 自动归档:使用 pg_cron 定时任务每月执行归档
6. 全文检索(Elasticsearch):
• 同步机制:使用 CDC(Change Data Capture)实时同步 PostgreSQL 到 Elasticsearch
• 搜索字段:title、description、logs.message
• 高级功能:高亮显示、模糊搜索、同义词、拼音搜索
🎨
React 前端架构详细设计
⚛️ 前端技术栈
核心框架:
- React 18.2+:并发渲染、Suspense、自动批处理
- TypeScript 5.3+:类型安全、智能提示、编译时检查
- Vite 5.x:极速构建、HMR 热更新、按需编译
状态管理:
- Zustand:轻量级状态管理,支持中间件
- React Query:服务端状态管理,缓存、重试、乐观更新
- Context API:全局配置、主题、认证信息
UI 组件库:
- Ant Design 5.x / MUI:企业级组件库
- 自定义组件:流式输出、状态时间线、Agent 可视化
图表库:
- Recharts:声明式图表,支持实时数据
- ECharts:复杂图表、地图、关系图
WebSocket:
- useWebSocket Hook:自定义 Hook,自动重连
- 消息队列:处理离线消息
测试:
- Jest + React Testing Library:单元测试
- Playwright:E2E 测试
📱 核心页面架构
1. 工单列表页(/tickets):
- 高级筛选:类型、优先级、状态、区域、时间范围
- 多列排序:点击表头排序,支持多列
- 批量操作:批量分派、批量关闭、批量导出
- 虚拟滚动:支持 10 万 + 工单流畅滚动
- 实时刷新:WebSocket 推送新工单
2. 工单详情页(/tickets/:id):
- 状态时间线:可视化展示工单流转过程
- Agent 输出流:实时显示 Agent 思考过程
- 日志查看器:支持高亮、过滤、搜索
- 评论系统:团队成员讨论
- 附件管理:上传截图、日志文件
3. 实时看板页(/dashboard):
- 全局任务分布:饼图、柱状图
- SLA 监控:响应时间、解决时间达标率
- 团队负载:每个成员的任务数
- 趋势分析:工单量、解决率趋势
- 实时告警:超时工单、积压告警
4. 分析报告页(/analytics):
- 根因统计:Top 10 问题根因
- 趋势分析:响应时间、解决时间趋势
- 团队绩效:每人处理工单数、平均耗时
- 区域对比:国内 vs 国外工单对比
5. 配置管理页(/settings):
- Agent 配置:启用/禁用 Agent、调整参数
- 路由规则:国内/国外路由规则配置
- 通知设置:邮件、IM、短信通知配置
- SLA 配置:不同优先级的 SLA 时间
🔄 实时特性实现
1. 状态同步(State Synchronization):
- WebSocket 订阅工单状态变更
- 收到更新后自动刷新 UI
- 冲突处理:乐观更新 + 服务端校验
2. 流式输出(Streaming Output):
- 逐字显示 Agent 输出(打字机效果)
- 支持 Markdown 渲染(代码高亮、表格)
- 支持中断:用户可随时停止生成
3. 进度动画(Progress Animation):
- 实时进度条:0-100% 动态更新
- 当前步骤提示:显示正在执行的步骤
- 预计剩余时间:基于历史数据估算
4. 通知中心(Notification Center):
- 实时通知:新工单、任务分派、超时提醒
- 通知分类:信息、警告、错误
- 通知管理:已读/未读、批量标记、清除
5. 协作编辑(Collaborative Editing):
- 多人同时查看同一工单
- 实时显示其他用户的操作(如:正在编辑评论)
- 冲突避免:字段级锁或乐观合并
🎯 前端关键特性详解:
1. 乐观更新(Optimistic Updates):
• 用户操作(如:更新工单状态)立即在 UI 上反馈,不等待服务端响应
• 后台异步发送请求到服务端
• 如果请求失败,回滚 UI 状态并显示错误提示
• 优势:用户体验极佳,感觉"零延迟"
2. 离线支持(Offline Support):
• Service Worker 缓存静态资源(HTML、CSS、JS、图片)
• IndexedDB 缓存工单数据,支持离线查看
• 离线操作队列:用户操作先存入队列,网络恢复后同步
• 离线提示:检测网络状态,显示离线/在线状态
3. 响应式设计(Responsive Design):
• 移动优先:先设计移动端,再扩展到桌面端
• 断点:sm (640px), md (768px), lg (1024px), xl (1280px), 2xl (1536px)
• 自适应布局:Flexbox + Grid,自动调整列数
• 触摸优化:移动端支持手势操作(滑动删除、下拉刷新)
4. 无障碍访问(Accessibility - a11y):
• 符合 WCAG 2.1 AA 标准
• 键盘导航:所有功能支持键盘操作
• 屏幕阅读器:正确的 ARIA 标签
• 颜色对比度:文本与背景对比度>4.5:1
• 焦点管理:清晰的焦点指示器
5. 性能优化(Performance Optimization):
• 代码分割:按路由分割代码,懒加载页面组件
• 懒加载:图片、图表、大组件按需加载
• 虚拟滚动:长列表只渲染可见区域(react-window)
• 记忆化:useMemo、useCallback 避免重复计算
• 防抖节流:搜索框、滚动事件使用防抖/节流
• Bundle 分析:使用 webpack-bundle-analyzer 优化包体积
6. 错误边界(Error Boundaries):
• 组件级错误捕获:单个组件崩溃不影响整个应用
• 降级 UI:显示友好的错误提示和重试按钮
• 错误上报:自动上报错误到 Sentry/自研监控系统
• 自动恢复:尝试重新渲染或重新加载数据
🔒
安全与合规详细设计
🛡️ 认证与授权
认证(Authentication):
- JWT Token:无状态认证,有效期 2 小时
- Refresh Token:长期有效(7 天),用于刷新 Access Token
- OAuth 2.0:支持 Google、GitHub、企业微信登录
- 多因素认证(MFA):TOTP(Google Authenticator)、短信验证码
- 会话管理:Redis 存储活跃会话,支持强制下线
- 密码策略:最小长度 12、包含大小写/数字/特殊字符、90 天过期
授权(Authorization):
- RBAC(基于角色的访问控制):admin、developer、tester、manager
- ABAC(基于属性的访问控制):细粒度权限(如:只能查看自己区域的工单)
- 权限继承:角色可继承,支持角色层级
- 动态权限:支持运行时动态授予/撤销权限
- 权限审计:记录所有权限变更操作
🔐 数据安全
传输加密:
- TLS 1.3:强制 HTTPS,禁用 HTTP
- 证书管理:Let's Encrypt 自动续期,HSTS 预加载
- WebSocket Secure (WSS):加密 WebSocket 通信
存储加密:
- AES-256:加密敏感字段(密码、Token、PII)
- 字段级加密:只加密敏感字段,不影响查询性能
- 密钥管理:使用 AWS KMS / HashiCorp Vault 管理密钥
- 密钥轮换:每 90 天自动轮换加密密钥
数据脱敏:
- 日志脱敏:自动识别并脱敏 PII(姓名、邮箱、手机号、身份证)
- 查询脱敏:非授权用户查询时自动脱敏
- 导出脱敏:导出数据时自动脱敏敏感字段
审计日志:
- 完整记录:谁、何时、做了什么、IP、User-Agent
- 不可篡改:审计日志写入 WORM(Write Once Read Many)存储
- 实时告警:异常操作(如:大量导出、非工作时间登录)实时告警
数据备份:
- 每日增量备份:每天凌晨 2 点
- 每周全量备份:每周日凌晨 3 点
- 异地备份:备份到不同地理区域
- 恢复演练:每季度进行一次恢复演练
📜 合规要求
国内合规:
- 网络安全法:等级保护 2.0(三级)
- 数据安全法:数据分类分级、重要数据保护
- 个人信息保护法(PIPL):用户同意、最小化收集、删除权
- 数据本地化:中国公民个人信息存储在中国境内
国际合规:
- GDPR(欧盟):数据主体权利、DPO、数据泄露通知(72 小时)
- CCPA(加州):消费者权利、不出售个人信息
- SOC 2 Type II:安全、可用性、保密性、隐私
- ISO 27001:信息安全管理体系
数据主权:
- 区域隔离:中国数据存中国、欧盟数据存欧盟
- 跨境传输:标准合同条款(SCC)、绑定企业规则(BCR)
- 数据本地化:关键数据不出境
隐私保护:
- 隐私设计(Privacy by Design):从设计阶段考虑隐私保护
- 最小化原则:只收集必要数据
- 用户权利:访问权、更正权、删除权、可携带权
- 隐私政策:清晰透明的隐私政策,用户同意
📈
监控与可观测性详细设计
📊 监控指标体系
系统指标(Infrastructure):
- CPU 使用率:每节点、每容器
- 内存使用率:每节点、每容器
- 磁盘使用率:每节点、每卷
- 网络流量:入站/出站带宽、包量
- 文件描述符:使用数/上限
应用指标(Application):
- QPS:每秒请求数,按端点细分
- 延迟:P50、P90、P95、P99
- 错误率:4xx、5xx 错误占比
- 超时率:请求超时占比
- WebSocket 连接数:活跃连接数、新建/断开速率
业务指标(Business):
- 工单量:新增、处理中、已关闭
- 处理时长:平均、P95、按优先级细分
- SLA 达成率:响应时间达标率、解决时间达标率
- 用户满意度:CSAT 评分
Agent 指标(AI):
- 执行时长:每个 Agent 的平均执行时间
- 成功率:执行成功/失败比例
- Token 消耗:输入/输出 Token 数、成本
- LLM 延迟:调用大模型的延迟
🔍 日志系统架构
日志收集(Collection):
- Filebeat:轻量级采集器,部署在每个节点
- Fluentd:聚合多个 Filebeat 的日志,进行预处理
- 日志格式:统一 JSON 格式,包含 timestamp、level、service、trace_id、span_id、message、context
日志存储(Storage):
- Elasticsearch 8.x:3 节点集群起步,按需扩展
- 索引策略:按天滚动索引(logs-2026.03.10)
- 生命周期管理(ILM):热(30 天,SSD)→ 温(90 天,HDD)→ 冷(1 年,归档)→ 删除
- 分片策略:每个索引 3 个主分片、1 个副本
日志分析(Analysis):
- Kibana:可视化分析、仪表板、告警
- 查询语言:KQL(Kibana Query Language)
- 聚合分析:按服务、级别、时间聚合
- 异常检测:ML 自动检测日志异常模式
日志级别:
- DEBUG:调试信息,开发环境使用
- INFO:正常业务日志,生产环境默认级别
- WARN:警告,不影响业务但需关注
- ERROR:错误,影响部分功能
- FATAL:严重错误,系统崩溃
日志保留:
- 热数据(0-30 天):在线查询,秒级响应
- 温数据(30-90 天):在线查询,分钟级响应
- 冷数据(90 天 -1 年):归档存储,小时级响应
- >1 年:删除或永久归档到 S3
🔗 链路追踪系统
追踪系统(Tracing):
- Jaeger / Zipkin:开源分布式追踪系统
- OpenTelemetry:统一的可观测性框架,支持多种后端
- SkyWalking:国产 APM,支持 Java、Python、Node.js
Trace ID:
- 全链路唯一标识:从请求入口到出口,贯穿所有服务
- 生成规则:UUID 或 Snowflake 算法
- 传递方式:HTTP Header(X-Trace-ID)、gRPC Metadata
Span 采集:
- 100% 采样:关键链路(如:工单创建、分派、修复)
- 自适应采样:正常流量 10%,错误请求 100%
- 尾部采样:基于结果决定是否保留 Trace(如:只保留慢请求)
性能分析:
- 火焰图(Flame Graph):可视化展示调用栈和时间分布
- 关键路径分析:识别耗时最长的调用链
- 依赖分析:服务间依赖关系图
- 瓶颈定位:快速定位性能瓶颈(数据库慢查询、外部 API 延迟等)
集成示例(Python + FastAPI):
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
tracer = trace.get_tracer(__name__)
🎯 实施路线图与总结
本技术方案的核心价值:
- ✅ 完整架构设计:四层架构(表现层/通信层/业务逻辑层/数据层),清晰分层,职责明确
- ✅ 多 Agent 协作:基于 LangGraph 构建 6 个专业 Agent(协调/分类/分析/方案/分派/跟踪),智能化任务处理
- ✅ 完整工作流:7 步闭环流程(反馈→分类→分析→方案→分派→反馈→修复),全流程自动化
- ✅ 区域区分:智能识别国内/国外 Bug(5 维特征 + 权重算法),自动路由到对应团队
- ✅ Log 驱动:基于日志分析自动生成优化建议(7 步工作流),主动发现问题
- ✅ 实时通信:WebSocket 流式推送(6 种消息类型),前端实时显示 Agent 思考过程
- ✅ 可追溯性:完整状态历史记录(ticket_states 表),支持审计和复盘
- ✅ 高可用性:99.9% SLA,支持水平扩展和故障转移
- ✅ 安全合规:满足国内外数据安全法规(网络安全法/GDPR/PIPL)
- ✅ 详细代码:提供 LangGraph 状态图、WebSocket 通信、数据库 Schema 等完整代码示例
🚀 分阶段实施路线图(总周期:6-9 个月):
第一阶段(1-2 个月):基础架构搭建
• Week 1-2:项目初始化、技术栈选型、开发环境搭建
• Week 3-4:FastAPI 后端框架、PostgreSQL 数据库设计、基础 API 开发
• Week 5-6:LangGraph 状态图、核心 Agent(分类/分析/分派)开发
• Week 7-8:React 前端框架、基础页面(列表/详情)、WebSocket 通信
交付物:可运行的 MVP 版本,支持基础工单流转
第二阶段(2-3 个月):功能完善
• Week 9-10:完善所有 Agent 功能、方案 Agent、跟踪 Agent
• Week 11-12:国内/国外区分逻辑、智能路由算法
• Week 13-14:日志分析系统、Elasticsearch 集成、建议生成
• Week 15-16:前端高级功能(看板/分析/配置)、实时可视化
• Week 17-18:通知系统(邮件/IM/短信)、SLA 监控
交付物:功能完整的生产就绪版本
第三阶段(3-4 个月):优化与上线
• Week 19-20:性能优化(数据库索引、缓存、前端优化)
• Week 21-22:压力测试、负载测试、稳定性测试
• Week 23-24:安全审计、渗透测试、合规检查
• Week 25-26:监控告警系统部署(Prometheus/Grafana/Jaeger)
• Week 27-28:文档编写、用户培训、运维手册
• Week 29-32:灰度发布(10% → 50% → 100%)、全量上线、持续优化
交付物:生产环境稳定运行、完整文档、运维体系
关键成功因素:
• 高层支持:确保资源投入和跨部门协作
• 敏捷迭代:每 2 周一个 Sprint,快速迭代,及时调整
• 用户参与:早期用户参与测试,收集反馈
• 自动化:CI/CD 自动化、测试自动化、部署自动化
• 监控先行:上线前完成监控告警部署,确保可观测性