🚀 Enterprise AI Agent Development in Practice Series

Building Enterprise-Grade AI Agent Systems in Practice
LangChain + LangGraph + FastAPI + MCP

Build a production-grade AI Agent system from scratch: a complete guide with best practices covering LangChain application development, LangGraph workflow orchestration, high-performance FastAPI services, and MCP protocol integration.

📅 February 2025 ⏱️ Reading time: 45 minutes 📊 Technical depth: Expert

1. Introduction: Challenges of Enterprise AI Agent Systems

With the rapid progress of large language model (LLM) technology, AI Agents have become a core driver of enterprise digital transformation. From intelligent customer service and coding assistants to data-analysis agents, AI applications are spreading across business scenarios at unprecedented speed. Building an enterprise-grade AI Agent system, however, poses a number of technical challenges.

⚠️ Core Challenges
  • Complex workflow orchestration: multi-step tasks require precise state management and flow control
  • Tool integration: seamless integration with existing systems, APIs, and databases
  • Performance and scalability: low-latency responses and horizontal scaling under high concurrency
  • Observability: complete logging, tracing, and monitoring
  • Security and compliance: data privacy, access control, audit logs
  • Protocol standardization: a unified context protocol for cross-model interoperability

This report is a hands-on guide to building an enterprise AI Agent system on the LangChain + LangGraph + FastAPI + MCP stack, covering everything from architecture design to code implementation, and from local development to production deployment.

🦜
LangChain
AI application development framework providing LLM abstraction, tool integration, and memory management
🔗
LangGraph
Graph-based workflow orchestration engine for complex Agent flows and state management
FastAPI
High-performance Python web framework with automatic docs, type safety, and async support
🔌
MCP
Model Context Protocol, a standardized protocol for model context

2. Technology Stack Overview and Selection

2.1 Overall Architecture

An enterprise AI Agent system uses a layered architecture. From bottom to top: infrastructure layer, service layer, Agent layer, and application layer.

🌐 Application Layer
Web console · API gateway · SDK/CLI
⬇️
🤖 Agent Layer
LangGraph orchestration · Multi-agent collaboration · Tool registry
⬇️
⚙️ Service Layer
FastAPI · LangChain · MCP Server
⬇️
💾 Infrastructure Layer
LLM provider · Vector DB · Message queue

2.2 Technology Selection Comparison

Component | Choice | Alternatives | Rationale
AI framework | LangChain | LlamaIndex, Haystack | Mature ecosystem, rich tooling, active community
Workflow engine | LangGraph | Prefect, Airflow | Native Agent state machines, seamless LangChain integration
Web framework | FastAPI | Flask, Django | High performance, automatic docs, type safety, async support
Context protocol | MCP | Custom protocols | Standardized, cross-model interoperability, ecosystem support
Vector database | Chroma/Pinecone | FAISS, Weaviate | Ease of use, performance, native LangChain support

2.3 Project Dependencies

requirements.txt
# Core frameworks
langchain==0.1.0
langchain-core==0.1.0
langchain-community==0.0.10
langgraph==0.0.20

# Web service
fastapi==0.109.0
uvicorn[standard]==0.27.0
pydantic==2.5.0
pydantic-settings==2.1.0

# MCP protocol (official Python SDK)
mcp

# LLM providers
langchain-openai==0.0.2
langchain-anthropic==0.0.5

# Vector databases
chromadb==0.4.22
pinecone-client==3.0.0

# Tools and integrations
python-dotenv==1.0.0
httpx==0.26.0
aiosqlite==0.19.0

# Monitoring and logging
structlog==24.1.0
prometheus-client==0.19.0
opentelemetry-api==1.21.0

# Testing (httpx above doubles as the test client)
pytest==7.4.0
pytest-asyncio==0.23.0

3. LangChain Core Principles and Applications

3.1 LangChain Core Concepts

LangChain is a framework for developing applications powered by language models, offering modular abstractions and a rich set of tool integrations.

Core Component | Role | Typical Uses
Models | LLM abstraction layer supporting multiple providers | GPT-4, Claude, local models
Prompts | Prompt templates and management | Dynamic prompts, few-shot
Chains | Composition of components into complex logic | Multi-step reasoning, RAG
Agents | Autonomous decision-making with tool calling | Task automation, tool use
Memory | Conversation history and context management | Multi-turn dialogue, long-term memory
Tools | Integration of external functionality and APIs | Search, calculation, databases
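
How these components compose is best seen in code. Below is a minimal sketch using the LangChain Expression Language (LCEL); the model name and prompt are illustrative:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Prompts: a reusable template with a variable slot
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{question}"),
])

# Models: the LLM abstraction (reads OPENAI_API_KEY from the environment)
model = ChatOpenAI(model="gpt-4", temperature=0.7)

# Chains: LCEL composition, prompt -> model -> plain-text output
chain = prompt | model | StrOutputParser()

answer = chain.invoke({"question": "What is LangChain?"})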

3.2 LLM Abstraction and Invocation

llm_provider.py
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.language_models import BaseChatModel
from typing import Optional
import os

class LLMProvider:
    """LLM provider supporting multiple model backends"""
    
    def __init__(
        self,
        provider: str = "openai",
        model_name: str = "gpt-4",
        temperature: float = 0.7,
        max_tokens: int = 4096
    ):
        self.provider = provider
        self.model_name = model_name
        self.temperature = temperature
        self.max_tokens = max_tokens
        self._model: Optional[BaseChatModel] = None
    
    @property
    def model(self) -> BaseChatModel:
        """Lazily instantiate the LLM"""
        if self._model is None:
            self._model = self._create_model()
        return self._model
    
    def _create_model(self) -> BaseChatModel:
        """Create the LLM instance"""
        common_kwargs = {
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "streaming": True,
        }
        
        if self.provider == "openai":
            return ChatOpenAI(
                model=self.model_name,
                api_key=os.getenv("OPENAI_API_KEY"),
                **common_kwargs
            )
        elif self.provider == "anthropic":
            return ChatAnthropic(
                model=self.model_name,
                api_key=os.getenv("ANTHROPIC_API_KEY"),
                **common_kwargs
            )
        else:
            raise ValueError(f"Unsupported provider: {self.provider}")
    
    async def invoke(self, messages: list) -> str:
        """Invoke the LLM"""
        response = await self.model.ainvoke(messages)
        return response.content
    
    async def stream(self, messages: list):
        """Invoke the LLM with streaming output"""
        async for chunk in self.model.astream(messages):
            yield chunk.content
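
A short usage sketch for the class above (assumes OPENAI_API_KEY is set; the messages are illustrative):

import asyncio
from langchain_core.messages import HumanMessage

async def demo():
    llm = LLMProvider(provider="openai", model_name="gpt-4")
    # One-shot invocation
    print(await llm.invoke([HumanMessage(content="Hello!")]))
    # Streaming invocation, token by token
    async for token in llm.stream([HumanMessage(content="Count to three")]):
        print(token, end="", flush=True)

asyncio.run(demo())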

3.3 Tool Definition and Registration

tools.py
from langchain_core.tools import BaseTool, tool
from pydantic import BaseModel, Field
from typing import Type
import ast
import operator
import httpx

class SearchInput(BaseModel):
    """Input schema for the search tool"""
    query: str = Field(description="Search query")
    num_results: int = Field(default=5, description="Number of results")

class SearchTool(BaseTool):
    """Web search tool"""
    name: str = "web_search"
    description: str = "Search the web for up-to-date information"
    args_schema: Type[BaseModel] = SearchInput
    
    async def _arun(self, query: str, num_results: int = 5) -> str:
        """Run the search asynchronously"""
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://api.search.com/search",  # placeholder endpoint
                params={"q": query, "limit": num_results}
            )
            results = response.json()
            return self._format_results(results)
    
    def _format_results(self, results: dict) -> str:
        """Format search results for the LLM"""
        formatted = []
        for i, result in enumerate(results.get("results", []), 1):
            formatted.append(f"{i}. {result['title']}\n   {result['snippet']}\n   {result['url']}")
        return "\n\n".join(formatted)

# Operators allowed in arithmetic expressions (safe evaluation, no eval())
_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
}

def _eval_ast(node):
    """Recursively evaluate a restricted arithmetic AST"""
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    elif isinstance(node, ast.BinOp) and type(node.op) in _OPS:
        return _OPS[type(node.op)](_eval_ast(node.left), _eval_ast(node.right))
    elif isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
        return -_eval_ast(node.operand)
    raise ValueError("Unsupported expression")

@tool
async def calculate(expression: str) -> str:
    """Evaluate a mathematical expression"""
    try:
        tree = ast.parse(expression, mode="eval")
        result = _eval_ast(tree.body)
        return f"Result: {result}"
    except Exception as e:
        return f"Calculation error: {e}"
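
Tools created with @tool implement the standard Runnable interface, so they can be exercised directly before wiring them into an Agent; a quick sketch:

import asyncio

async def demo():
    # ainvoke takes a dict matching the tool's input schema
    print(await calculate.ainvoke({"expression": "2 + 3 * 4"}))  # Result: 14

asyncio.run(demo())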

3.4 Memory System Implementation

memory.py
from langchain.memory import ConversationSummaryMemory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from typing import Dict

class MemoryManager:
    """Conversation memory manager"""
    
    def __init__(self):
        self.sessions: Dict[str, BaseChatMessageHistory] = {}
        self.max_history_length = 50  # maximum number of messages kept
    
    def get_session(self, session_id: str) -> BaseChatMessageHistory:
        """Get or create a session"""
        if session_id not in self.sessions:
            self.sessions[session_id] = ChatMessageHistory()
        return self.sessions[session_id]
    
    async def add_message(self, session_id: str, role: str, content: str):
        """Append a message to a session"""
        session = self.get_session(session_id)
        if role == "user":
            session.add_user_message(content)
        else:
            session.add_ai_message(content)
        
        # Trim history to the configured maximum
        if len(session.messages) > self.max_history_length:
            session.messages = session.messages[-self.max_history_length:]
    
    async def get_history(self, session_id: str) -> list:
        """Return the message history of a session"""
        return self.get_session(session_id).messages
    
    async def clear_session(self, session_id: str):
        """Delete a session"""
        self.sessions.pop(session_id, None)
    
    async def summarize(self, session_id: str, llm) -> str:
        """Generate a summary of a session"""
        memory = ConversationSummaryMemory(llm=llm)
        session = self.get_session(session_id)
        
        for message in session.messages:
            memory.save_context({"input": message.content}, {"output": ""})
        
        return memory.buffer
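
Because get_session returns a BaseChatMessageHistory, the manager plugs directly into LangChain's RunnableWithMessageHistory. A minimal wiring sketch (prompt and model are illustrative):

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI

memory_manager = MemoryManager()

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}"),
])

chat = RunnableWithMessageHistory(
    prompt | ChatOpenAI(model="gpt-4"),
    memory_manager.get_session,          # session_id -> BaseChatMessageHistory
    input_messages_key="input",
    history_messages_key="history",
)

# Each call reads and appends to the per-session history automatically
reply = chat.invoke(
    {"input": "My name is Alice."},
    config={"configurable": {"session_id": "demo"}},
)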

4. LangGraph Workflow Orchestration

4.1 LangGraph Core Concepts

LangGraph extends LangChain with graph-based workflow orchestration. It supports complex state management and cyclic execution, making it the core building block of enterprise Agent systems.

Concept | Description | Analogy
State | The workflow's state definition | Data model
Node | Execution unit that processes state | Function/component
Edge | Connection between nodes | Control flow
Graph | The complete state machine | Workflow
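
These four concepts map directly onto code. A minimal, self-contained sketch before the full Agent graph below:

from typing import TypedDict
from langgraph.graph import StateGraph, END

class HelloState(TypedDict):
    """State: the data flowing through the graph"""
    name: str
    greeting: str

def greet(state: HelloState) -> dict:
    """Node: receives the current state, returns a partial update"""
    return {"greeting": f"Hello, {state['name']}!"}

graph = StateGraph(HelloState)
graph.add_node("greet", greet)
graph.set_entry_point("greet")   # Edge: entry point -> greet
graph.add_edge("greet", END)     # Edge: greet -> END
app = graph.compile()            # Graph: the compiled state machine

print(app.invoke({"name": "LangGraph", "greeting": ""}))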

4.2 State Definition

state.py
from typing import TypedDict, Annotated, List, Optional, Any
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    """Agent workflow state"""
    
    # Message history (append-only via operator.add)
    messages: Annotated[List[BaseMessage], operator.add]
    
    # Current step
    current_step: str
    
    # Context information
    context: dict
    
    # Tool call results
    tool_results: List[dict]
    
    # Final output
    output: Optional[str]
    
    # Error message
    error: Optional[str]
    
    # Execution metadata
    metadata: dict

class TaskState(TypedDict):
    """Task execution state"""
    task_id: str
    status: str  # pending, running, success, failed
    progress: int  # 0-100
    result: Optional[Any]
    error: Optional[str]

4.3 Node Implementation

nodes.py
from .state import AgentState
from .llm_provider import LLMProvider
import json

class AgentNodes:
    """Agent node implementations"""
    
    def __init__(self, llm: LLMProvider, tools: list):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
    
    async def planner_node(self, state: AgentState) -> AgentState:
        """Planner node: analyze intent and produce a plan"""
        messages = state["messages"]
        
        # Ask the LLM for a plan
        system_prompt = """You are a task-planning assistant. Analyze the user request and produce an execution plan.
Return JSON: {"steps": [{"tool": "tool_name", "input": {...}}], "reason": "rationale"}"""
        
        response = await self.llm.invoke([
            {"role": "system", "content": system_prompt},
            *messages
        ])
        
        try:
            plan = json.loads(response)
            state["context"]["plan"] = plan
            state["current_step"] = "executor"
        except Exception as e:
            state["error"] = f"Planning failed: {e}"
            state["current_step"] = "error_handler"
        
        return state
    
    async def executor_node(self, state: AgentState) -> AgentState:
        """Executor node: run the planned tool calls"""
        plan = state["context"].get("plan", {})
        steps = plan.get("steps", [])
        results = []
        
        for step in steps:
            tool_name = step.get("tool")
            tool_input = step.get("input", {})
            
            if tool_name in self.tools:
                tool = self.tools[tool_name]
                try:
                    result = await tool.arun(**tool_input)
                    results.append({"tool": tool_name, "result": result, "success": True})
                except Exception as e:
                    results.append({"tool": tool_name, "error": str(e), "success": False})
        
        state["tool_results"] = results
        state["current_step"] = "reviewer"
        return state
    
    async def reviewer_node(self, state: AgentState) -> AgentState:
        """Reviewer node: assess results and generate the response"""
        results = state["tool_results"]
        messages = state["messages"]
        
        # Generate the final response
        system_prompt = """Based on the tool execution results, generate a user-friendly response.
If all tools succeeded, summarize the results. If any failed, explain why and suggest next steps."""
        
        context = f"Tool execution results: {json.dumps(results, ensure_ascii=False)}"
        
        response = await self.llm.invoke([
            {"role": "system", "content": system_prompt},
            *messages,
            {"role": "user", "content": context}
        ])
        
        state["output"] = response
        state["current_step"] = "end"
        return state
    
    async def error_handler_node(self, state: AgentState) -> AgentState:
        """Error-handling node"""
        error = state.get("error", "unknown error")
        state["output"] = f"An error occurred during execution: {error}. Please retry later or contact support."
        state["current_step"] = "end"
        return state

4.4 Graph Construction and Conditional Edges

graph.py
from langgraph.graph import StateGraph, END
from typing import Literal
from .state import AgentState
from .nodes import AgentNodes

class AgentGraph:
    """Agent workflow graph"""
    
    def __init__(self, llm, tools):
        self.nodes = AgentNodes(llm, tools)
        self.graph = self._build_graph()
    
    def _build_graph(self):
        """Build and compile the state graph"""
        graph = StateGraph(AgentState)
        
        # Add nodes
        graph.add_node("planner", self.nodes.planner_node)
        graph.add_node("executor", self.nodes.executor_node)
        graph.add_node("reviewer", self.nodes.reviewer_node)
        graph.add_node("error_handler", self.nodes.error_handler_node)
        
        # Set the entry point
        graph.set_entry_point("planner")
        
        # Conditional edge: route to executor, or to error_handler if planning failed
        graph.add_conditional_edges(
            "planner",
            lambda state: state["current_step"],
            {
                "executor": "executor",
                "error_handler": "error_handler",
            }
        )
        
        # Add edges
        graph.add_edge("executor", "reviewer")
        graph.add_edge("error_handler", END)
        
        # Conditional edge: after review, either finish or re-execute
        graph.add_conditional_edges(
            "reviewer",
            self._should_continue,
            {
                "continue": "executor",
                "end": END
            }
        )
        
        return graph.compile()
    
    def _should_continue(self, state: AgentState) -> Literal["continue", "end"]:
        """Decide whether to keep executing"""
        # Retry if there is an error and the retry budget is not exhausted
        error = state.get("error")
        retry_count = state.get("metadata", {}).get("retry_count", 0)
        
        if error and retry_count < 3:
            state["metadata"]["retry_count"] = retry_count + 1
            return "continue"
        
        return "end"
    
    async def invoke(self, initial_state: AgentState):
        """Execute the workflow, yielding one event per node"""
        async for event in self.graph.astream(initial_state):
            yield event
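
A usage sketch for driving the graph (the flat imports and the user message are illustrative; adjust to your package layout):

import asyncio
from langchain_core.messages import HumanMessage
from llm_provider import LLMProvider
from tools import SearchTool
from state import AgentState
from graph import AgentGraph

async def demo():
    agent = AgentGraph(llm=LLMProvider(), tools=[SearchTool()])
    initial_state: AgentState = {
        "messages": [HumanMessage(content="Find the latest AI news")],
        "current_step": "planner",
        "context": {},
        "tool_results": [],
        "output": None,
        "error": None,
        "metadata": {"retry_count": 0},
    }
    # astream yields one event per executed node
    async for event in agent.invoke(initial_state):
        print(event)

asyncio.run(demo())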

5. Building High-Performance FastAPI Services

5.1 Project Structure

Project directory layout
ai-agent-platform/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI application entry point
│   ├── config.py            # Configuration management
│   ├── dependencies.py      # Dependency injection
│   ├── models/              # Data models
│   │   ├── __init__.py
│   │   ├── request.py       # Request models
│   │   └── response.py      # Response models
│   ├── services/            # Business services
│   │   ├── __init__.py
│   │   ├── agent_service.py # Agent service
│   │   └── tool_service.py  # Tool service
│   ├── api/                 # API routes
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── agent.py     # Agent routes
│   │   │   └── tools.py     # Tool routes
│   │   └── deps.py          # API dependencies
│   ├── core/                # Core components
│   │   ├── __init__.py
│   │   ├── security.py      # Authentication
│   │   └── logging.py       # Logging configuration
│   └── utils/               # Utility functions
├── tests/                   # Tests
├── requirements.txt         # Dependencies
└── docker-compose.yml       # Docker configuration

5.2 FastAPI Application Entry Point

app/main.py
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
from .config import settings
from .core.logging import setup_logging
from .api.v1 import agent, tools
import structlog

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle management"""
    # Initialize on startup
    setup_logging()
    logger = structlog.get_logger()
    logger.info("Starting AI Agent Platform")
    
    yield
    
    # Clean up on shutdown
    logger.info("Shutting down AI Agent Platform")

# Create the FastAPI application
app = FastAPI(
    title="AI Agent Platform",
    description="Enterprise AI Agent system API",
    version="1.0.0",
    lifespan=lifespan
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global exception handler
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger = structlog.get_logger()
    logger.error("Global exception", error=str(exc))
    return JSONResponse(
        status_code=500,
        content={"error": "Internal server error", "detail": str(exc)}
    )

# Register routers
app.include_router(agent.router, prefix="/api/v1/agent", tags=["Agent"])
app.include_router(tools.router, prefix="/api/v1/tools", tags=["Tools"])

@app.get("/health")
async def health_check():
    """Health check"""
    return {"status": "healthy", "version": app.version}

5.3 Agent API Implementation

app/api/v1/agent.py
from fastapi import APIRouter, HTTPException, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional, AsyncGenerator
from app.services.agent_service import AgentService
from app.core.security import verify_api_key
import json
import uuid

router = APIRouter()

class ChatRequest(BaseModel):
    """Chat request"""
    message: str = Field(..., description="User message", min_length=1)
    session_id: Optional[str] = Field(None, description="Session ID")
    stream: bool = Field(False, description="Whether to stream the response")

class ChatResponse(BaseModel):
    """Chat response"""
    session_id: str
    message: str
    tokens_used: int
    execution_time: float

@router.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    agent_service: AgentService = Depends(),
    api_key: str = Depends(verify_api_key)
):
    """Handle a chat request"""
    session_id = request.session_id or str(uuid.uuid4())
    
    try:
        if request.stream:
            return StreamingResponse(
                stream_chat_response(session_id, request.message, agent_service),
                media_type="text/event-stream"
            )
        else:
            result = await agent_service.process_message(session_id, request.message)
            return ChatResponse(**result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

async def stream_chat_response(
    session_id: str,
    message: str,
    agent_service: AgentService
) -> AsyncGenerator[str, None]:
    """Stream the chat response as server-sent events"""
    async for chunk in agent_service.stream_process(session_id, message):
        data = json.dumps({"chunk": chunk})
        yield f"data: {data}\n\n"
    yield "data: [DONE]\n\n"

@router.get("/sessions/{session_id}/history")
async def get_history(
    session_id: str,
    agent_service: AgentService = Depends(),
    api_key: str = Depends(verify_api_key)
):
    """Fetch session history"""
    history = await agent_service.get_session_history(session_id)
    return {"session_id": session_id, "messages": history}
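
The routes above depend on verify_api_key from app/core/security.py, which is not shown elsewhere in this report. A minimal sketch validating a Bearer token against the API_KEYS setting from Section 5.4 (assumed implementation):

# app/core/security.py (sketch)
from fastapi import HTTPException, Security, status
from fastapi.security import APIKeyHeader
from app.config import settings

api_key_header = APIKeyHeader(name="Authorization", auto_error=False)

async def verify_api_key(authorization: str = Security(api_key_header)) -> str:
    """Validate an 'Authorization: Bearer <key>' header against settings.API_KEYS"""
    if authorization and authorization.startswith("Bearer "):
        key = authorization.removeprefix("Bearer ")
        if key in settings.API_KEYS:
            return key
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid or missing API key",
    )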

5.4 Configuration Management

app/config.py
from pydantic_settings import BaseSettings
from typing import List

class Settings(BaseSettings):
    """Application settings"""
    
    # Application
    APP_NAME: str = "AI Agent Platform"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False
    
    # Server
    HOST: str = "0.0.0.0"
    PORT: int = 8000
    
    # LLM
    LLM_PROVIDER: str = "openai"
    LLM_MODEL: str = "gpt-4"
    LLM_TEMPERATURE: float = 0.7
    LLM_MAX_TOKENS: int = 4096
    
    # API keys
    API_KEYS: List[str] = []
    
    # CORS
    ALLOWED_ORIGINS: List[str] = ["*"]
    
    # Logging
    LOG_LEVEL: str = "INFO"
    
    class Config:
        env_file = ".env"
        case_sensitive = True

settings = Settings()

6. MCP Protocol Integration in Practice

6.1 MCP Protocol Overview

MCP (Model Context Protocol) is a standardized protocol for model context. It unifies how context is exchanged with AI models, enabling cross-model interoperability and context sharing.

💡 MCP Core Advantages
  • Standardization: a unified context format that lowers integration cost
  • Interoperability: seamless switching between model providers
  • Extensibility: a plugin architecture that makes new capabilities easy to add
  • Security: built-in authentication and permission controls

6.2 MCP Server Implementation

mcp_server.py
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Tool, Resource
from typing import List, Any

class AgentMCPServer:
    """Agent MCP server"""
    
    def __init__(self, agent_service):
        self.server = Server("ai-agent-platform")
        self.agent_service = agent_service
        self._setup_handlers()
    
    def _setup_handlers(self):
        """Register MCP handlers"""
        
        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            """List available tools"""
            return [
                Tool(
                    name="web_search",
                    description="Search the web for up-to-date information",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "description": "Search query"}
                        },
                        "required": ["query"]
                    }
                ),
                Tool(
                    name="calculate",
                    description="Evaluate a mathematical expression",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "expression": {"type": "string", "description": "Math expression"}
                        },
                        "required": ["expression"]
                    }
                )
            ]
        
        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict) -> Any:
            """Invoke a tool"""
            return await self.agent_service.execute_tool(name, arguments)
        
        @self.server.list_resources()
        async def list_resources() -> List[Resource]:
            """List available resources"""
            return [
                Resource(
                    uri="agent://sessions",
                    name="Active Sessions",
                    description="Currently active Agent sessions",
                    mimeType="application/json"
                )
            ]
        
        @self.server.read_resource()
        async def read_resource(uri: str) -> Any:
            """Read a resource"""
            if str(uri) == "agent://sessions":
                return await self.agent_service.get_active_sessions()
            raise ValueError(f"Unknown resource: {uri}")
    
    async def run(self):
        """Run the MCP server over stdio"""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="ai-agent-platform",
                    server_version="1.0.0",
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )

6.3 MCP Client Integration

mcp_client.py
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from contextlib import AsyncExitStack
from typing import Optional
import asyncio

class MCPClient:
    """MCP client (stdio transport)"""
    
    def __init__(self, command: str, args: list):
        # The SDK spawns the server subprocess itself; command/args
        # describe how to launch it
        self.server_params = StdioServerParameters(command=command, args=args)
        self.exit_stack = AsyncExitStack()
        self.session: Optional[ClientSession] = None
    
    async def connect(self):
        """Connect to the MCP server"""
        read, write = await self.exit_stack.enter_async_context(
            stdio_client(self.server_params)
        )
        self.session = await self.exit_stack.enter_async_context(
            ClientSession(read, write)
        )
        await self.session.initialize()
    
    async def list_tools(self):
        """List remote tools"""
        return await self.session.list_tools()
    
    async def call_tool(self, name: str, arguments: dict):
        """Call a remote tool"""
        return await self.session.call_tool(name, arguments)
    
    async def disconnect(self):
        """Tear down the connection"""
        await self.exit_stack.aclose()

# Usage example
async def main():
    client = MCPClient("python", ["-m", "mcp_server"])
    await client.connect()
    
    tools = await client.list_tools()
    print(f"Available tools: {tools}")
    
    result = await client.call_tool("web_search", {"query": "AI news"})
    print(f"Search result: {result}")
    
    await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

7. Complete Project Walkthrough

7.1 Project Initialization

.env configuration
# LLM configuration
LLM_PROVIDER=openai
LLM_MODEL=gpt-4
LLM_TEMPERATURE=0.7
LLM_MAX_TOKENS=4096
OPENAI_API_KEY=sk-...

# Server configuration
HOST=0.0.0.0
PORT=8000
DEBUG=false

# API keys
API_KEYS=your-api-key-1,your-api-key-2

# Vector databases
CHROMA_DB_PATH=./data/chroma
PINECONE_API_KEY=...

# Logging
LOG_LEVEL=INFO

7.2 Docker Deployment

docker-compose.yml
version: '3.8'

services:
  # FastAPI application
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
    volumes:
      - ./data:/app/data
    depends_on:
      - chromadb
      - redis
    restart: unless-stopped

  # Vector database
  chromadb:
    image: chromadb/chroma:latest
    ports:
      - "8001:8000"
    volumes:
      - ./data/chroma:/chroma/chroma
    restart: unless-stopped

  # Cache
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - ./data/redis:/data
    command: redis-server --appendonly yes
    restart: unless-stopped

  # MCP Server
  mcp-server:
    build:
      context: .
      dockerfile: Dockerfile.mcp
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - app
    restart: unless-stopped
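
The compose file assumes a Dockerfile at the project root (plus a Dockerfile.mcp variant for the MCP server), neither shown above. A minimal sketch for the FastAPI image:

# Dockerfile (sketch)
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]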

7.3 Running and Testing

Startup commands
# Local development
$ pip install -r requirements.txt
$ uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

# Docker deployment
$ docker-compose up -d

# Run tests
$ pytest tests/ -v

# API smoke test
$ curl -X POST http://localhost:8000/api/v1/agent/chat \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer your-api-key" \
  -d '{"message": "Hello, please help me analyze our sales data"}'

# Open the docs
$ open http://localhost:8000/docs

8. Deployment and Operations

8.1 Production Environment Configuration

8.2 Performance Optimization

Area | Measures | Effect
LLM calls | Response caching, request batching | 50% fewer API calls
Database | Connection pooling, query optimization | 3x faster queries
Memory | Object pooling, lazy loading | 40% lower memory footprint
Concurrency | Async IO, worker pools | 10x higher concurrency
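
As an example of the first row, LangChain ships a global LLM cache that deduplicates identical calls with two lines of setup; a minimal in-process sketch:

from langchain.globals import set_llm_cache
from langchain_community.cache import InMemoryCache
from langchain_openai import ChatOpenAI

set_llm_cache(InMemoryCache())  # repeated identical prompts skip the API

llm = ChatOpenAI(model="gpt-4")
llm.invoke("What is MCP?")  # hits the API
llm.invoke("What is MCP?")  # served from the cache

For multi-worker deployments, a RedisCache pointed at the redis service from Section 7.2 would share the cache across processes.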

9. Summary and Outlook

9.1 Key Takeaways

9.2 Technology Trends

🚀 Future Directions
  • Multimodal Agents: understanding images, audio, and video
  • Autonomous Agents: stronger self-directed planning and execution
  • Agent collaboration: multiple Agents cooperating on complex tasks
  • Edge deployment: lightweight models running on-device
  • Explainability: more transparent Agent decision-making

"Building an enterprise AI Agent system is a continuous journey, requiring ongoing iteration across technology choices, architecture design, security, and compliance. LangChain + LangGraph + FastAPI + MCP provide a solid foundation, but real success comes from deep integration with business scenarios."