🤖 企业级 AI Agent 系统级 Agent 架构设计深度研究报告

基于 FastAPI + LangGraph + React + WebSocket 的智能化 Bug 反馈与建议处理系统

📅 报告日期:2026 年 3 月 9 日 👤 版本:v1.0.0 📊 密级:内部公开 ⏱️ 预计阅读:45 分钟

📋 执行摘要

本报告针对基于 FastAPI + LangGraph + React + WebSocket 技术栈的企业级 AI Agent 系统,设计一套完整的系统级 Agent 架构,实现智能化的 Bug 反馈处理、建议收集与分析、以及全流程闭环管理。

🏗️ 1. 系统架构概览

1.1 核心需求拆解

功能模块 核心能力 技术要点
Bug 反馈 自动分类(国内/国外)、优先级评估、路由分发 NLP 分类、地理位置识别、规则引擎
建议处理 基于 Log 自动分析、趋势识别、智能聚合 Log 解析、异常检测、聚类分析
工作流引擎 反馈→方案→下发→反馈→修复 全流程 LangGraph 状态机、持久化、人工介入点
通知系统 多通道通知(WebSocket/邮件/钉钉) WebSocket 流式推送、异步任务队列
数据分析 处理时效、修复率、趋势报表 时序数据库、聚合查询、可视化

1.2 系统架构全景图

┌─────────────────────────────────────────────────────────────────────────┐
│                           前端层 (React)                                 │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐    │
│  │ Bug 反馈门户 │  │ 建议提交门户 │  │ 处理进度看板 │  │ 数据分析大屏 │    │
│  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘    │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼ WebSocket (流式通信)
┌─────────────────────────────────────────────────────────────────────────┐
│                         API 网关层 (FastAPI)                              │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │  REST API Endpoints  │  WebSocket Handler  │  Auth Middleware   │    │
│  └─────────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                      系统级 Agent 编排层 (LangGraph)                       │
│  ┌─────────────────────────────────────────────────────────────────┐    │
│  │                    Multi-Agent Collaboration                     │    │
│  │  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐        │    │
│  │  │ 分类 Agent│  │ 分析 Agent│  │ 路由 Agent│  │ 跟进 Agent│        │    │
│  │  └──────────┘  └──────────┘  └──────────┘  └──────────┘        │    │
│  │         │            │            │            │                │    │
│  │         └────────────┴────────────┴────────────┘                │    │
│  │                          │                                       │    │
│  │                  ┌───────▼───────┐                              │    │
│  │                  │  工作流引擎   │                              │    │
│  │                  │ (StateGraph)  │                              │    │
│  │                  └───────────────┘                              │    │
│  └─────────────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                          服务层 (Microservices)                          │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐               │
│  │ Log 分析  │  │ 通知服务  │  │ 用户管理  │  │ 报表服务  │               │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘               │
└─────────────────────────────────────────────────────────────────────────┘
                                    │
                                    ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                          数据持久层                                     │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐               │
│  │PostgreSQL│  │  Redis   │  │ Timescale│  │  MinIO   │               │
│  │(主数据库) │  │ (缓存/队列)│  │DB(时序)  │  │(文件存储) │               │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘               │
└─────────────────────────────────────────────────────────────────────────┘
                
💡 架构设计原则:
  • 模块化:各层职责清晰,可独立扩展和替换
  • 异步化:FastAPI 异步处理 + LangGraph 持久化执行
  • 可观测性:完整日志链路 + WebSocket 实时推送
  • 高可用:多实例部署 + 数据持久化 + 断点续跑

🤖 2. 多 Agent 协作设计(基于 LangGraph)

2.1 Agent 角色定义

基于 LangGraph 的 Multi-Agent Collaboration 模式,设计以下专业 Agent:

Agent 名称 职责 输入 输出 工具依赖
ClassifierAgent Bug/建议分类、地域识别 原始反馈文本 分类标签、地域标记 NLP 模型、IP 地理位置库
AnalyzerAgent Log 关联分析、根因推测 分类结果、相关 Log 分析报告、可疑模块 Log 解析器、异常检测算法
RouterAgent 任务路由、人员匹配 分析结果、团队负载 指派方案、优先级 用户数据库、排班系统
CoordinatorAgent 流程协调、进度跟踪 任务状态、人员反馈 进度更新、升级决策 通知服务、SLA 规则引擎
ReporterAgent 报告生成、数据聚合 历史数据、当前状态 可视化报表、趋势分析 时序数据库、图表库

