A Complete System-Level Agent Solution Built on FastAPI + LangGraph + React + WebSocket
This report details an enterprise-grade AI Agent architecture built on FastAPI + LangGraph + React + WebSocket, designed for system-level intelligent operations. The system automates the full lifecycle: bug intake (with mainland-China/international routing), log analysis, root-cause localization, solution drafting, task dispatch, and closed-loop fix verification.
| Metric | Target | Notes |
|---|---|---|
| Response time (P95) | < 500 ms | End-to-end WebSocket message latency |
| Bug classification accuracy | > 95% | LLM-based classification |
| Root-cause localization rate | > 85% | Top-3 recommendation hit rate |
| Ticket auto-close rate | > 60% | Share resolved without human intervention |
| System availability | 99.9% | Annual SLA target |
| Concurrent connections | 10,000+ | WebSocket connection ceiling per cluster |
| Requirement category | Requirement | Priority | Technical approach |
|---|---|---|---|
| Bug intake | Multi-channel intake (Web/App/API) | P0 | Unified gateway + adapter pattern |
| Bug intake | Automatic region detection (China/international) | P0 | IP geolocation + user profile |
| Bug intake | Smart classification and priority assignment | P0 | LLM + few-shot learning |
| Suggestion handling | Automatic log collection and aggregation | P0 | ELK Stack + Fluentd |
| Suggestion handling | Anomaly detection and root-cause analysis | P0 | LangGraph RCA agent |
| Suggestion handling | Automatic fix-proposal generation | P1 | RAG + code search |
| Workflow management | Automatic assignment to designated engineers | P0 | Rule engine + load balancing |
| Workflow management | Real-time progress tracking | P1 | WebSocket push + state machine |
| Workflow management | Fix verification and closure | P0 | Automated tests + human sign-off |
The access layer is the system's front door: it handles all inbound requests, authentication, and initial routing.
def route_by_region(request: Request) -> str:
    # Read the client IP (behind a proxy, prefer X-Forwarded-For)
    client_ip = request.client.host
    # Look it up in the GeoIP database
    geo_info = geo_db.lookup(client_ip)
    # Route mainland-China traffic to the China cluster
    if geo_info.country == 'CN':
        return 'china-cluster'
    else:
        return 'global-cluster'
The application service layer hosts the core business logic, exposing RESTful APIs and WebSocket endpoints via FastAPI.
app/
├── main.py                  # FastAPI application entry point
├── api/
│   ├── v1/
│   │   ├── bug_reports.py   # Bug report endpoints
│   │   ├── suggestions.py   # Suggestion submission endpoints
│   │   └── tickets.py       # Ticket management endpoints
│   └── websocket.py         # WebSocket handler
├── agents/
│   ├── classifier.py        # Bug classification agent
│   ├── rca.py               # Root-cause analysis agent
│   └── dispatcher.py        # Task dispatch agent
├── models/
│   ├── ticket.py            # Ticket data model
│   └── log.py               # Log data model
└── services/
    ├── notification.py      # Notification service
    └── analytics.py         # Analytics service
The intelligence core is the system's brain: multiple specialized agents built on LangGraph cooperate on each ticket.
The data layer uses a polyglot-persistence strategy, picking the best store for each data type.
| Data type | Store | Retention | Access pattern |
|---|---|---|---|
| Ticket metadata | PostgreSQL | Permanent | Transactional reads/writes |
| Application logs | Elasticsearch | 90 days hot + 1 year cold | Write-heavy, occasional queries |
| Session state | Redis | 30-minute TTL | High-frequency reads/writes |
| Knowledge-base embeddings | Milvus | Permanent | Vector similarity search |
| File attachments | MinIO/S3 | Permanent | Write once, read many |
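The 30-minute session TTL in the table can be prototyped without a running Redis; below is a minimal in-process sketch of the same expire-on-read semantics (in production this would simply be Redis `SETEX`, and the `SessionStore` class is a hypothetical stand-in):

```python
import time

class SessionStore:
    """In-memory stand-in for Redis session state with a TTL (sketch only)."""

    def __init__(self, ttl_seconds: float = 30 * 60):
        self.ttl = ttl_seconds
        self._data = {}  # key -> (value, expires_at)

    def set(self, key: str, value: dict) -> None:
        # Equivalent to Redis SETEX: store the value with an expiry timestamp
        self._data[key] = (value, time.monotonic() + self.ttl)

    def get(self, key: str):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            # Expired: evict lazily on read, matching Redis's logical behavior
            del self._data[key]
            return None
        return value

store = SessionStore(ttl_seconds=0.05)  # tiny TTL just for the demo
store.set("session:user_12345", {"ticket_id": "TKT-20260312-00123"})
assert store.get("session:user_12345") is not None
time.sleep(0.06)
assert store.get("session:user_12345") is None  # gone after the TTL
```

The point of the sketch is the contract, not the implementation: any consumer of session state must tolerate a `None` result and re-create the session.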
Trigger: a user submits a bug report or suggestion via Web/App/API
Processing steps:
# Initial ticket state
ticket = {
    "id": "TKT-20260312-00123",
    "region": "CN",            # CN / GLOBAL
    "type": "BUG",
    "status": "RECEIVED",
    "priority": "PENDING",
    "created_at": "2026-03-12T10:30:00Z",
    "reporter": "user_12345",
    "assignee": None
}
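The `id` format above (`TKT-<date>-<sequence>`) suggests a date-scoped counter. One hypothetical way to mint such numbers, assuming the real sequence comes from a database sequence or Redis `INCR` rather than the in-process counter used here:

```python
from datetime import datetime, timezone
from typing import Optional
import itertools

# Hypothetical in-process counter; production would use a DB sequence or Redis INCR
_seq = itertools.count(1)

def new_ticket_number(now: Optional[datetime] = None) -> str:
    """Mint an ID like TKT-20260312-00123: a date bucket plus a zero-padded sequence."""
    now = now or datetime.now(timezone.utc)
    return f"TKT-{now:%Y%m%d}-{next(_seq):05d}"

print(new_ticket_number(datetime(2026, 3, 12, tzinfo=timezone.utc)))  # TKT-20260312-00001
```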
Executing agent: bug classification agent (LangGraph-based)
Processing steps:
# Example LLM prompt
prompt = """
You are an experienced technical-support expert. Analyze the following bug report:
Title: {title}
Description: {description}
Environment: {environment}
Answer the following:
1. Bug type (frontend/backend/database/network/other)
2. Severity (P0-critical / P1-high / P2-medium / P3-low)
3. Relevant tags (up to 5)
4. Suggested owning team
Return the answer as JSON.
"""
Executing agent: RCA agent (LangGraph + Elasticsearch)
Processing steps:
# Example Elasticsearch query
query = {
    "query": {
        "bool": {
            "must": [
                {"term": {"service": "payment-service"}},
                {"range": {"@timestamp": {"gte": "now-1h"}}},
                # `match` ORs its terms by default; a literal "OR" would be
                # searched as a word, so list the keywords plainly
                {"match": {"message": "timeout exception error"}}
            ]
        }
    },
    "aggs": {
        "error_types": {
            "terms": {"field": "error_type.keyword"}
        }
    }
}
Executing agent: solution-generation agent (RAG + code search)
Processing steps:
Executing agent: task dispatch agent (rule engine + load balancing)
Processing steps:
# Dispatch algorithm (pseudocode)
def assign_ticket(ticket, team):
    candidates = get_available_members(team)
    for member in candidates:
        score = calculate_score(
            skills_match(ticket.tags, member.skills),
            current_load(member.open_tickets),
            timezone_fit(ticket.region, member.timezone),
            response_time(member.avg_response_time)
        )
        member.score = score
    best_candidate = max(candidates, key=lambda x: x.score)
    return best_candidate
Human-in-the-loop stage: an engineer picks up the task and works on it
Processing steps:
Executing agent: verification agent (automated tests + human sign-off)
Processing steps:
At the heart of LangGraph is the StateGraph, which defines the state-transition logic of the agent workflow.
from typing import TypedDict, List, Annotated
from langgraph.graph import StateGraph, START, END
import operator

# Shared state threaded through every node
class TicketState(TypedDict):
    """Ticket-processing state"""
    ticket_id: str
    region: str
    bug_type: str
    priority: str
    logs: List[dict]
    rca_report: dict
    solution: dict
    assignee: str
    messages: Annotated[List[str], operator.add]
    status: str

# Create the state graph
workflow = StateGraph(TicketState)

# Add nodes
workflow.add_node("classify_bug", classify_bug_node)
workflow.add_node("analyze_logs", analyze_logs_node)
workflow.add_node("generate_solution", generate_solution_node)
workflow.add_node("assign_ticket", assign_ticket_node)
workflow.add_node("notify_engineer", notify_engineer_node)

# Define edges (state transitions)
workflow.add_edge(START, "classify_bug")
workflow.add_edge("classify_bug", "analyze_logs")
workflow.add_edge("analyze_logs", "generate_solution")
workflow.add_edge("generate_solution", "assign_ticket")
workflow.add_edge("assign_ticket", "notify_engineer")
workflow.add_edge("notify_engineer", END)

# Compile into an executable graph
app = workflow.compile()
Complex scenarios call for conditional branching and retry mechanisms.
def should_retry_analysis(state: TicketState) -> str:
    """Decide whether the analysis needs another pass"""
    confidence = state["rca_report"].get("confidence", 0)
    if confidence < 0.6:
        return "retry"
    else:
        return "proceed"

# Add the conditional edge
workflow.add_conditional_edges(
    "analyze_logs",
    should_retry_analysis,
    {
        "retry": "analyze_logs",  # loop back to the analysis node
        "proceed": "generate_solution"
    }
)
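As written, the `retry` edge can loop forever if confidence never reaches 0.6. A common guard is to bound the loop with an attempt counter and escalate instead; the sketch below is a hypothetical bounded variant of `should_retry_analysis` (the `retry_count` state field, the `escalate` route, and `MAX_RETRIES` are assumptions, not part of the original design):

```python
MAX_RETRIES = 3  # assumption: cap chosen for illustration

def route_analysis(state: dict) -> str:
    """Route to retry, proceed, or escalate, based on confidence and attempts made."""
    confidence = state["rca_report"].get("confidence", 0)
    retries = state.get("retry_count", 0)
    if confidence >= 0.6:
        return "proceed"
    if retries < MAX_RETRIES:
        return "retry"      # analyze_logs would increment retry_count each pass
    return "escalate"       # hand off to a human after too many attempts

assert route_analysis({"rca_report": {"confidence": 0.9}}) == "proceed"
assert route_analysis({"rca_report": {"confidence": 0.3}, "retry_count": 1}) == "retry"
assert route_analysis({"rca_report": {"confidence": 0.3}, "retry_count": 3}) == "escalate"
```

Using this router, the conditional-edge mapping would gain a third entry pointing `"escalate"` at a human-notification node.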
Critical decision points bring a human reviewer into the loop for safety.
from langgraph.checkpoint.memory import MemorySaver

# Enable checkpointing so the run can pause and resume;
# interrupt_before halts the graph ahead of the critical node
memory = MemorySaver()
app = workflow.compile(checkpointer=memory, interrupt_before=["assign_ticket"])

# A thread_id identifies the resumable run for the checkpointer
config = {"configurable": {"thread_id": "ticket-thread-1"}}

# Run up to the interrupt point
result = app.invoke(input_data, config=config)

# Continue after human approval
approval = wait_for_human_approval()
if approval:
    app.invoke(None, config=config)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict
import asyncio
import json

app = FastAPI()

# Connection manager
class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        self.active_connections[user_id] = websocket

    def disconnect(self, user_id: str):
        if user_id in self.active_connections:
            del self.active_connections[user_id]

    async def send_personal_message(self, message: dict, user_id: str):
        if user_id in self.active_connections:
            websocket = self.active_connections[user_id]
            await websocket.send_text(json.dumps(message))

    async def broadcast_progress(self, ticket_id: str, progress: dict):
        """Broadcast processing progress to all subscribers"""
        message = {
            "type": "progress_update",
            "ticket_id": ticket_id,
            "data": progress
        }
        for connection in self.active_connections.values():
            await connection.send_text(json.dumps(message))

manager = ConnectionManager()

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await manager.connect(websocket, user_id)
    try:
        while True:
            data = await websocket.receive_text()
            # Handle the client message
            message = json.loads(data)
            await handle_client_message(user_id, message)
    except WebSocketDisconnect:
        manager.disconnect(user_id)

async def stream_llm_response(ticket_id: str, user_id: str, rca_report: dict):
    """Stream the LLM analysis progress to the client"""
    # Announce the start of the analysis
    await manager.send_personal_message({
        "type": "analysis_start",
        "ticket_id": ticket_id
    }, user_id)
    # Simulated streaming output
    chunks = [
        {"step": "Searching logs...", "progress": 20},
        {"step": "Found 3 anomaly patterns", "progress": 45},
        {"step": "Analyzing root cause...", "progress": 70},
        {"step": "Generating RCA report...", "progress": 90},
        {"step": "Analysis complete!", "progress": 100}
    ]
    for chunk in chunks:
        await manager.send_personal_message({
            "type": "analysis_progress",
            "ticket_id": ticket_id,
            "chunk": chunk
        }, user_id)
        await asyncio.sleep(0.5)  # simulated processing delay
    # Send the final result
    await manager.send_personal_message({
        "type": "analysis_complete",
        "ticket_id": ticket_id,
        "result": rca_report
    }, user_id)
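On the receiving side, these type-tagged envelopes lend themselves to a small dispatch table. A minimal sketch of the client-side routing (the handler names and return strings are hypothetical; a React client would update state instead of returning strings):

```python
import json

def on_start(msg):
    return f"analysis started for {msg['ticket_id']}"

def on_progress(msg):
    return f"{msg['chunk']['progress']}% - {msg['chunk']['step']}"

def on_complete(msg):
    return f"done: {msg['ticket_id']}"

# Map the "type" field of each server message to a handler
HANDLERS = {
    "analysis_start": on_start,
    "analysis_progress": on_progress,
    "analysis_complete": on_complete,
}

def dispatch(raw: str) -> str:
    """Parse one WebSocket text frame and route it by its "type" field."""
    msg = json.loads(raw)
    handler = HANDLERS.get(msg["type"])
    if handler is None:
        # Unknown types are ignored so old clients survive protocol additions
        return f"ignored unknown message type: {msg['type']}"
    return handler(msg)

frame = json.dumps({"type": "analysis_progress",
                    "ticket_id": "TKT-1",
                    "chunk": {"step": "Searching logs...", "progress": 20}})
print(dispatch(frame))  # 20% - Searching logs...
```

Ignoring unknown types (rather than raising) is the design choice that lets the server add new message types without breaking deployed clients.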
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Application │───▶│ Fluentd │───▶│Elasticsearch│
│ Services │ │ Agent │ │ Cluster │
└─────────────┘ └─────────────┘ └─────────────┘
│
▼
┌─────────────┐
│ Kafka │
│ Buffer │
└─────────────┘
from typing import List
from sklearn.ensemble import IsolationForest
import numpy as np

class LogAnomalyDetector:
    def __init__(self):
        self.model = IsolationForest(contamination=0.01)

    def extract_features(self, logs: List[dict]) -> np.ndarray:
        """Extract a feature vector from each log entry"""
        features = []
        for log in logs:
            feature_vector = [
                log.get("response_time", 0),
                log.get("error_count", 0),
                log.get("cpu_usage", 0),
                log.get("memory_usage", 0),
                len(log.get("stack_trace", []))
            ]
            features.append(feature_vector)
        return np.array(features)

    def detect_anomalies(self, logs: List[dict]) -> List[int]:
        """Detect anomalous log entries"""
        X = self.extract_features(logs)
        predictions = self.model.fit_predict(X)
        # -1 means anomalous, 1 means normal
        anomaly_indices = [i for i, pred in enumerate(predictions) if pred == -1]
        return anomaly_indices
async def llm_root_cause_analysis(anomaly_logs: List[dict], context: dict) -> dict:
    """Root-cause reasoning with an LLM"""
    prompt = f"""
You are a senior SRE. Analyze the anomalous logs below and identify the root cause:
[Anomalous logs]
{json.dumps(anomaly_logs, indent=2)}
[System context]
Service: {context['service']}
Time window: {context['time_window']}
Impact: {context['impact']}
[Similar historical cases]
{context['similar_cases']}
Answer the following:
1. The 3 most likely root causes, ranked by likelihood
2. The evidence chain for each root cause
3. Recommended fixes
4. Suggested preventive measures
Return the answer as JSON.
"""
    response = await llm_client.generate(prompt)
    return parse_json_response(response)
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Enum, JSON
from sqlalchemy.ext.declarative import declarative_base
import enum

Base = declarative_base()

class Priority(enum.Enum):
    P0 = "P0"  # critical
    P1 = "P1"  # high
    P2 = "P2"  # medium
    P3 = "P3"  # low

class TicketStatus(enum.Enum):
    RECEIVED = "RECEIVED"
    CLASSIFIED = "CLASSIFIED"
    ANALYZING = "ANALYZING"
    ASSIGNED = "ASSIGNED"
    IN_PROGRESS = "IN_PROGRESS"
    PENDING_VERIFICATION = "PENDING_VERIFICATION"
    RESOLVED = "RESOLVED"
    CLOSED = "CLOSED"

class Ticket(Base):
    __tablename__ = "tickets"
    id = Column(Integer, primary_key=True, autoincrement=True)
    ticket_number = Column(String(50), unique=True, nullable=False)
    region = Column(String(10), nullable=False)  # CN / GLOBAL
    type = Column(String(20), nullable=False)    # BUG / FEATURE / SUGGESTION
    priority = Column(Enum(Priority), default=Priority.P2)
    status = Column(Enum(TicketStatus), default=TicketStatus.RECEIVED)
    title = Column(String(500), nullable=False)
    description = Column(String(10000))
    reporter_id = Column(String(100), ForeignKey("users.id"))
    assignee_id = Column(String(100), ForeignKey("users.id"))
    rca_report = Column(JSON)
    solution = Column(JSON)
    created_at = Column(DateTime, nullable=False)
    updated_at = Column(DateTime, onupdate=datetime.now)
    resolved_at = Column(DateTime)
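The `TicketStatus` values imply a state machine, and a guard that rejects illegal jumps keeps status updates honest. The sketch below repeats the enum for self-containment; the transition table itself is an assumption inferred from the workflow described earlier, not something the schema enforces:

```python
import enum

class TicketStatus(enum.Enum):
    RECEIVED = "RECEIVED"
    CLASSIFIED = "CLASSIFIED"
    ANALYZING = "ANALYZING"
    ASSIGNED = "ASSIGNED"
    IN_PROGRESS = "IN_PROGRESS"
    PENDING_VERIFICATION = "PENDING_VERIFICATION"
    RESOLVED = "RESOLVED"
    CLOSED = "CLOSED"

# Allowed transitions (assumption: inferred from the processing pipeline;
# verification failure sends the ticket back to IN_PROGRESS)
ALLOWED = {
    TicketStatus.RECEIVED: {TicketStatus.CLASSIFIED},
    TicketStatus.CLASSIFIED: {TicketStatus.ANALYZING},
    TicketStatus.ANALYZING: {TicketStatus.ASSIGNED},
    TicketStatus.ASSIGNED: {TicketStatus.IN_PROGRESS},
    TicketStatus.IN_PROGRESS: {TicketStatus.PENDING_VERIFICATION},
    TicketStatus.PENDING_VERIFICATION: {TicketStatus.RESOLVED, TicketStatus.IN_PROGRESS},
    TicketStatus.RESOLVED: {TicketStatus.CLOSED},
    TicketStatus.CLOSED: set(),
}

def transition(current: TicketStatus, target: TicketStatus) -> TicketStatus:
    """Return the new status, or raise if the jump is not allowed."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target

assert transition(TicketStatus.RECEIVED, TicketStatus.CLASSIFIED) is TicketStatus.CLASSIFIED
```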
class TicketDispatcher:
    def __init__(self):
        self.load_balancer = LoadBalancer()
        self.skill_matcher = SkillMatcher()

    async def auto_assign(self, ticket: Ticket) -> User:
        """Automatically assign a ticket"""
        # 1. Determine the candidate team
        team = self.get_team_by_type(ticket.type, ticket.tags)
        # 2. Fetch the team's available members
        candidates = await self.get_available_members(team)
        # 3. Score each candidate
        scored_candidates = []
        for candidate in candidates:
            score = await self.calculate_assignment_score(ticket, candidate)
            scored_candidates.append((candidate, score))
        # 4. Pick the highest-scoring candidate
        best_candidate = max(scored_candidates, key=lambda x: x[1])[0]
        # 5. Assign the ticket
        ticket.assignee_id = best_candidate.id
        ticket.status = TicketStatus.ASSIGNED
        return best_candidate

    async def calculate_assignment_score(self, ticket: Ticket, user: User) -> float:
        """Compute the assignment score"""
        weights = {
            "skill_match": 0.3,
            "current_load": 0.25,
            "response_time": 0.2,
            "timezone_fit": 0.15,
            "expertise_level": 0.1
        }
        skill_score = self.skill_matcher.match(ticket.tags, user.skills)
        load_score = max(0.0, 1.0 - user.open_tickets / 10.0)  # normalized; floored at 0
        response_score = 1.0 / (user.avg_response_hours + 1)
        timezone_score = self.calculate_timezone_fit(ticket.region, user.timezone)
        expertise_score = user.expertise_level / 5.0
        total_score = (
            weights["skill_match"] * skill_score +
            weights["current_load"] * load_score +
            weights["response_time"] * response_score +
            weights["timezone_fit"] * timezone_score +
            weights["expertise_level"] * expertise_score
        )
        return total_score
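To make the weighting concrete, here is the same formula as a pure function with sample numbers (the candidate's values are invented for illustration):

```python
WEIGHTS = {
    "skill_match": 0.3,
    "current_load": 0.25,
    "response_time": 0.2,
    "timezone_fit": 0.15,
    "expertise_level": 0.1,
}

def assignment_score(skill, load, response, timezone, expertise) -> float:
    """Weighted sum of the five normalized sub-scores (each in [0, 1])."""
    return (WEIGHTS["skill_match"] * skill
            + WEIGHTS["current_load"] * load
            + WEIGHTS["response_time"] * response
            + WEIGHTS["timezone_fit"] * timezone
            + WEIGHTS["expertise_level"] * expertise)

# A hypothetical candidate: strong skill fit, 3 open tickets, ~1h average
# response time, same timezone, senior engineer (level 4 of 5)
score = assignment_score(
    skill=0.8,
    load=1.0 - 3 / 10.0,     # 0.7
    response=1.0 / (1 + 1),  # 0.5
    timezone=1.0,
    expertise=4 / 5.0,       # 0.8
)
print(round(score, 3))  # 0.745
```

Because the weights sum to 1 and each sub-score is normalized to [0, 1], the total is directly comparable across candidates.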
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import JsonOutputParser

class BugClassificationAgent:
    """Bug classification agent"""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4-turbo-preview", temperature=0)
        self.parser = JsonOutputParser()
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an experienced technical-support expert who analyzes and classifies bug reports.
Classify according to these criteria:
- Type: frontend/backend/database/network/other
- Priority: P0 (critical) / P1 (high) / P2 (medium) / P3 (low)
- Tags: up to 5 relevant tags
- Suggested owning team: team_name
Always respond in JSON."""),
            ("human", """Title: {title}
Description: {description}
Environment: {environment}
Stack trace: {stack_trace}
{format_instructions}""")
        ])
        self.chain = self.prompt | self.llm | self.parser

    async def classify(self, ticket_data: dict) -> dict:
        """Run the classification"""
        result = await self.chain.ainvoke({
            "title": ticket_data["title"],
            "description": ticket_data["description"],
            "environment": ticket_data.get("environment", "unknown"),
            "stack_trace": ticket_data.get("stack_trace", "none"),
            "format_instructions": self.parser.get_format_instructions()
        })
        return result
from typing import List
from elasticsearch import AsyncElasticsearch

class LogSearchService:
    def __init__(self):
        self.es = AsyncElasticsearch(hosts=["http://es-cluster:9200"])

    async def search_related_logs(
        self,
        service: str,
        time_window: str,
        keywords: List[str],
        size: int = 100
    ) -> List[dict]:
        """Search for related logs"""
        query = {
            "query": {
                "bool": {
                    "must": [
                        {"term": {"service.keyword": service}},
                        {"range": {"@timestamp": {"gte": f"now-{time_window}"}}},
                        {
                            "multi_match": {
                                "query": " ".join(keywords),
                                "fields": ["message", "error_type", "stack_trace"]
                            }
                        }
                    ]
                }
            },
            "sort": [{"@timestamp": "desc"}],
            "size": size
        }
        response = await self.es.search(index="logs-*", body=query)
        return [hit["_source"] for hit in response["hits"]["hits"]]

    async def aggregate_errors(self, service: str, time_window: str) -> dict:
        """Aggregate error statistics"""
        query = {
            "size": 0,
            "query": {
                "bool": {
                    "must": [
                        {"term": {"service.keyword": service}},
                        {"range": {"@timestamp": {"gte": f"now-{time_window}"}}},
                        {"exists": {"field": "error_type"}}
                    ]
                }
            },
            "aggs": {
                "error_types": {
                    "terms": {
                        "field": "error_type.keyword",
                        "size": 10
                    }
                },
                "timeline": {
                    "date_histogram": {
                        "field": "@timestamp",
                        "calendar_interval": "hour"
                    }
                }
            }
        }
        response = await self.es.search(index="logs-*", body=query)
        return response["aggregations"]
version: '3.8'
services:
fastapi-app:
build: ./app
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:pass@postgres:5432/tickets
- REDIS_URL=redis://redis:6379
- ELASTICSEARCH_URL=http://elasticsearch:9200
- OPENAI_API_KEY=${OPENAI_API_KEY}
depends_on:
- postgres
- redis
- elasticsearch
deploy:
replicas: 3
postgres:
image: postgres:15-alpine
volumes:
- postgres_data:/var/lib/postgresql/data
environment:
- POSTGRES_DB=tickets
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
environment:
- discovery.type=single-node
- "ES_JAVA_OPTS=-Xms1g -Xmx1g"
volumes:
- es_data:/usr/share/elasticsearch/data
kibana:
image: docker.elastic.co/kibana/kibana:8.11.0
ports:
- "5601:5601"
depends_on:
- elasticsearch
prometheus:
image: prom/prometheus:latest
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
volumes:
- grafana_data:/var/lib/grafana
volumes:
postgres_data:
redis_data:
es_data:
grafana_data:
| Metric category | Metric | Alert threshold |
|---|---|---|
| System resources | CPU utilization | > 80% for 5 minutes |
| System resources | Memory utilization | > 85% for 5 minutes |
| System resources | Disk utilization | > 90% |
| Application performance | API P95 latency | > 500 ms |
| Application performance | Error rate | > 1% |
| Application performance | WebSocket connections | > 8,000 |
| Business metrics | Ticket backlog | > 100 |
| Business metrics | Average resolution time | > 4 hours |
| Business metrics | RCA success rate | < 70% |
| Region | Compliance requirements | Measures |
|---|---|---|
| Mainland China | Cybersecurity Law, Data Security Law, Personal Information Protection Law | Local data storage, cross-border transfer security assessment, user-consent mechanism |
| European Union | GDPR | Data-subject rights, privacy by design, DPO appointment |
| United States | CCPA, HIPAA (where applicable) | Consumer privacy rights, special protection for health data |
This design delivers a complete enterprise-grade AI Agent system that automates the entire path from bug intake to a verified fix. The FastAPI + LangGraph + React + WebSocket stack yields a highly available, scalable, intelligent operations-support platform.
A phased implementation strategy is recommended: