Building a production-grade AI Agent system from scratch: a complete development guide and best practices covering LangChain application development, LangGraph workflow orchestration, high-performance FastAPI services, and MCP protocol integration
With the rapid advancement of large language model (LLM) technology, AI Agents have become a core driver of enterprise digital transformation. From intelligent customer service and code assistants to data-analysis agents, AI applications are penetrating business scenarios at an unprecedented pace. Building an enterprise-grade AI Agent system, however, poses numerous technical challenges.
This report is a hands-on deep dive into building an enterprise-grade AI Agent system on the LangChain + LangGraph + FastAPI + MCP stack, providing a complete guide from architecture design to code implementation, and from local development to production deployment.
An enterprise-grade AI Agent system uses a layered architecture. From bottom to top, the layers are: the infrastructure layer, the service layer, the agent layer, and the application layer.
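As a rough illustration only (our sketch, not a prescribed design from the stack), the four layers and their dependency direction can be expressed as interfaces, with each layer calling only the one below it:

```python
# Hypothetical sketch of the four layers as Python protocols; all names here
# are illustrative, not part of any library's API.
from typing import Protocol


class InfrastructureLayer(Protocol):
    """Infrastructure layer: LLM providers, vector stores, caches."""
    async def complete(self, prompt: str) -> str: ...


class ServiceLayer(Protocol):
    """Service layer: retrieval, memory, and tool execution on top of infrastructure."""
    async def execute_tool(self, name: str, args: dict) -> str: ...


class AgentLayer(Protocol):
    """Agent layer: planning, execution, and review orchestrated over services."""
    async def handle(self, message: str) -> str: ...


# The application layer (FastAPI routes, MCP endpoints) sits on top
# and talks only to the agent layer.
```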
| Component | Choice | Alternatives | Rationale |
|---|---|---|---|
| AI framework | LangChain | LlamaIndex, Haystack | Mature ecosystem, rich tooling, active community |
| Workflow engine | LangGraph | Prefect, Airflow | Native agent state machines, seamless LangChain integration |
| Web framework | FastAPI | Flask, Django | High performance, auto-generated docs, type safety, async support |
| Context protocol | MCP | Custom protocol | Standardized, multi-model interoperability, growing ecosystem |
| Vector database | Chroma/Pinecone | FAISS, Weaviate | Ease of use, performance, native LangChain support |
```text
# Core frameworks
langchain==0.1.0
langchain-core==0.1.0
langchain-community==0.0.10
langgraph==0.0.20

# Web service
fastapi==0.109.0
uvicorn[standard]==0.27.0
pydantic==2.5.0
pydantic-settings==2.1.0

# MCP protocol
mcp==0.1.0
mcp-server==0.1.0

# LLM providers
langchain-openai==0.0.2
langchain-anthropic==0.0.5

# Vector databases
chromadb==0.4.22
pinecone-client==3.0.0

# Tools and integrations
python-dotenv==1.0.0
httpx==0.26.0
aiosqlite==0.19.0

# Monitoring and logging
structlog==24.1.0
prometheus-client==0.19.0
opentelemetry-api==1.21.0

# Testing
pytest==7.4.0
pytest-asyncio==0.23.0
```
LangChain is a framework for developing applications powered by language models, offering modular abstractions and a rich set of tool integrations. Its core components are summarized below (a minimal chain sketch follows the table).
| Core component | Description | Typical uses |
|---|---|---|
| Models | LLM abstraction layer supporting multiple providers | GPT-4, Claude, local models |
| Prompts | Prompt templates and management | Dynamic prompts, few-shot |
| Chains | Component composition for complex logic | Multi-step reasoning, RAG |
| Agents | Autonomous decision-making and tool calling | Task automation, tool use |
| Memory | Conversation history and context management | Multi-turn dialogue, long-term memory |
| Tools | External functions and API integrations | Search, calculation, databases |
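To make the component model concrete, here is a minimal sketch composing Prompts, Models, and Chains with LangChain's LCEL pipe syntax (the model name and question are placeholders):

```python
# Minimal LCEL sketch: a Prompt, a Model, and an output parser composed into a Chain.
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a concise assistant."),
    ("human", "{question}"),
])

# prompt | model | parser builds a Runnable chain
chain = prompt | ChatOpenAI(model="gpt-4") | StrOutputParser()

print(chain.invoke({"question": "What are Chains in LangChain?"}))
```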
```python
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.language_models import BaseChatModel
from typing import Optional
import os


class LLMProvider:
    """LLM provider supporting switching between models."""

    def __init__(
        self,
        provider: str = "openai",
        model_name: str = "gpt-4",
        temperature: float = 0.7,
        max_tokens: int = 4096
    ):
        self.provider = provider
        self.model_name = model_name
        self.temperature = temperature
        self.max_tokens = max_tokens
        self._model: Optional[BaseChatModel] = None

    @property
    def model(self) -> BaseChatModel:
        """Lazily instantiate the LLM."""
        if self._model is None:
            self._model = self._create_model()
        return self._model

    def _create_model(self) -> BaseChatModel:
        """Create the LLM instance for the configured provider."""
        common_kwargs = {
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "streaming": True,
        }
        if self.provider == "openai":
            return ChatOpenAI(
                model=self.model_name,
                api_key=os.getenv("OPENAI_API_KEY"),
                **common_kwargs
            )
        elif self.provider == "anthropic":
            return ChatAnthropic(
                model=self.model_name,
                api_key=os.getenv("ANTHROPIC_API_KEY"),
                **common_kwargs
            )
        else:
            raise ValueError(f"Unsupported provider: {self.provider}")

    async def invoke(self, messages: list) -> str:
        """Invoke the LLM once and return the full response text."""
        response = await self.model.ainvoke(messages)
        return response.content

    async def stream(self, messages: list):
        """Stream the LLM response chunk by chunk."""
        async for chunk in self.model.astream(messages):
            yield chunk.content
```
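A hypothetical usage sketch of `LLMProvider` above (the prompt is a placeholder):

```python
import asyncio


async def demo():
    provider = LLMProvider(provider="openai", model_name="gpt-4")

    # Single-shot call
    print(await provider.invoke([{"role": "user", "content": "Hello!"}]))

    # Streaming call
    async for token in provider.stream([{"role": "user", "content": "Hello!"}]):
        print(token, end="", flush=True)


asyncio.run(demo())
```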
```python
from langchain.tools import tool
from langchain_core.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type
import ast
import operator
import httpx


class SearchInput(BaseModel):
    """Input schema for the search tool."""
    query: str = Field(description="Search query")
    num_results: int = Field(default=5, description="Number of results")


class SearchTool(BaseTool):
    """Web search tool."""
    name: str = "web_search"
    description: str = "Search the web for up-to-date information"
    args_schema: Type[BaseModel] = SearchInput

    def _run(self, query: str, num_results: int = 5) -> str:
        """Sync execution is not supported; use the async variant."""
        raise NotImplementedError("Use the async interface (_arun)")

    async def _arun(self, query: str, num_results: int = 5) -> str:
        """Execute the search asynchronously."""
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "https://api.search.com/search",  # placeholder search endpoint
                params={"q": query, "limit": num_results}
            )
            results = response.json()
            return self._format_results(results)

    def _format_results(self, results: dict) -> str:
        """Format the search results for the LLM."""
        formatted = []
        for i, result in enumerate(results.get("results", []), 1):
            formatted.append(
                f"{i}. {result['title']}\n   {result['snippet']}\n   {result['url']}"
            )
        return "\n\n".join(formatted)


# Operators permitted in arithmetic expressions (safe alternative to eval)
_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow,
}


def _eval_ast(node, ops):
    """Recursively evaluate a whitelisted arithmetic AST."""
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    elif isinstance(node, ast.BinOp) and type(node.op) in ops:
        return ops[type(node.op)](_eval_ast(node.left, ops), _eval_ast(node.right, ops))
    else:
        raise ValueError("Unsupported expression")


@tool
async def calculate(expression: str) -> str:
    """Evaluate a math expression."""
    try:
        tree = ast.parse(expression, mode="eval")
        result = _eval_ast(tree.body, _OPS)
        return f"Result: {result}"
    except Exception as e:
        return f"Calculation error: {e}"
```
```python
from langchain.memory import ConversationSummaryMemory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from typing import Dict


class MemoryManager:
    """Conversation memory manager."""

    def __init__(self):
        self.sessions: Dict[str, BaseChatMessageHistory] = {}
        self.max_history_length = 50  # maximum number of stored messages

    def get_session(self, session_id: str) -> BaseChatMessageHistory:
        """Get or create a session."""
        if session_id not in self.sessions:
            self.sessions[session_id] = ChatMessageHistory()
        return self.sessions[session_id]

    async def add_message(self, session_id: str, role: str, content: str):
        """Append a message to a session."""
        session = self.get_session(session_id)
        if role == "user":
            session.add_user_message(content)
        else:
            session.add_ai_message(content)
        # Cap the history length
        if len(session.messages) > self.max_history_length:
            session.messages = session.messages[-self.max_history_length:]

    async def get_history(self, session_id: str) -> list:
        """Return the message history of a session."""
        return self.get_session(session_id).messages

    async def clear_session(self, session_id: str):
        """Delete a session."""
        self.sessions.pop(session_id, None)

    async def summarize(self, session_id: str, llm) -> str:
        """Produce a running summary of a session."""
        memory = ConversationSummaryMemory(llm=llm)
        session = self.get_session(session_id)
        for message in session.messages:
            memory.save_context({"input": message.content}, {"output": ""})
        return memory.buffer
```
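A short usage sketch of `MemoryManager` (the session ID and messages are placeholders):

```python
import asyncio


async def demo_memory():
    mm = MemoryManager()
    await mm.add_message("sess-1", "user", "Hello")
    await mm.add_message("sess-1", "assistant", "Hi! How can I help?")
    for msg in await mm.get_history("sess-1"):
        print(type(msg).__name__, msg.content)
    await mm.clear_session("sess-1")


asyncio.run(demo_memory())
```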
LangGraph extends LangChain with graph-based workflow orchestration, supporting complex state management and cyclic execution; it is the core component for building enterprise-grade agent systems. Its key concepts are listed below (a minimal runnable graph follows the table).
| Concept | Description | Analogy |
|---|---|---|
| State | The workflow's state definition | Data model |
| Node | Execution unit that transforms state | Function/component |
| Edge | Connection between nodes | Flow control |
| Graph | The complete state machine | Workflow |
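A minimal runnable sketch tying the four concepts together (assuming the langgraph version pinned above):

```python
# State -> Node -> Edge -> Graph in a few lines.
from typing import TypedDict
from langgraph.graph import StateGraph, END


class HelloState(TypedDict):  # State: the workflow's data model
    text: str


def shout(state: HelloState) -> HelloState:  # Node: a function that transforms state
    return {"text": state["text"].upper()}


graph = StateGraph(HelloState)  # Graph: the state machine
graph.add_node("shout", shout)
graph.set_entry_point("shout")
graph.add_edge("shout", END)  # Edge: flow control

app = graph.compile()
print(app.invoke({"text": "hello langgraph"}))  # {'text': 'HELLO LANGGRAPH'}
```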
```python
from typing import TypedDict, Annotated, List, Optional, Any
from langchain_core.messages import BaseMessage
import operator


class AgentState(TypedDict):
    """State of the agent workflow."""
    # Message history (accumulated via operator.add)
    messages: Annotated[List[BaseMessage], operator.add]
    # Current step
    current_step: str
    # Contextual information
    context: dict
    # Tool call results
    tool_results: List[dict]
    # Final output
    output: Optional[str]
    # Error message, if any
    error: Optional[str]
    # Execution metadata
    metadata: dict


class TaskState(TypedDict):
    """State of a task execution."""
    task_id: str
    status: str  # pending, running, success, failed
    progress: int  # 0-100
    result: Optional[Any]
    error: Optional[str]
```
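The `Annotated[..., operator.add]` reducer deserves a closer look: when a node returns a partial state containing `messages`, LangGraph merges it into the channel with `operator.add` (list concatenation) instead of overwriting. A tiny illustration:

```python
import operator
from langchain_core.messages import AIMessage, HumanMessage

# The existing channel value and a node's partial update
history = [HumanMessage(content="hi")]
update = [AIMessage(content="hello!")]

# operator.add on lists concatenates, so the update is appended, not a replacement
print(operator.add(history, update))  # [HumanMessage('hi'), AIMessage('hello!')]
```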
```python
from .state import AgentState
from .llm_provider import LLMProvider
import json


class AgentNodes:
    """Implementations of the agent's graph nodes."""

    def __init__(self, llm: LLMProvider, tools: list):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}

    async def planner_node(self, state: AgentState) -> AgentState:
        """Planner node: analyze intent and produce an execution plan."""
        messages = state["messages"]
        system_prompt = """You are a task-planning assistant. Analyze the user's request and produce an execution plan.
Return JSON of the form: {"steps": [{"tool": "tool_name", "input": {...}}], "reason": "rationale"}"""
        response = await self.llm.invoke([
            {"role": "system", "content": system_prompt},
            *messages
        ])
        try:
            plan = json.loads(response)
            state["context"]["plan"] = plan
            state["current_step"] = "executor"
        except Exception as e:
            state["error"] = f"Planning failed: {e}"
            state["current_step"] = "error_handler"
        return state

    async def executor_node(self, state: AgentState) -> AgentState:
        """Executor node: run the planned tool calls."""
        plan = state["context"].get("plan", {})
        steps = plan.get("steps", [])
        results = []
        for step in steps:
            tool_name = step.get("tool")
            tool_input = step.get("input", {})
            if tool_name in self.tools:
                tool = self.tools[tool_name]
                try:
                    # BaseTool.arun takes a single tool_input (str or dict)
                    result = await tool.arun(tool_input)
                    results.append({"tool": tool_name, "result": result, "success": True})
                except Exception as e:
                    results.append({"tool": tool_name, "error": str(e), "success": False})
        state["tool_results"] = results
        state["current_step"] = "reviewer"
        return state

    async def reviewer_node(self, state: AgentState) -> AgentState:
        """Reviewer node: assess results and produce the final response."""
        results = state["tool_results"]
        messages = state["messages"]
        system_prompt = """Based on the tool execution results, produce a user-friendly response.
If all tools succeeded, summarize the results. If any failed, explain why and suggest next steps."""
        context = f"Tool execution results: {json.dumps(results, ensure_ascii=False)}"
        response = await self.llm.invoke([
            {"role": "system", "content": system_prompt},
            *messages,
            {"role": "user", "content": context}
        ])
        state["output"] = response
        state["current_step"] = "end"
        return state

    async def error_handler_node(self, state: AgentState) -> AgentState:
        """Error-handling node."""
        error = state.get("error", "unknown error")
        state["output"] = f"An error occurred during execution: {error}. Please retry later or contact support."
        state["current_step"] = "end"
        return state
```
```python
from langgraph.graph import StateGraph, END
from typing import Literal
from .state import AgentState
from .nodes import AgentNodes


class AgentGraph:
    """The agent workflow graph."""

    def __init__(self, llm, tools):
        self.nodes = AgentNodes(llm, tools)
        self.graph = self._build_graph()

    def _build_graph(self):
        """Build and compile the state graph."""
        graph = StateGraph(AgentState)

        # Register nodes
        graph.add_node("planner", self.nodes.planner_node)
        graph.add_node("executor", self.nodes.executor_node)
        graph.add_node("reviewer", self.nodes.reviewer_node)
        graph.add_node("error_handler", self.nodes.error_handler_node)

        # Entry point
        graph.set_entry_point("planner")

        # Conditional edge: the planner routes to the executor on success,
        # and to the error handler when plan parsing failed (an unconditional
        # planner -> executor edge would make error_handler unreachable)
        graph.add_conditional_edges(
            "planner",
            lambda state: state["current_step"],
            {
                "executor": "executor",
                "error_handler": "error_handler",
            }
        )
        graph.add_edge("executor", "reviewer")
        graph.add_edge("error_handler", END)

        # Conditional edge: after review, either finish or re-execute
        graph.add_conditional_edges(
            "reviewer",
            self._should_continue,
            {
                "continue": "executor",
                "end": END
            }
        )

        return graph.compile()

    def _should_continue(self, state: AgentState) -> Literal["continue", "end"]:
        """Decide whether to retry execution."""
        # Retry while there is an error and the retry budget is not exhausted.
        # Note: mutating state inside a condition relies on in-place dict updates;
        # a more robust design updates retry_count inside a node.
        error = state.get("error")
        retry_count = state.get("metadata", {}).get("retry_count", 0)
        if error and retry_count < 3:
            state["metadata"]["retry_count"] = retry_count + 1
            return "continue"
        return "end"

    async def invoke(self, initial_state: AgentState):
        """Run the workflow, yielding streamed events."""
        async for event in self.graph.astream(initial_state):
            yield event
```
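A hypothetical end-to-end wiring of the pieces above (the initial state mirrors the `AgentState` fields; the question is a placeholder):

```python
import asyncio
from langchain_core.messages import HumanMessage


async def run_agent():
    llm = LLMProvider(provider="openai", model_name="gpt-4")
    agent = AgentGraph(llm, tools=[SearchTool()])

    initial_state: AgentState = {
        "messages": [HumanMessage(content="What is 2**10?")],
        "current_step": "planner",
        "context": {},
        "tool_results": [],
        "output": None,
        "error": None,
        "metadata": {"retry_count": 0},
    }
    async for event in agent.invoke(initial_state):
        print(event)  # one event dict per node execution


asyncio.run(run_agent())
```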
```text
ai-agent-platform/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI application entry point
│   ├── config.py            # Configuration management
│   ├── dependencies.py      # Dependency injection
│   ├── models/              # Data models
│   │   ├── __init__.py
│   │   ├── request.py       # Request models
│   │   └── response.py      # Response models
│   ├── services/            # Business services
│   │   ├── __init__.py
│   │   ├── agent_service.py # Agent service
│   │   └── tool_service.py  # Tool service
│   ├── api/                 # API routes
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── agent.py     # Agent routes
│   │   │   └── tools.py     # Tool routes
│   │   └── deps.py          # API dependencies
│   ├── core/                # Core components
│   │   ├── __init__.py
│   │   ├── security.py      # Authentication
│   │   └── logging.py       # Logging configuration
│   └── utils/               # Utility functions
├── tests/                   # Tests
├── requirements.txt         # Dependencies
└── docker-compose.yml       # Docker configuration
```
```python
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
from .config import settings
from .core.logging import setup_logging
from .api.v1 import agent, tools
import structlog


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifecycle management."""
    # Startup initialization
    setup_logging()
    logger = structlog.get_logger()
    logger.info("Starting AI Agent Platform")
    yield
    # Cleanup on shutdown
    logger.info("Shutting down AI Agent Platform")


# Create the FastAPI application
app = FastAPI(
    title="AI Agent Platform",
    description="Enterprise-grade AI Agent system API",
    version="1.0.0",
    lifespan=lifespan
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Global exception handler
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    logger = structlog.get_logger()
    logger.error("Global exception", error=str(exc))
    return JSONResponse(
        status_code=500,
        content={"error": "Internal server error", "detail": str(exc)}
    )


# Register routers
app.include_router(agent.router, prefix="/api/v1/agent", tags=["Agent"])
app.include_router(tools.router, prefix="/api/v1/tools", tags=["Tools"])


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy", "version": app.version}
```
```python
from fastapi import APIRouter, HTTPException, Depends
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from typing import Optional, AsyncGenerator
from app.services.agent_service import AgentService
from app.core.security import verify_api_key
import json
import uuid

router = APIRouter()


class ChatRequest(BaseModel):
    """Chat request."""
    message: str = Field(..., description="User message", min_length=1)
    session_id: Optional[str] = Field(None, description="Session ID")
    stream: bool = Field(False, description="Whether to stream the response")


class ChatResponse(BaseModel):
    """Chat response."""
    session_id: str
    message: str
    tokens_used: int
    execution_time: float


@router.post("/chat", response_model=ChatResponse)
async def chat(
    request: ChatRequest,
    agent_service: AgentService = Depends(),
    api_key: str = Depends(verify_api_key)
):
    """Handle a chat request."""
    session_id = request.session_id or str(uuid.uuid4())
    try:
        if request.stream:
            return StreamingResponse(
                stream_chat_response(session_id, request.message, agent_service),
                media_type="text/event-stream"
            )
        else:
            result = await agent_service.process_message(session_id, request.message)
            return ChatResponse(**result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


async def stream_chat_response(
    session_id: str,
    message: str,
    agent_service: AgentService
) -> AsyncGenerator[str, None]:
    """Stream the chat response as server-sent events."""
    async for chunk in agent_service.stream_process(session_id, message):
        data = json.dumps({"chunk": chunk})
        yield f"data: {data}\n\n"
    yield "data: [DONE]\n\n"


@router.get("/sessions/{session_id}/history")
async def get_history(
    session_id: str,
    agent_service: AgentService = Depends(),
    api_key: str = Depends(verify_api_key)
):
    """Fetch the message history of a session."""
    history = await agent_service.get_session_history(session_id)
    return {"session_id": session_id, "messages": history}
```
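For the streaming branch, here is a hypothetical client consuming the server-sent events with httpx (the URL and API key are placeholders):

```python
import asyncio
import json

import httpx


async def stream_chat():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/api/v1/agent/chat",
            json={"message": "Hello", "stream": True},
            headers={"Authorization": "Bearer your-api-key"},
        ) as resp:
            async for line in resp.aiter_lines():
                # Each SSE event is "data: {...}"; the stream ends with "data: [DONE]"
                if line.startswith("data: ") and line != "data: [DONE]":
                    print(json.loads(line[len("data: "):])["chunk"], end="", flush=True)


asyncio.run(stream_chat())
```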
```python
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List


class Settings(BaseSettings):
    """Application settings."""

    model_config = SettingsConfigDict(env_file=".env", case_sensitive=True)

    # Application
    APP_NAME: str = "AI Agent Platform"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False

    # Server
    HOST: str = "0.0.0.0"
    PORT: int = 8000

    # LLM
    LLM_PROVIDER: str = "openai"
    LLM_MODEL: str = "gpt-4"
    LLM_TEMPERATURE: float = 0.7
    LLM_MAX_TOKENS: int = 4096

    # API keys (note: pydantic-settings decodes List fields from env vars as JSON,
    # e.g. API_KEYS='["key-1","key-2"]')
    API_KEYS: List[str] = []

    # CORS
    ALLOWED_ORIGINS: List[str] = ["*"]

    # Logging
    LOG_LEVEL: str = "INFO"


settings = Settings()
```
MCP (Model Context Protocol) is an open standard for connecting LLM applications to external tools, resources, and prompts in a uniform way, enabling interoperability and context sharing across different models and hosts.
```python
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.server.stdio import stdio_server
from mcp.types import Tool, Resource, TextContent
from typing import Any, List


class AgentMCPServer:
    """MCP server exposing the agent's tools and resources."""

    def __init__(self, agent_service):
        self.server = Server("ai-agent-platform")
        self.agent_service = agent_service
        self._setup_handlers()

    def _setup_handlers(self):
        """Register the MCP handlers."""

        @self.server.list_tools()
        async def list_tools() -> List[Tool]:
            """List the available tools."""
            return [
                Tool(
                    name="web_search",
                    description="Search the web for up-to-date information",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "query": {"type": "string", "description": "Search query"}
                        },
                        "required": ["query"]
                    }
                ),
                Tool(
                    name="calculate",
                    description="Evaluate a math expression",
                    inputSchema={
                        "type": "object",
                        "properties": {
                            "expression": {"type": "string", "description": "Math expression"}
                        },
                        "required": ["expression"]
                    }
                )
            ]

        @self.server.call_tool()
        async def call_tool(name: str, arguments: dict) -> List[TextContent]:
            """Invoke a tool and wrap the result as MCP text content."""
            result = await self.agent_service.execute_tool(name, arguments)
            return [TextContent(type="text", text=str(result))]

        @self.server.list_resources()
        async def list_resources() -> List[Resource]:
            """List the available resources."""
            return [
                Resource(
                    uri="agent://sessions",
                    name="Active Sessions",
                    description="Currently active agent sessions",
                    mimeType="application/json"
                )
            ]

        @self.server.read_resource()
        async def read_resource(uri: str) -> Any:
            """Read a resource."""
            if uri == "agent://sessions":
                return await self.agent_service.get_active_sessions()
            raise ValueError(f"Unknown resource: {uri}")

    async def run(self):
        """Run the MCP server over stdio (following the official Python SDK pattern)."""
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(
                read_stream,
                write_stream,
                InitializationOptions(
                    server_name="ai-agent-platform",
                    server_version="1.0.0",
                    capabilities=self.server.get_capabilities(
                        notification_options=NotificationOptions(),
                        experimental_capabilities={},
                    ),
                ),
            )
```
```python
# Client side, following the stdio client pattern of the official MCP Python SDK:
# stdio_client spawns the server process and yields a (read, write) stream pair,
# and ClientSession speaks the protocol over those streams.
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
import asyncio


class MCPClient:
    """MCP client that connects to a stdio-based MCP server."""

    def __init__(self, command: str, args: list):
        self.server_params = StdioServerParameters(command=command, args=args)

    async def run(self):
        """Connect, list the remote tools, and call one."""
        async with stdio_client(self.server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                tools = await session.list_tools()
                print(f"Available tools: {tools}")

                result = await session.call_tool("web_search", {"query": "AI news"})
                print(f"Search result: {result}")
        # The context managers close the session and transport on exit


# Usage example
if __name__ == "__main__":
    asyncio.run(MCPClient("python", ["-m", "mcp_server"]).run())
```
```bash
# LLM configuration
LLM_PROVIDER=openai
LLM_MODEL=gpt-4
LLM_TEMPERATURE=0.7
LLM_MAX_TOKENS=4096
OPENAI_API_KEY=sk-...

# Server configuration
HOST=0.0.0.0
PORT=8000
DEBUG=false

# API keys (JSON, since pydantic-settings decodes list fields as JSON)
API_KEYS=["your-api-key-1","your-api-key-2"]

# Vector databases
CHROMA_DB_PATH=./data/chroma
PINECONE_API_KEY=...

# Logging
LOG_LEVEL=INFO
```
```yaml
version: '3.8'

services:
  # FastAPI application
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
    volumes:
      - ./data:/app/data
    depends_on:
      - chromadb
      - redis
    restart: unless-stopped

  # Vector database
  chromadb:
    image: chromadb/chroma:latest
    ports:
      - "8001:8000"
    volumes:
      - ./data/chroma:/chroma/chroma
    restart: unless-stopped

  # Cache
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - ./data/redis:/data
    command: redis-server --appendonly yes
    restart: unless-stopped

  # MCP server
  mcp-server:
    build:
      context: .
      dockerfile: Dockerfile.mcp
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
    depends_on:
      - app
    restart: unless-stopped
```
```bash
# Local development
$ pip install -r requirements.txt
$ uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

# Docker deployment
$ docker-compose up -d

# Run the tests
$ pytest tests/ -v

# API smoke test
$ curl -X POST http://localhost:8000/api/v1/agent/chat \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer your-api-key" \
    -d '{"message": "Hello, please analyze the sales data for me"}'

# Open the interactive docs
$ open http://localhost:8000/docs
```
| Optimization | Measures | Effect |
|---|---|---|
| LLM calls | Response caching (see the sketch after this table), request batching | ~50% fewer API calls |
| Database | Connection pooling, query optimization | ~3x faster queries |
| Memory | Object pools, lazy loading | ~40% lower memory usage |
| Concurrency | Async IO, worker pools | ~10x higher concurrency |
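As one concrete instance of the response caching in the first row, here is a minimal sketch of an exact-match cache keyed on the serialized message list (an assumption on our part; production systems often prefer semantic caching with TTLs, e.g. backed by the Redis service above):

```python
import hashlib
import json
from functools import wraps

_llm_cache: dict = {}


def cache_llm_response(fn):
    """Cache an async LLM call on the hash of its message list."""
    @wraps(fn)
    async def wrapper(self, messages: list) -> str:
        key = hashlib.sha256(
            json.dumps(messages, sort_keys=True, default=str).encode()
        ).hexdigest()
        if key not in _llm_cache:
            _llm_cache[key] = await fn(self, messages)
        return _llm_cache[key]
    return wrapper

# Hypothetical usage: decorate LLMProvider.invoke with @cache_llm_response,
# so repeated identical prompts skip the API call entirely.
```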
"Building an enterprise-grade AI Agent system is a continuously evolving process that demands ongoing iteration on technology choices, architecture design, and security and compliance. LangChain + LangGraph + FastAPI + MCP provide a solid foundation, but real success lies in deep integration with concrete business scenarios."