2.2 LangGraph StateGraph 状态定义

from typing import TypedDict, List, Dict, Any, Literal
from datetime import datetime
from typing_extensions import Annotated
from operator import add

class SystemAgentState(TypedDict):
    """系统 Agent 完整状态定义"""
    
    # 输入数据
    original_input: str                    # 原始反馈内容
    source_channel: str                    # 来源渠道 (web/app/api)
    user_info: Dict[str, Any]              # 用户信息
    timestamp: datetime                    # 时间戳
    
    # 分类结果
    issue_type: Literal["bug", "suggestion", "question"]
    region: Literal["domestic", "international"]
    severity: Literal["critical", "high", "medium", "low"]
    category: str                          # 具体分类标签
    
    # 分析结果
    log_analysis: Dict[str, Any]           # Log 分析结果
    root_cause_hypothesis: str             # 根因假设
    affected_modules: Annotated[List[str], add]
    
    # 路由决策
    assigned_team: str                     # 指派团队
    assigned_person: str                   # 指派人员
    priority_score: int                    # 优先级分数
    sla_deadline: datetime                 # SLA 截止时间
    
    # 工作流状态
    workflow_stage: Literal[
        "received", "classified", "analyzed",
        "routed", "plan_review", "assigned",
        "in_progress", "feedback_pending",
        "resolved", "verification", "closed", "rejected"
    ]
    conversation_history: List[Dict[str, Any]]
    human_feedback: Dict[str, Any]
    resolution_plan: Dict[str, Any]
    
    # 元数据
    checkpoints: List[Dict[str, Any]]
    notifications_sent: List[Dict[str, Any]]
    task_id: str

2.3 Agent 节点实现框架

from langgraph.graph import StateGraph, END
from langchain_core.tools import tool

# ============== 工具定义 ==============

@tool
def classify_issue(text: str, user_location: str = None) -> dict:
    """分类问题类型并识别地域"""
    # 实现:LLM + 规则引擎
    pass

@tool
def analyze_logs(time_range: dict, modules: list[str]) -> dict:
    """分析指定时间段和模块的 Log"""
    # 实现:查询 TimescaleDB + 异常检测
    pass

@tool
def send_notification(user_id: str, channel: str, message: dict) -> bool:
    """发送通知(WebSocket/邮件/钉钉)"""
    # 实现:异步通知服务
    pass

# ============== Agent 节点 ==============

def classifier_node(state: SystemAgentState) -> dict:
    """分类 Agent 节点"""
    # 调用 classify_issue 工具
    # 更新 state 中的 issue_type, region, severity, category
    pass

def analyzer_node(state: SystemAgentState) -> dict:
    """分析 Agent 节点"""
    # 调用 analyze_logs 工具
    # 生成 root_cause_hypothesis 和 affected_modules
    pass

def router_node(state: SystemAgentState) -> dict:
    """路由 Agent 节点"""
    # 调用 get_team_availability 工具
    # 决定 assigned_team, assigned_person, priority_score
    pass

# ============== 构建 StateGraph ==============

def build_system_agent_graph():
    workflow = StateGraph(SystemAgentState)
    
    # 添加节点
    workflow.add_node("classifier", classifier_node)
    workflow.add_node("analyzer", analyzer_node)
    workflow.add_node("router", router_node)
    workflow.add_node("coordinator", coordinator_node)
    
    # 定义边
    workflow.set_entry_point("classifier")
    workflow.add_edge("classifier", "analyzer")
    workflow.add_edge("analyzer", "router")
    
    return workflow.compile(
        checkpointer=PostgresSaver.from_conn_string("postgresql://..."),
        interrupt_before=["human_review"]
    )

🐛 3. Bug 反馈处理流程(区分国内/国外)

3.1 地域识别策略

