基于 FastAPI + LangGraph + React + WebSocket 的智能化 Bug 反馈与建议处理系统
本报告针对基于 FastAPI + LangGraph + React + WebSocket 技术栈的企业级 AI Agent 系统,设计一套完整的系统级 Agent 架构,实现智能化的 Bug 反馈处理、建议收集与分析、以及全流程闭环管理。
| 功能模块 | 核心能力 | 技术要点 |
|---|---|---|
| Bug 反馈 | 自动分类(国内/国外)、优先级评估、路由分发 | NLP 分类、地理位置识别、规则引擎 |
| 建议处理 | 基于 Log 自动分析、趋势识别、智能聚合 | Log 解析、异常检测、聚类分析 |
| 工作流引擎 | 反馈→方案→下发→反馈→修复 全流程 | LangGraph 状态机、持久化、人工介入点 |
| 通知系统 | 多通道通知(WebSocket/邮件/钉钉) | WebSocket 流式推送、异步任务队列 |
| 数据分析 | 处理时效、修复率、趋势报表 | 时序数据库、聚合查询、可视化 |
┌─────────────────────────────────────────────────────────────────────────┐
│ 前端层 (React) │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Bug 反馈门户 │ │ 建议提交门户 │ │ 处理进度看板 │ │ 数据分析大屏 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
│
▼ WebSocket (流式通信)
┌─────────────────────────────────────────────────────────────────────────┐
│ API 网关层 (FastAPI) │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ REST API Endpoints │ WebSocket Handler │ Auth Middleware │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ 系统级 Agent 编排层 (LangGraph) │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Multi-Agent Collaboration │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ 分类 Agent│ │ 分析 Agent│ │ 路由 Agent│ │ 跟进 Agent│ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │ │ │ │ │
│ │ └────────────┴────────────┴────────────┘ │ │
│ │ │ │ │
│ │ ┌───────▼───────┐ │ │
│ │ │ 工作流引擎 │ │ │
│ │ │ (StateGraph) │ │ │
│ │ └───────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ 服务层 (Microservices) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Log 分析 │ │ 通知服务 │ │ 用户管理 │ │ 报表服务 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ 数据持久层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │PostgreSQL│ │ Redis │ │ Timescale│ │ MinIO │ │
│ │(主数据库) │ │ (缓存/队列)│ │DB(时序) │ │(文件存储) │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
基于 LangGraph 的 Multi-Agent Collaboration 模式,设计以下专业 Agent:
| Agent 名称 | 职责 | 输入 | 输出 | 工具依赖 |
|---|---|---|---|---|
| ClassifierAgent | Bug/建议分类、地域识别 | 原始反馈文本 | 分类标签、地域标记 | NLP 模型、IP 地理位置库 |
| AnalyzerAgent | Log 关联分析、根因推测 | 分类结果、相关 Log | 分析报告、可疑模块 | Log 解析器、异常检测算法 |
| RouterAgent | 任务路由、人员匹配 | 分析结果、团队负载 | 指派方案、优先级 | 用户数据库、排班系统 |
| CoordinatorAgent | 流程协调、进度跟踪 | 任务状态、人员反馈 | 进度更新、升级决策 | 通知服务、SLA 规则引擎 |
| ReporterAgent | 报告生成、数据聚合 | 历史数据、当前状态 | 可视化报表、趋势分析 | 时序数据库、图表库 |
from typing import TypedDict, List, Dict, Any, Literal
from datetime import datetime
from typing_extensions import Annotated
from operator import add
class SystemAgentState(TypedDict):
"""系统 Agent 完整状态定义"""
# 输入数据
original_input: str # 原始反馈内容
source_channel: str # 来源渠道 (web/app/api)
user_info: Dict[str, Any] # 用户信息
timestamp: datetime # 时间戳
# 分类结果
issue_type: Literal["bug", "suggestion", "question"]
region: Literal["domestic", "international"]
severity: Literal["critical", "high", "medium", "low"]
category: str # 具体分类标签
# 分析结果
log_analysis: Dict[str, Any] # Log 分析结果
root_cause_hypothesis: str # 根因假设
affected_modules: Annotated[List[str], add]
# 路由决策
assigned_team: str # 指派团队
assigned_person: str # 指派人员
priority_score: int # 优先级分数
sla_deadline: datetime # SLA 截止时间
# 工作流状态
workflow_stage: Literal[
"received", "classified", "analyzed",
"routed", "plan_review", "assigned",
"in_progress", "feedback_pending",
"resolved", "verification", "closed", "rejected"
]
conversation_history: List[Dict[str, Any]]
human_feedback: Dict[str, Any]
resolution_plan: Dict[str, Any]
# 元数据
checkpoints: List[Dict[str, Any]]
notifications_sent: List[Dict[str, Any]]
task_id: str
from langgraph.graph import StateGraph, END
from langchain_core.tools import tool
# ============== 工具定义 ==============
@tool
def classify_issue(text: str, user_location: str = None) -> dict:
"""分类问题类型并识别地域"""
# 实现:LLM + 规则引擎
pass
@tool
def analyze_logs(time_range: dict, modules: list[str]) -> dict:
"""分析指定时间段和模块的 Log"""
# 实现:查询 TimescaleDB + 异常检测
pass
@tool
def send_notification(user_id: str, channel: str, message: dict) -> bool:
"""发送通知(WebSocket/邮件/钉钉)"""
# 实现:异步通知服务
pass
# ============== Agent 节点 ==============
def classifier_node(state: SystemAgentState) -> dict:
"""分类 Agent 节点"""
# 调用 classify_issue 工具
# 更新 state 中的 issue_type, region, severity, category
pass
def analyzer_node(state: SystemAgentState) -> dict:
"""分析 Agent 节点"""
# 调用 analyze_logs 工具
# 生成 root_cause_hypothesis 和 affected_modules
pass
def router_node(state: SystemAgentState) -> dict:
"""路由 Agent 节点"""
# 调用 get_team_availability 工具
# 决定 assigned_team, assigned_person, priority_score
pass
# ============== 构建 StateGraph ==============
def build_system_agent_graph():
workflow = StateGraph(SystemAgentState)
# 添加节点
workflow.add_node("classifier", classifier_node)
workflow.add_node("analyzer", analyzer_node)
workflow.add_node("router", router_node)
workflow.add_node("coordinator", coordinator_node)
# 定义边
workflow.set_entry_point("classifier")
workflow.add_edge("classifier", "analyzer")
workflow.add_edge("analyzer", "router")
return workflow.compile(
checkpointer=PostgresSaver.from_conn_string("postgresql://..."),
interrupt_before=["human_review"]
)
class RegionClassifier:
"""地域分类器 - 多维度识别策略"""
def classify(self, feedback_data: dict) -> Literal["domestic", "international"]:
scores = {"domestic": 0.0, "international": 0.0}
# 1. IP 地理位置识别(最高优先级)
if feedback_data.get("ip_address"):
region = self._geo_ip_lookup(feedback_data["ip_address"])
if region == "CN":
scores["domestic"] += 0.5
else:
scores["international"] += 0.5
# 2. 用户注册地
if feedback_data.get("user_profile", {}).get("region"):
if feedback_data["user_profile"]["region"] == "CN":
scores["domestic"] += 0.3
else:
scores["international"] += 0.3
# 3. 手机号段识别
if feedback_data.get("phone"):
if feedback_data["phone"].startswith("+86"):
scores["domestic"] += 0.15
else:
scores["international"] += 0.15
return "domestic" if scores["domestic"] > scores["international"] else "international"
class PriorityEvaluator:
"""Bug 优先级评估模型"""
def calculate_priority(self, severity: str, region: str,
affected_users: int, business_impact: str,
is_regression: bool) -> tuple[int, str]:
base_scores = {"critical": 80, "high": 60, "medium": 40, "low": 20}
score = base_scores.get(severity, 40)
# 地域加权(国内核心业务加权)
if region == "domestic" and business_impact == "core":
score += 10
# 影响用户数加权
if affected_users > 10000:
score += 15
elif affected_users > 1000:
score += 10
# 业务影响加权
impact_weights = {"core": 15, "important": 10, "normal": 5, "minor": 0}
score += impact_weights.get(business_impact, 5)
# 回归 Bug 加权
if is_regression:
score += 10
# 转换为优先级等级
if score >= 90:
level = "P0"
elif score >= 70:
level = "P1"
elif score >= 50:
level = "P2"
elif score >= 30:
level = "P3"
else:
level = "P4"
return score, level
┌─────────────────────────────────────────────────────────────────┐
│ 建议收集入口 │
│ (用户提交 / 系统自动检测 / 客服转交) │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Log 关联分析引擎 (Log Analysis Engine) │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 1. Log 采集层 │ │
│ │ - 应用 Log (FastAPI/React) │ │
│ │ - 系统 Log (OS/Container) │ │
│ │ - 业务 Log (用户行为/交易) │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 2. Log 解析与标准化 │ │
│ │ - 结构化解析 (JSON/Grok) │ │
│ │ - 字段标准化 (timestamp, level, module, message) │ │
│ │ - 上下文关联 (trace_id, session_id, user_id) │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 3. 异常检测与模式识别 │ │
│ │ - 统计异常 (3-sigma, IQR) │ │
│ │ - 时序异常 (Prophet, LSTM) │ │
│ │ - 模式聚类 (LogCluster, Drain) │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────────┐ │
│ │ 4. 根因推测与建议生成 │ │
│ │ - 关联规则挖掘 (Apriori) │ │
│ │ - LLM 根因分析 │ │
│ │ - 智能建议生成 │ │
│ └───────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
class LogAnalysisEngine:
"""Log 分析引擎 - 支持多维度异常检测和建议生成"""
def analyze_for_suggestion(self, suggestion_text: str,
time_range: Dict[str, datetime],
modules: List[str] = None) -> Dict[str, Any]:
# 1. 从建议文本提取关键词和模块
keywords = self._extract_keywords(suggestion_text)
if not modules:
modules = self._infer_modules_from_keywords(keywords)
# 2. 查询相关 Log
related_logs = self._query_related_logs(
keywords=keywords,
modules=modules,
time_range=time_range
)
# 3. 异常检测
anomalies = self._detect_anomalies(related_logs)
# 4. 模式聚类
patterns = self._cluster_log_patterns(related_logs)
# 5. LLM 根因分析
root_cause = self.llm_analyzer.analyze(
logs=related_logs,
anomalies=anomalies,
patterns=patterns,
suggestion=suggestion_text
)
return {
"related_logs_count": len(related_logs),
"sample_logs": related_logs[:10],
"anomaly_patterns": anomalies,
"log_patterns": patterns,
"root_cause_hypothesis": root_cause,
"recommendation": self._generate_recommendation(root_cause, anomalies),
"confidence_score": self._calculate_confidence(anomalies, patterns, root_cause)
}
def _detect_anomalies(self, logs: List[Dict]) -> List[Dict]:
"""多维度异常检测"""
anomalies = []
# 1. 错误率异常 (3-sigma 原则)
error_rates = self._calculate_error_rate_by_time(logs, interval="1h")
mean_rate = np.mean(error_rates)
std_rate = np.std(error_rates)
for timestamp, rate in error_rates.items():
if rate > mean_rate + 3 * std_rate:
anomalies.append({
"type": "error_rate_spike",
"timestamp": timestamp,
"value": rate,
"severity": "high" if rate > mean_rate + 5 * std_rate else "medium"
})
# 2. 响应时间异常 (IQR 方法)
response_times = [log.get("response_time", 0) for log in logs]
if response_times:
q1, q3 = np.percentile(response_times, [25, 75])
iqr = q3 - q1
upper_bound = q3 + 1.5 * iqr
slow_logs = [log for log in logs if log.get("response_time", 0) > upper_bound]
if slow_logs:
anomalies.append({
"type": "slow_response",
"count": len(slow_logs),
"avg_response_time": np.mean([l["response_time"] for l in slow_logs]),
"threshold": upper_bound
})
return anomalies
┌──────────┐
│ RECEIVED │ ← 接收反馈
└─────┬────┘
│
▼
┌──────────┐
│CLASSIFIED│ ← 分类 (Bug/建议) + 地域识别
└─────┬────┘
│
▼
┌──────────┐
│ ANALYZED │ ← Log 关联分析
└─────┬────┘
│
▼
┌──────────┐
│ ROUTED │ ← 路由到团队/人员
└─────┬────┘
│
▼
┌──────────┐
│PLAN_REVIEW│ ← 制定处理方案
└─────┬────┘
│
▼
┌──────────────┐ ◄─── 🔴 人工介入点 1: 方案审核
│ HUMAN_REVIEW │ (approve/reject/modify)
└──────┬───────┘
│
┌─────┴─────┐
│ │
▼ ▼
┌────────┐ ┌────────┐
│ASSIGNED│ │REJECTED│ → END
└───┬────┘ └────────┘
│
▼
┌──────────────┐ ◄─── 🔴 人工介入点 2: 人员反馈
│ FEEDBACK │ (进展汇报)
└──────┬───────┘
│
▼
┌──────────┐
│ TRACK │ ← SLA 监控 + 升级处理
└─────┬────┘
│
│ [完成?]
├─────────────┐
│ │
▼ │ (未完成)
┌──────────┐ │
│ RESOLVED │ │
└─────┬────┘ │
│ │
▼ │
┌──────────┐ │
│ VERIFY │────────┘ (验证失败)
└─────┬────┘
│
│ [通过?]
├─────────────┐
│ │
▼ │ (失败)
┌──────────┐ │
│ CLOSED │ │
└─────┬────┘ │
│ │
▼ │
END ◄───────────┘
| 工作流阶段 | 最大时长 | 超时升级 | 通知渠道 |
|---|---|---|---|
RECEIVED |
1 小时 | ✅ | WebSocket, 邮件 |
CLASSIFIED |
2 小时 | ✅ | WebSocket, 邮件 |
ANALYZED |
4 小时 | ✅ | WebSocket |
PLAN_REVIEW |
24 小时 | ✅ | 邮件,钉钉/Slack |
ASSIGNED |
72 小时 | ✅ | 邮件,钉钉/Slack |
IN_PROGRESS |
168 小时 | ✅ | 邮件,钉钉/Slack |
RESOLVED |
48 小时 | ✅ | WebSocket, 邮件 |
from langgraph.types import interrupt
def human_plan_review(state: SystemAgentState) -> dict:
"""人工审核方案(中断点)"""
# 中断等待人工审核
feedback = interrupt({
"message": "请审核处理方案",
"plan": state["resolution_plan"],
"options": ["approve", "reject", "modify"]
})
if feedback["action"] == "approve":
return {"workflow_stage": "assigned", "human_feedback": feedback}
elif feedback["action"] == "modify":
return {
"resolution_plan": feedback["modified_plan"],
"workflow_stage": "assigned",
"human_feedback": feedback
}
else: # reject
return {"workflow_stage": "rejected", "human_feedback": feedback}
主数据库
时序数据库
缓存/会话
向量数据库
-- 任务主表
CREATE TABLE tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_number VARCHAR(20) UNIQUE NOT NULL,
-- 分类信息
issue_type VARCHAR(20) NOT NULL CHECK (issue_type IN ('bug', 'suggestion', 'question')),
region VARCHAR(20) NOT NULL CHECK (region IN ('domestic', 'international')),
severity VARCHAR(20) NOT NULL,
category VARCHAR(100) NOT NULL,
priority_score INTEGER DEFAULT 0,
priority_level VARCHAR(5) DEFAULT 'P3',
-- 内容
title VARCHAR(500) NOT NULL,
description TEXT NOT NULL,
-- 分析结果
log_analysis JSONB,
root_cause_hypothesis TEXT,
affected_modules TEXT[],
-- 路由信息
assigned_team_id UUID REFERENCES teams(id),
assigned_person_id UUID REFERENCES users(id),
-- 工作流状态
workflow_stage VARCHAR(50) NOT NULL DEFAULT 'received',
resolution_plan JSONB,
person_feedback JSONB,
-- 时间信息
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
sla_deadline TIMESTAMPTZ,
resolved_at TIMESTAMPTZ,
closed_at TIMESTAMPTZ
);
-- 索引优化
CREATE INDEX idx_tasks_workflow_stage ON tasks(workflow_stage);
CREATE INDEX idx_tasks_assigned_person ON tasks(assigned_person_id);
CREATE INDEX idx_tasks_sla_deadline ON tasks(sla_deadline)
WHERE workflow_stage NOT IN ('closed', 'rejected');
CREATE INDEX idx_tasks_metadata ON tasks USING GIN (metadata);
-- TimescaleDB Log 表
CREATE TABLE application_logs (
time TIMESTAMPTZ NOT NULL,
log_id UUID DEFAULT gen_random_uuid(),
level VARCHAR(10) NOT NULL,
module VARCHAR(100),
trace_id UUID,
user_id UUID,
message TEXT NOT NULL,
context JSONB,
response_time_ms INTEGER
);
SELECT create_hypertable('application_logs', 'time');
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.redis import RedisStore
import redis
# PostgreSQL Checkpointer (持久化工作流状态)
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:password@localhost:5432/langgraph_db",
serde_config={"type": "json", "fallback_to_pickle": False}
)
# Redis Store (跨线程持久化记忆)
redis_client = redis.Redis(host="localhost", port=6333, db=0)
store = RedisStore(
redis_client=redis_client,
namespace="system_agent",
ttl_seconds=86400 * 30 # 30 天 TTL
)
# 编译时配置
graph = build_system_agent_graph().compile(
checkpointer=checkpointer,
store=store,
interrupt_before=["human_review", "feedback"],
durability="sync"
)
┌─────────────────┐ ┌─────────────────┐
│ React 前端 │ │ FastAPI 后端 │
│ │ │ │
│ ┌───────────┐ │ WebSocket 连接 │ ┌───────────┐ │
│ │ WebSocket │◄─┼──────────────────────────┼─►│ WebSocket │ │
│ │ Hook │ │ (双向流式通信) │ │ Handler │ │
│ └───────────┘ │ │ └───────────┘ │
│ │ │ │ │ │
│ ▼ │ │ ▼ │
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │ 消息队列 │ │ │ │ 连接管理 │ │
│ │ (状态管理) │ │ │ │ (Redis) │ │
│ └───────────┘ │ │ └───────────┘ │
│ │ │ │ │ │
│ ▼ │ │ ▼ │
│ ┌───────────┐ │ │ ┌───────────┐ │
│ │ UI │ │ │ │ LangGraph │ │
│ │ 渲染 │ │ │ │ Stream │ │
│ └───────────┘ │ │ └───────────┘ │
└─────────────────┘ └─────────────────┘
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, List
import redis.asyncio as redis
app = FastAPI()
redis_client = redis.Redis(host="localhost", port=6333, db=1)
class ConnectionManager:
"""WebSocket 连接管理器"""
def __init__(self):
self.active_connections: Dict[str, List[WebSocket]] = {}
self.task_subscriptions: Dict[str, set] = {}
async def connect(self, websocket: WebSocket, user_id: str):
await websocket.accept()
if user_id not in self.active_connections:
self.active_connections[user_id] = []
self.active_connections[user_id].append(websocket)
await redis_client.sadd("online_users", user_id)
async def send_personal_message(self, message: dict, user_id: str):
if user_id in self.active_connections:
for connection in self.active_connections[user_id]:
await connection.send_json(message)
async def broadcast_to_task_subscribers(self, task_id: str, message: dict):
if task_id in self.task_subscriptions:
for user_id in self.task_subscriptions[task_id]:
await self.send_personal_message(message, user_id)
manager = ConnectionManager()
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
await manager.connect(websocket, user_id)
try:
while True:
data = await websocket.receive_text()
message = json.loads(data)
await handle_websocket_message(websocket, user_id, message)
except WebSocketDisconnect:
manager.disconnect(websocket, user_id)
async def stream_graph_execution(websocket: WebSocket, graph, input_data: dict):
"""流式执行 LangGraph 并推送进度"""
async for event in graph.astream(
input=input_data,
config={"stream_mode": ["values", "updates", "debug"]}
):
await websocket.send_json({
"type": "graph_event",
"payload": event,
"timestamp": datetime.utcnow().isoformat()
})
// hooks/useWebSocket.ts
import { useEffect, useRef, useCallback, useState } from 'react';
export function useWebSocket({ userId, onMessage, onReconnect }) {
const wsRef = useRef(null);
const [isConnected, setIsConnected] = useState(false);
const connect = useCallback(() => {
const ws = new WebSocket(`ws://localhost:8000/ws/${userId}`);
ws.onopen = () => setIsConnected(true);
ws.onmessage = (event) => onMessage?.(JSON.parse(event.data));
ws.onclose = () => {
setIsConnected(false);
setTimeout(() => { onReconnect?.(); connect(); }, 3000);
};
wsRef.current = ws;
}, [userId, onMessage, onReconnect]);
useEffect(() => {
connect();
return () => wsRef.current?.close();
}, [connect]);
const sendMessage = useCallback((message) => {
if (wsRef.current?.readyState === WebSocket.OPEN) {
wsRef.current.send(JSON.stringify(message));
}
}, []);
const subscribeToTask = useCallback((taskId) => {
sendMessage({ type: 'subscribe_task', task_id: taskId });
}, [sendMessage]);
return {
isConnected,
sendMessage,
subscribeToTask,
submitFeedback: (data) => sendMessage({ type: 'submit_feedback', data }),
provideFeedback: (taskId, feedback) =>
sendMessage({ type: 'provide_feedback', task_id: taskId, feedback })
};
}
system-agent/
├── backend/
│ ├── app/
│ │ ├── __init__.py
│ │ ├── main.py # FastAPI 应用入口
│ │ ├── config/
│ │ │ ├── settings.py # 配置管理
│ │ │ └── langgraph_config.py # LangGraph 配置
│ │ ├── agents/
│ │ │ ├── classifier.py # 分类 Agent
│ │ │ ├── analyzer.py # 分析 Agent
│ │ │ ├── router.py # 路由 Agent
│ │ │ └── coordinator.py # 协调 Agent
│ │ ├── graph/
│ │ │ ├── state.py # StateGraph 状态定义
│ │ │ ├── nodes.py # 节点实现
│ │ │ └── workflow.py # 完整工作流构建
│ │ ├── tools/
│ │ │ ├── log_analysis.py # Log 分析工具
│ │ │ ├── notification.py # 通知工具
│ │ │ └── task_management.py # 任务管理工具
│ │ ├── api/
│ │ │ ├── routes/
│ │ │ │ ├── feedback.py # 反馈 API
│ │ │ │ ├── tasks.py # 任务 API
│ │ │ │ └── analytics.py # 分析 API
│ │ │ └── websocket/
│ │ │ ├── handler.py # WebSocket 处理器
│ │ │ └── manager.py # 连接管理器
│ │ ├── models/ # 数据模型
│ │ ├── services/ # 业务服务
│ │ └── utils/ # 工具函数
│ ├── tests/
│ ├── requirements.txt
│ └── Dockerfile
├── frontend/
│ ├── src/
│ │ ├── components/
│ │ ├── hooks/
│ │ ├── pages/
│ │ └── utils/
│ └── package.json
├── infrastructure/
│ ├── docker-compose.yml
│ └── docker-compose.prod.yml
├── langgraph.json
└── README.md
# langgraph.json
{
"dependencies": ["."],
"graphs": {
"system_agent": "./backend/app/graph/workflow.py:build_system_agent_graph"
},
"env": ".env",
"store": {
"type": "redis",
"config": {"host": "localhost", "port": 6333}
},
"checkpointer": {
"type": "postgres",
"config": {"connection_string": "postgresql://user:pass@localhost:5432/langgraph_db"}
}
}
# 1. 克隆项目
git clone <repository-url>
cd system-agent
# 2. 配置环境变量
cp .env.example .env
# 3. 启动所有服务
cd infrastructure
docker-compose up -d
# 4. 初始化数据库
docker-compose exec postgres psql -U system_agent -f /docker-entrypoint-initdb.d/01-init.sql
# 5. 访问服务
# - FastAPI API: http://localhost:8000
# - API 文档:http://localhost:8000/docs
# - React 前端:http://localhost:3000
# - LangGraph Studio: http://localhost:8123
# infrastructure/docker-compose.yml
version: '3.8'
services:
postgres:
image: postgres:16
environment:
POSTGRES_USER: system_agent
POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme}
POSTGRES_DB: system_agent_db
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
timescaledb:
image: timescale/timescaledb:latest-pg16
environment:
POSTGRES_USER: system_agent
POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme}
POSTGRES_DB: logs_db
ports:
- "5433:5432"
redis:
image: redis:7-alpine
command: redis-server --appendonly yes
volumes:
- redis_data:/data
ports:
- "6379:6379"
qdrant:
image: qdrant/qdrant:latest
volumes:
- qdrant_data:/qdrant/storage
ports:
- "6333:6333"
backend:
build:
context: ../backend
dockerfile: Dockerfile
environment:
DATABASE_URL: postgresql://system_agent:${DB_PASSWORD}@postgres:5432/system_agent_db
REDIS_URL: redis://redis:6379
depends_on:
- postgres
- redis
ports:
- "8000:8000"
frontend:
build:
context: ../frontend
dockerfile: Dockerfile
environment:
REACT_APP_API_URL: http://localhost:8000
REACT_APP_WS_URL: ws://localhost:8000
depends_on:
- backend
ports:
- "3000:3000"
volumes:
postgres_data:
redis_data:
qdrant_data:
# 查看服务状态
docker-compose ps
# 查看日志
docker-compose logs -f backend
# 重启服务
docker-compose restart backend
# 备份数据库
docker-compose exec postgres \
pg_dump -U system_agent system_agent_db > backup.sql
# 查看任务统计
docker-compose exec postgres psql -U system_agent -c \
"SELECT workflow_stage, COUNT(*) FROM tasks GROUP BY workflow_stage;"
# 查看在线用户
docker-compose exec redis redis-cli SMEMBERS online_users
# 查看待处理通知
docker-compose exec redis redis-cli LLEN notification_queue
| 问题现象 | 可能原因 | 排查步骤 | 解决方案 |
|---|---|---|---|
| WebSocket 连接失败 | Redis 不可用 | docker-compose ps redis |
重启 Redis 服务 |
| 工作流卡住 | Checkpoint 损坏 | 查看 LangGraph 日志 | 从上一个检查点恢复 |
| SLA 监控不触发 | 后台任务异常 | 检查 async 任务状态 | 重启后台任务 |
| Log 分析慢 | TimescaleDB 索引缺失 | EXPLAIN ANALYZE |
添加合适索引 |