🌍 多维度识别策略: 优先级:IP 地理位置 > 用户注册地 > 手机号段 > 文本语言特征
class RegionClassifier:
    """地域分类器 - 多维度识别策略"""
    
    def classify(self, feedback_data: dict) -> Literal["domestic", "international"]:
        scores = {"domestic": 0.0, "international": 0.0}
        
        # 1. IP 地理位置识别(最高优先级)
        if feedback_data.get("ip_address"):
            region = self._geo_ip_lookup(feedback_data["ip_address"])
            if region == "CN":
                scores["domestic"] += 0.5
            else:
                scores["international"] += 0.5
        
        # 2. 用户注册地
        if feedback_data.get("user_profile", {}).get("region"):
            if feedback_data["user_profile"]["region"] == "CN":
                scores["domestic"] += 0.3
            else:
                scores["international"] += 0.3
        
        # 3. 手机号段识别
        if feedback_data.get("phone"):
            if feedback_data["phone"].startswith("+86"):
                scores["domestic"] += 0.15
            else:
                scores["international"] += 0.15
        
        return "domestic" if scores["domestic"] > scores["international"] else "international"

3.2 差异化处理流程

📥 Bug 反馈接收
🌍 地域识别
🇨🇳 国内 Bug 流程
🌏 国外 Bug 流程
国内流程:
• 国内开发团队
• 钉钉通知
• 北京时间 SLA
• 中文沟通
国外流程:
• 国际支持团队
• Slack/邮件通知
• UTC 时间 SLA
• 英文沟通
⚙️ 统一工作流引擎

3.3 优先级评估模型

class PriorityEvaluator:
    """Bug 优先级评估模型"""
    
    def calculate_priority(self, severity: str, region: str, 
                            affected_users: int, business_impact: str,
                            is_regression: bool) -> tuple[int, str]:
        base_scores = {"critical": 80, "high": 60, "medium": 40, "low": 20}
        score = base_scores.get(severity, 40)
        
        # 地域加权(国内核心业务加权)
        if region == "domestic" and business_impact == "core":
            score += 10
        
        # 影响用户数加权
        if affected_users > 10000:
            score += 15
        elif affected_users > 1000:
            score += 10
        
        # 业务影响加权
        impact_weights = {"core": 15, "important": 10, "normal": 5, "minor": 0}
        score += impact_weights.get(business_impact, 5)
        
        # 回归 Bug 加权
        if is_regression:
            score += 10
        
        # 转换为优先级等级
        if score >= 90:
            level = "P0"
        elif score >= 70:
            level = "P1"
        elif score >= 50:
            level = "P2"
        elif score >= 30:
            level = "P3"
        else:
            level = "P4"
        
        return score, level

🔴 P0 级(紧急)

  • 核心功能不可用
  • 影响 > 10000 用户
  • 数据丢失/安全漏洞
  • SLA: 1 小时响应,4 小时解决

🟠 P1 级(高)

  • 主要功能受损
  • 影响 1000-10000 用户
  • 严重性能问题
  • SLA: 2 小时响应,24 小时解决

🟡 P2 级(中)

  • 次要功能问题
  • 影响 100-1000 用户
  • 一般性能问题
  • SLA: 4 小时响应,72 小时解决

🟢 P3/P4 级(低)

  • 轻微问题/优化建议
  • 影响 < 100 用户
  • UI/UX 问题
  • SLA: 24 小时响应,168 小时解决

💡 4. 建议处理流程(基于 Log 分析)

4.1 Log 驱动的智能分析架构

┌─────────────────────────────────────────────────────────────────┐
│                    建议收集入口                                  │
│   (用户提交 / 系统自动检测 / 客服转交)                            │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│              Log 关联分析引擎 (Log Analysis Engine)               │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  1. Log 采集层                                              │  │
│  │     - 应用 Log (FastAPI/React)                            │  │
│  │     - 系统 Log (OS/Container)                             │  │
│  │     - 业务 Log (用户行为/交易)                             │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                   │
│                              ▼                                   │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  2. Log 解析与标准化                                        │  │
│  │     - 结构化解析 (JSON/Grok)                              │  │
│  │     - 字段标准化 (timestamp, level, module, message)      │  │
│  │     - 上下文关联 (trace_id, session_id, user_id)          │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                   │
│                              ▼                                   │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  3. 异常检测与模式识别                                      │  │
│  │     - 统计异常 (3-sigma, IQR)                             │  │
│  │     - 时序异常 (Prophet, LSTM)                            │  │
│  │     - 模式聚类 (LogCluster, Drain)                        │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                   │
│                              ▼                                   │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │  4. 根因推测与建议生成                                      │  │
│  │     - 关联规则挖掘 (Apriori)                              │  │
│  │     - LLM 根因分析                                         │  │
│  │     - 智能建议生成                                         │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘
                

4.2 Log 分析核心实现

class LogAnalysisEngine:
    """Log 分析引擎 - 支持多维度异常检测和建议生成"""
    
    def analyze_for_suggestion(self, suggestion_text: str, 
                                time_range: Dict[str, datetime],
                                modules: List[str] = None) -> Dict[str, Any]:
        # 1. 从建议文本提取关键词和模块
        keywords = self._extract_keywords(suggestion_text)
        if not modules:
            modules = self._infer_modules_from_keywords(keywords)
        
        # 2. 查询相关 Log
        related_logs = self._query_related_logs(
            keywords=keywords,
            modules=modules,
            time_range=time_range
        )
        
        # 3. 异常检测
        anomalies = self._detect_anomalies(related_logs)
        
        # 4. 模式聚类
        patterns = self._cluster_log_patterns(related_logs)
        
        # 5. LLM 根因分析
        root_cause = self.llm_analyzer.analyze(
            logs=related_logs,
            anomalies=anomalies,
            patterns=patterns,
            suggestion=suggestion_text
        )
        
        return {
            "related_logs_count": len(related_logs),
            "sample_logs": related_logs[:10],
            "anomaly_patterns": anomalies,
            "log_patterns": patterns,
            "root_cause_hypothesis": root_cause,
            "recommendation": self._generate_recommendation(root_cause, anomalies),
            "confidence_score": self._calculate_confidence(anomalies, patterns, root_cause)
        }
    
    def _detect_anomalies(self, logs: List[Dict]) -> List[Dict]:
        """多维度异常检测"""
        anomalies = []
        
        # 1. 错误率异常 (3-sigma 原则)
        error_rates = self._calculate_error_rate_by_time(logs, interval="1h")
        mean_rate = np.mean(error_rates)
        std_rate = np.std(error_rates)
        
        for timestamp, rate in error_rates.items():
            if rate > mean_rate + 3 * std_rate:
                anomalies.append({
                    "type": "error_rate_spike",
                    "timestamp": timestamp,
                    "value": rate,
                    "severity": "high" if rate > mean_rate + 5 * std_rate else "medium"
                })
        
        # 2. 响应时间异常 (IQR 方法)
        response_times = [log.get("response_time", 0) for log in logs]
        if response_times:
            q1, q3 = np.percentile(response_times, [25, 75])
            iqr = q3 - q1
            upper_bound = q3 + 1.5 * iqr
            
            slow_logs = [log for log in logs if log.get("response_time", 0) > upper_bound]
            if slow_logs:
                anomalies.append({
                    "type": "slow_response",
                    "count": len(slow_logs),
                    "avg_response_time": np.mean([l["response_time"] for l in slow_logs]),
                    "threshold": upper_bound
                })
        
        return anomalies

4.3 建议聚合与去重

🎯 智能聚合: 使用向量相似度搜索(Qdrant + SentenceTransformer)避免重复处理相似建议,聚合后统一处理可提升效率 50%+

🔄 5. 完整工作流设计:反馈→方案→下发→反馈→修复闭环

5.1 工作流状态机

    ┌──────────┐
    │  RECEIVED │ ← 接收反馈
    └─────┬────┘
          │
          ▼
    ┌──────────┐
    │CLASSIFIED│ ← 分类 (Bug/建议) + 地域识别
    └─────┬────┘
          │
          ▼
    ┌──────────┐
    │ ANALYZED  │ ← Log 关联分析
    └─────┬────┘
          │
          ▼
    ┌──────────┐
    │  ROUTED   │ ← 路由到团队/人员
    └─────┬────┘
          │
          ▼
    ┌──────────┐
    │PLAN_REVIEW│ ← 制定处理方案
    └─────┬────┘
          │
          ▼
    ┌──────────────┐  ◄─── 🔴 人工介入点 1: 方案审核
    │ HUMAN_REVIEW │      (approve/reject/modify)
    └──────┬───────┘
           │
     ┌─────┴─────┐
     │           │
     ▼           ▼
┌────────┐   ┌────────┐
│ASSIGNED│   │REJECTED│ → END
└───┬────┘   └────────┘
    │
    ▼
┌──────────────┐  ◄─── 🔴 人工介入点 2: 人员反馈
│   FEEDBACK   │      (进展汇报)
└──────┬───────┘
       │
       ▼
┌──────────┐
│   TRACK  │ ← SLA 监控 + 升级处理
└─────┬────┘
      │
      │ [完成?]
      ├─────────────┐
      │             │
      ▼             │ (未完成)
┌──────────┐        │
│ RESOLVED │        │
└─────┬────┘        │
      │             │
      ▼             │
┌──────────┐        │
│  VERIFY  │────────┘ (验证失败)
└─────┬────┘
      │
      │ [通过?]
      ├─────────────┐
      │             │
      ▼             │ (失败)
┌──────────┐        │
│  CLOSED  │        │
└─────┬────┘        │
      │             │
      ▼             │
    END ◄───────────┘
                

5.2 SLA 规则配置

工作流阶段 最大时长 超时升级 通知渠道
RECEIVED 1 小时 WebSocket, 邮件
CLASSIFIED 2 小时 WebSocket, 邮件
ANALYZED 4 小时 WebSocket
PLAN_REVIEW 24 小时 邮件,钉钉/Slack
ASSIGNED 72 小时 邮件,钉钉/Slack
IN_PROGRESS 168 小时 邮件,钉钉/Slack
RESOLVED 48 小时 WebSocket, 邮件

5.3 人工介入点设计

from langgraph.types import interrupt

def human_plan_review(state: SystemAgentState) -> dict:
    """人工审核方案(中断点)"""
    # 中断等待人工审核
    feedback = interrupt({
        "message": "请审核处理方案",
        "plan": state["resolution_plan"],
        "options": ["approve", "reject", "modify"]
    })
    
    if feedback["action"] == "approve":
        return {"workflow_stage": "assigned", "human_feedback": feedback}
    elif feedback["action"] == "modify":
        return {
            "resolution_plan": feedback["modified_plan"],
            "workflow_stage": "assigned",
            "human_feedback": feedback
        }
    else:  # reject
        return {"workflow_stage": "rejected", "human_feedback": feedback}

💾 6. 数据存储与持久化方案

6.1 多层存储架构

🐘 PostgreSQL

主数据库

  • 任务记录
  • 用户信息
  • 团队配置
  • SLA 规则
  • 历史归档

📊 TimescaleDB

时序数据库

  • 应用 Log
  • 性能指标
  • 业务事件
  • 用户行为
  • 监控数据

🔴 Redis

缓存/会话

  • 会话状态
  • 工作流检查点
  • 通知队列
  • 实时缓存
  • 分布式锁

🔵 Qdrant

向量数据库

  • 建议嵌入向量
  • 相似性搜索
  • 聚类分析
  • 语义匹配

6.2 核心数据表设计

-- 任务主表
CREATE TABLE tasks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    task_number VARCHAR(20) UNIQUE NOT NULL,
    
    -- 分类信息
    issue_type VARCHAR(20) NOT NULL CHECK (issue_type IN ('bug', 'suggestion', 'question')),
    region VARCHAR(20) NOT NULL CHECK (region IN ('domestic', 'international')),
    severity VARCHAR(20) NOT NULL,
    category VARCHAR(100) NOT NULL,
    priority_score INTEGER DEFAULT 0,
    priority_level VARCHAR(5) DEFAULT 'P3',
    
    -- 内容
    title VARCHAR(500) NOT NULL,
    description TEXT NOT NULL,
    
    -- 分析结果
    log_analysis JSONB,
    root_cause_hypothesis TEXT,
    affected_modules TEXT[],
    
    -- 路由信息
    assigned_team_id UUID REFERENCES teams(id),
    assigned_person_id UUID REFERENCES users(id),
    
    -- 工作流状态
    workflow_stage VARCHAR(50) NOT NULL DEFAULT 'received',
    resolution_plan JSONB,
    person_feedback JSONB,
    
    -- 时间信息
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    sla_deadline TIMESTAMPTZ,
    resolved_at TIMESTAMPTZ,
    closed_at TIMESTAMPTZ
);

-- 索引优化
CREATE INDEX idx_tasks_workflow_stage ON tasks(workflow_stage);
CREATE INDEX idx_tasks_assigned_person ON tasks(assigned_person_id);
CREATE INDEX idx_tasks_sla_deadline ON tasks(sla_deadline) 
    WHERE workflow_stage NOT IN ('closed', 'rejected');
CREATE INDEX idx_tasks_metadata ON tasks USING GIN (metadata);

-- TimescaleDB Log 表
CREATE TABLE application_logs (
    time TIMESTAMPTZ NOT NULL,
    log_id UUID DEFAULT gen_random_uuid(),
    level VARCHAR(10) NOT NULL,
    module VARCHAR(100),
    trace_id UUID,
    user_id UUID,
    message TEXT NOT NULL,
    context JSONB,
    response_time_ms INTEGER
);

SELECT create_hypertable('application_logs', 'time');

6.3 LangGraph Checkpoint 配置

from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.redis import RedisStore
import redis

# PostgreSQL Checkpointer (持久化工作流状态)
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:password@localhost:5432/langgraph_db",
    serde_config={"type": "json", "fallback_to_pickle": False}
)

# Redis Store (跨线程持久化记忆)
redis_client = redis.Redis(host="localhost", port=6333, db=0)
store = RedisStore(
    redis_client=redis_client,
    namespace="system_agent",
    ttl_seconds=86400 * 30  # 30 天 TTL
)

# 编译时配置
graph = build_system_agent_graph().compile(
    checkpointer=checkpointer,
    store=store,
    interrupt_before=["human_review", "feedback"],
    durability="sync"
)

🔌 7. WebSocket 流式通信架构

7.1 整体架构

┌─────────────────┐                          ┌─────────────────┐
│   React 前端     │                          │   FastAPI 后端   │
│                 │                          │                 │
│  ┌───────────┐  │      WebSocket 连接       │  ┌───────────┐  │
│  │ WebSocket │◄─┼──────────────────────────┼─►│ WebSocket │  │
│  │  Hook     │  │      (双向流式通信)       │  │  Handler  │  │
│  └───────────┘  │                          │  └───────────┘  │
│        │        │                          │        │        │
│        ▼        │                          │        ▼        │
│  ┌───────────┐  │                          │  ┌───────────┐  │
│  │  消息队列  │  │                          │  │  连接管理  │  │
│  │ (状态管理) │  │                          │  │   (Redis) │  │
│  └───────────┘  │                          │  └───────────┘  │
│        │        │                          │        │        │
│        ▼        │                          │        ▼        │
│  ┌───────────┐  │                          │  ┌───────────┐  │
│  │   UI      │  │                          │  │ LangGraph │  │
│  │  渲染     │  │                          │  │  Stream   │  │
│  └───────────┘  │                          │  └───────────┘  │
└─────────────────┘                          └─────────────────┘
                

7.2 FastAPI WebSocket 实现

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, List
import redis.asyncio as redis

app = FastAPI()
redis_client = redis.Redis(host="localhost", port=6333, db=1)

class ConnectionManager:
    """WebSocket 连接管理器"""
    
    def __init__(self):
        self.active_connections: Dict[str, List[WebSocket]] = {}
        self.task_subscriptions: Dict[str, set] = {}
    
    async def connect(self, websocket: WebSocket, user_id: str):
        await websocket.accept()
        if user_id not in self.active_connections:
            self.active_connections[user_id] = []
        self.active_connections[user_id].append(websocket)
        await redis_client.sadd("online_users", user_id)
    
    async def send_personal_message(self, message: dict, user_id: str):
        if user_id in self.active_connections:
            for connection in self.active_connections[user_id]:
                await connection.send_json(message)
    
    async def broadcast_to_task_subscribers(self, task_id: str, message: dict):
        if task_id in self.task_subscriptions:
            for user_id in self.task_subscriptions[task_id]:
                await self.send_personal_message(message, user_id)

manager = ConnectionManager()

@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await manager.connect(websocket, user_id)
    try:
        while True:
            data = await websocket.receive_text()
            message = json.loads(data)
            await handle_websocket_message(websocket, user_id, message)
    except WebSocketDisconnect:
        manager.disconnect(websocket, user_id)

async def stream_graph_execution(websocket: WebSocket, graph, input_data: dict):
    """流式执行 LangGraph 并推送进度"""
    async for event in graph.astream(
        input=input_data,
        config={"stream_mode": ["values", "updates", "debug"]}
    ):
        await websocket.send_json({
            "type": "graph_event",
            "payload": event,
            "timestamp": datetime.utcnow().isoformat()
        })

7.3 React 前端 WebSocket Hook

// hooks/useWebSocket.ts
import { useEffect, useRef, useCallback, useState } from 'react';

export function useWebSocket({ userId, onMessage, onReconnect }) {
  const wsRef = useRef(null);
  const [isConnected, setIsConnected] = useState(false);

  const connect = useCallback(() => {
    const ws = new WebSocket(`ws://localhost:8000/ws/${userId}`);
    
    ws.onopen = () => setIsConnected(true);
    ws.onmessage = (event) => onMessage?.(JSON.parse(event.data));
    ws.onclose = () => {
      setIsConnected(false);
      setTimeout(() => { onReconnect?.(); connect(); }, 3000);
    };
    
    wsRef.current = ws;
  }, [userId, onMessage, onReconnect]);

  useEffect(() => {
    connect();
    return () => wsRef.current?.close();
  }, [connect]);

  const sendMessage = useCallback((message) => {
    if (wsRef.current?.readyState === WebSocket.OPEN) {
      wsRef.current.send(JSON.stringify(message));
    }
  }, []);

  const subscribeToTask = useCallback((taskId) => {
    sendMessage({ type: 'subscribe_task', task_id: taskId });
  }, [sendMessage]);

  return {
    isConnected,
    sendMessage,
    subscribeToTask,
    submitFeedback: (data) => sendMessage({ type: 'submit_feedback', data }),
    provideFeedback: (taskId, feedback) => 
      sendMessage({ type: 'provide_feedback', task_id: taskId, feedback })
  };
}

📁 8. 代码实现框架

8.1 项目目录结构

system-agent/
├── backend/
│   ├── app/
│   │   ├── __init__.py
│   │   ├── main.py                 # FastAPI 应用入口
│   │   ├── config/
│   │   │   ├── settings.py         # 配置管理
│   │   │   └── langgraph_config.py # LangGraph 配置
│   │   ├── agents/
│   │   │   ├── classifier.py       # 分类 Agent
│   │   │   ├── analyzer.py         # 分析 Agent
│   │   │   ├── router.py           # 路由 Agent
│   │   │   └── coordinator.py      # 协调 Agent
│   │   ├── graph/
│   │   │   ├── state.py            # StateGraph 状态定义
│   │   │   ├── nodes.py            # 节点实现
│   │   │   └── workflow.py         # 完整工作流构建
│   │   ├── tools/
│   │   │   ├── log_analysis.py     # Log 分析工具
│   │   │   ├── notification.py     # 通知工具
│   │   │   └── task_management.py  # 任务管理工具
│   │   ├── api/
│   │   │   ├── routes/
│   │   │   │   ├── feedback.py     # 反馈 API
│   │   │   │   ├── tasks.py        # 任务 API
│   │   │   │   └── analytics.py    # 分析 API
│   │   │   └── websocket/
│   │   │       ├── handler.py      # WebSocket 处理器
│   │   │       └── manager.py      # 连接管理器
│   │   ├── models/                 # 数据模型
│   │   ├── services/               # 业务服务
│   │   └── utils/                  # 工具函数
│   ├── tests/
│   ├── requirements.txt
│   └── Dockerfile
├── frontend/
│   ├── src/
│   │   ├── components/
│   │   ├── hooks/
│   │   ├── pages/
│   │   └── utils/
│   └── package.json
├── infrastructure/
│   ├── docker-compose.yml
│   └── docker-compose.prod.yml
├── langgraph.json
└── README.md

8.2 LangGraph 配置

# langgraph.json
{
  "dependencies": ["."],
  "graphs": {
    "system_agent": "./backend/app/graph/workflow.py:build_system_agent_graph"
  },
  "env": ".env",
  "store": {
    "type": "redis",
    "config": {"host": "localhost", "port": 6333}
  },
  "checkpointer": {
    "type": "postgres",
    "config": {"connection_string": "postgresql://user:pass@localhost:5432/langgraph_db"}
  }
}

🚀 9. 部署与运维指南

9.1 快速启动(开发环境)

# 1. 克隆项目
git clone <repository-url>
cd system-agent

# 2. 配置环境变量
cp .env.example .env

# 3. 启动所有服务
cd infrastructure
docker-compose up -d

# 4. 初始化数据库
docker-compose exec postgres psql -U system_agent -f /docker-entrypoint-initdb.d/01-init.sql

# 5. 访问服务
# - FastAPI API: http://localhost:8000
# - API 文档:http://localhost:8000/docs
# - React 前端:http://localhost:3000
# - LangGraph Studio: http://localhost:8123

9.2 Docker Compose 配置

# infrastructure/docker-compose.yml
version: '3.8'

services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: system_agent
      POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme}
      POSTGRES_DB: system_agent_db
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  timescaledb:
    image: timescale/timescaledb:latest-pg16
    environment:
      POSTGRES_USER: system_agent
      POSTGRES_PASSWORD: ${DB_PASSWORD:-changeme}
      POSTGRES_DB: logs_db
    ports:
      - "5433:5432"

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data
    ports:
      - "6379:6379"

  qdrant:
    image: qdrant/qdrant:latest
    volumes:
      - qdrant_data:/qdrant/storage
    ports:
      - "6333:6333"

  backend:
    build:
      context: ../backend
      dockerfile: Dockerfile
    environment:
      DATABASE_URL: postgresql://system_agent:${DB_PASSWORD}@postgres:5432/system_agent_db
      REDIS_URL: redis://redis:6379
    depends_on:
      - postgres
      - redis
    ports:
      - "8000:8000"

  frontend:
    build:
      context: ../frontend
      dockerfile: Dockerfile
    environment:
      REACT_APP_API_URL: http://localhost:8000
      REACT_APP_WS_URL: ws://localhost:8000
    depends_on:
      - backend
    ports:
      - "3000:3000"

volumes:
  postgres_data:
  redis_data:
  qdrant_data:

9.3 运维命令手册

📊 日常运维

# 查看服务状态
docker-compose ps

# 查看日志
docker-compose logs -f backend

# 重启服务
docker-compose restart backend

💾 数据库操作

# 备份数据库
docker-compose exec postgres \
  pg_dump -U system_agent system_agent_db > backup.sql

# 查看任务统计
docker-compose exec postgres psql -U system_agent -c \
  "SELECT workflow_stage, COUNT(*) FROM tasks GROUP BY workflow_stage;"

🔍 监控命令

# 查看在线用户
docker-compose exec redis redis-cli SMEMBERS online_users

# 查看待处理通知
docker-compose exec redis redis-cli LLEN notification_queue

9.4 监控与告警配置

⚠️ 关键告警规则:
  • 高错误率:5 分钟内 5xx 错误率 > 10%
  • SLA 超时:任务即将超过 SLA 截止时间
  • WebSocket 连接异常:活跃连接数 < 10
  • 数据库连接池耗尽:可用连接数 < 5

9.5 故障排查指南

问题现象 可能原因 排查步骤 解决方案
WebSocket 连接失败 Redis 不可用 docker-compose ps redis 重启 Redis 服务
工作流卡住 Checkpoint 损坏 查看 LangGraph 日志 从上一个检查点恢复
SLA 监控不触发 后台任务异常 检查 async 任务状态 重启后台任务
Log 分析慢 TimescaleDB 索引缺失 EXPLAIN ANALYZE 添加合适索引

📋 报告总结

✅ 核心交付物

  • 完整架构设计:5 层架构 + 多 Agent 协作 + 状态机工作流
  • 核心流程设计:Bug 反馈(国内/国外)+ 建议处理(Log 驱动)+ 完整闭环
  • 技术方案设计:数据存储 + WebSocket 通信 + 代码框架
  • 运维保障体系:部署方案 + 监控告警 + 故障排查

🎯 关键技术亮点

  • LangGraph 多 Agent 协作
  • Log 驱动的智能分析
  • 人机协同中断机制
  • WebSocket 流式通信
  • 持久化断点续跑
  • SLA 监控保障

📈 预期价值

  • 自动化处理率 80%+
  • 响应时间缩短 60%
  • SLA 达成率 95%+
  • 人工成本降低 50%
  • 用户满意度提升 40%

🚀 下一步行动

  • 环境搭建(1-2 天)
  • 核心功能开发(1-2 周)
  • 前端开发(1-2 周)
  • 测试与优化(1 周)
  • 生产部署(3-5 天)