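An in-depth look at the core principles and architecture of the Manus general-purpose agent: how to build, from scratch, a production-grade AI agent system with autonomous planning, tool calling, and multi-turn dialogue, with complete code and best-practice guidance.
Manus is a general-purpose AI agent system, representative of the current state of the art in AI agent technology. Unlike traditional task-specific agents, Manus handles general tasks: it understands complex instructions, plans its own execution steps, calls a variety of tools, and keeps learning and improving across multi-turn interactions.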
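| Dimension | Traditional agent | Manus general-purpose agent |
|---|---|---|
| Task scope | Single domain or task | Cross-domain, general tasks |
| Planning | Predefined workflow | Dynamic, autonomous planning |
| Tool integration | Fixed tool set | Dynamic tool discovery and use |
| Learning | Static rules | Continuous learning and optimization |
| Exception handling | Preset handlers | Autonomous recovery |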
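The Manus general-purpose agent uses a layered, modular architecture. Layers communicate through standardized interfaces, which keeps each component cohesive and the coupling between components low.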
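| Component | Responsibility | Key techniques |
|---|---|---|
| Perception | Processing and understanding multimodal input | LLM, Vision Model, ASR |
| Planner | Task decomposition and execution planning | CoT, ToT, ReAct |
| Executor | Tool calling and task execution | Function Calling, Sandbox |
| Memory | Short-term and long-term memory management | Vector DB, RAG |
| Reflector | Reflection on execution and strategy optimization | Self-Reflection, RLHF |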
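The "standardized interfaces" between layers can be expressed as narrow structural types. The following is a minimal sketch, not Manus's actual code: the protocol names mirror the component table, while the method signatures and the `EchoPlanner` / `EchoExecutor` stand-ins are assumptions made purely for illustration.

```python
import asyncio
from typing import List, Protocol


class PlannerLike(Protocol):
    """Anything that can turn a goal into ordered step descriptions."""

    async def create_plan(self, goal: str) -> List[str]: ...


class ExecutorLike(Protocol):
    """Anything that can run one step and return its output."""

    async def execute(self, step: str) -> str: ...


class EchoPlanner:
    # Hypothetical stand-in: splits the goal on " then ".
    async def create_plan(self, goal: str) -> List[str]:
        return [part.strip() for part in goal.split(" then ")]


class EchoExecutor:
    # Hypothetical stand-in: pretends every step succeeds.
    async def execute(self, step: str) -> str:
        return f"done: {step}"


async def run_pipeline(planner: PlannerLike, executor: ExecutorLike, goal: str) -> List[str]:
    """Depends only on the two interfaces, never on concrete classes."""
    steps = await planner.create_plan(goal)
    return [await executor.execute(step) for step in steps]


results = asyncio.run(run_pipeline(EchoPlanner(), EchoExecutor(), "fetch data then analyze"))
print(results)
```

Because `run_pipeline` is typed against the protocols, either layer can be swapped (for example, a test double for the planner) without touching the other.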
Data in Manus flows through a closed loop of perception, planning, execution, and reflection:
```python
# Excerpt: methods of the agent class. Types such as UserInput, Plan,
# ExecutionResult, and Reflection are not shown in this excerpt.

# 1. Perception: understand the user's input
async def perceive(self, user_input: UserInput) -> Perception:
    """Process multimodal input and extract the key information."""
    # Text understanding
    text_understanding = await self.llm.analyze(user_input.text)

    # Image understanding (only when images are attached)
    vision_analysis = None
    if user_input.images:
        vision_analysis = await self.vision_model.analyze(user_input.images)

    # Merge the perception results
    return Perception(
        intent=text_understanding.intent,
        entities=text_understanding.entities,
        context=text_understanding.context,
        vision=vision_analysis,
    )

# 2. Planning: build an execution plan
async def plan(self, perception: Perception) -> Plan:
    """Build a task execution plan from the perception result."""
    # Task decomposition
    subtasks = await self.decompose_task(perception.intent)

    # Tool selection
    for subtask in subtasks:
        subtask.tools = await self.select_tools(subtask)

    # Optimize the execution order
    return await self.optimize_execution_order(subtasks)

# 3. Execution: call tools to complete the task
async def execute(self, plan: Plan) -> ExecutionResult:
    """Run the plan, calling one tool per step."""
    results = []
    for step in plan.steps:
        try:
            result = await self.invoke_tool(step.tool, step.params)
            results.append(ExecutionStepResult(success=True, result=result))
        except Exception as e:
            # Exception handling and recovery
            recovery = await self.handle_exception(step, e)
            results.append(ExecutionStepResult(success=False, error=e, recovery=recovery))
    return ExecutionResult(steps=results)

# 4. Reflection: evaluate the result and refine the strategy
async def reflect(self, result: ExecutionResult, perception: Perception) -> Reflection:
    """Evaluate the execution result and extract lessons learned."""
    # Evaluate the outcome
    success = self.evaluate_success(result, perception.intent)

    # Extract lessons
    lessons = await self.extract_lessons(result)

    # Refine the strategy after a failure
    if not success:
        await self.update_strategy(lessons)

    # Persist to memory
    await self.memory.store(perception, result, lessons)
    return Reflection(success=success, lessons=lessons)
```
```text
manus-agent/
├── app/
│   ├── __init__.py
│   ├── main.py                 # Application entry point
│   ├── config.py               # Configuration management
│   ├── core/                   # Core components
│   │   ├── __init__.py
│   │   ├── agent.py            # Main agent class
│   │   ├── planner.py          # Planning engine
│   │   ├── executor.py         # Execution engine
│   │   └── reflector.py        # Reflection engine
│   ├── perception/             # Perception module
│   │   ├── __init__.py
│   │   ├── nlu.py              # Natural language understanding
│   │   └── vision.py           # Visual understanding
│   ├── memory/                 # Memory system
│   │   ├── __init__.py
│   │   ├── short_term.py       # Short-term memory
│   │   ├── long_term.py        # Long-term memory
│   │   └── vector_store.py     # Vector store
│   ├── tools/                  # Tool system
│   │   ├── __init__.py
│   │   ├── registry.py         # Tool registry
│   │   ├── base.py             # Tool base class
│   │   ├── web_search.py       # Web search
│   │   ├── code_executor.py    # Code execution
│   │   └── api_caller.py       # API calls
│   ├── api/                    # API endpoints
│   │   ├── __init__.py
│   │   └── v1/
│   │       ├── agent.py        # Agent API
│   │       └── tools.py        # Tools API
│   └── utils/                  # Utility functions
├── tests/                      # Tests
├── requirements.txt            # Dependencies
└── docker-compose.yml          # Docker configuration
```
```python
import time
import uuid
from typing import AsyncGenerator, Dict, Optional

from pydantic import BaseModel

from .executor import Executor
from .planner import Planner
from .reflector import Reflector
from ..memory.long_term import LongTermMemory
from ..memory.short_term import ShortTermMemory
from ..memory.vector_store import VectorStore
from ..perception.nlu import NLU


class AgentConfig(BaseModel):
    """Agent configuration."""
    llm_model: str = "gpt-4"
    max_iterations: int = 10
    timeout: int = 300  # seconds
    enable_reflection: bool = True


class AgentState(BaseModel):
    """Agent execution state."""
    session_id: str
    status: str = "idle"  # idle, running, success, failed
    current_step: int = 0
    total_steps: int = 0
    result: Optional[str] = None
    error: Optional[str] = None


class ManusAgent:
    """Manus general-purpose agent."""

    def __init__(self, config: Optional[AgentConfig] = None):
        self.config = config or AgentConfig()

        # Core components
        self.nlu = NLU(model=self.config.llm_model)
        self.planner = Planner(llm_model=self.config.llm_model)
        self.executor = Executor()
        self.reflector = (
            Reflector(llm_model=self.config.llm_model)
            if self.config.enable_reflection
            else None
        )

        # Memory system. LongTermMemory requires a vector store; constructing
        # VectorStore without arguments is an assumption (its signature is not shown).
        self.short_term_memory = ShortTermMemory()
        self.long_term_memory = LongTermMemory(vector_store=VectorStore())

        # Session state
        self.sessions: Dict[str, AgentState] = {}

    async def run(self, user_input: str, session_id: Optional[str] = None) -> str:
        """Run the full task pipeline."""
        session_id = session_id or str(uuid.uuid4())
        start_time = time.time()

        # Initialize session state
        state = AgentState(session_id=session_id, status="running")
        self.sessions[session_id] = state

        try:
            # 1. Perception: understand the user's input
            perception = await self.nlu.understand(user_input, session_id)

            # 2. Planning: build an execution plan
            plan = await self.planner.create_plan(perception)
            state.total_steps = len(plan.steps)

            # 3. Execution: call tools to complete the task
            step_result = None
            next_index = 0
            for _ in range(self.config.max_iterations):
                if next_index >= len(plan.steps):
                    break
                if time.time() - start_time > self.config.timeout:
                    raise TimeoutError("Execution timeout")

                # Run the current step and record its outcome,
                # so adjust_plan can tell which steps are done
                step = plan.steps[next_index]
                step_result = await self.executor.execute(step)
                step.status = "success" if step_result.success else "failed"
                state.current_step = next_index + 1

                # Adjust the plan if needed
                if step_result.requires_replan:
                    plan = await self.planner.adjust_plan(plan, step_result)
                    state.total_steps = len(plan.steps)
                    # adjust_plan puts successful steps first; resume after them
                    next_index = sum(1 for s in plan.steps if s.status == "success")
                else:
                    next_index += 1

                # Stop once the task is complete
                if self.is_task_complete(plan, step_result):
                    break

            if step_result is None:
                raise RuntimeError("Plan contained no executable steps")

            # 4. Reflection: evaluate the result and extract lessons
            if self.reflector:
                reflection = await self.reflector.reflect(perception, plan, step_result)
                await self.long_term_memory.store(
                    session_id, perception, plan, step_result, reflection
                )

            # Generate the final response
            response = await self.generate_response(perception, plan, step_result)
            state.status = "success"
            state.result = response
            return response

        except Exception as e:
            state.status = "failed"
            state.error = str(e)
            raise

    async def stream(
        self, user_input: str, session_id: Optional[str] = None
    ) -> AsyncGenerator[str, None]:
        """Streamed execution that reports progress as it happens."""
        session_id = session_id or str(uuid.uuid4())

        yield "🤔 Understanding your request..."
        perception = await self.nlu.understand(user_input, session_id)

        yield "📋 Building an execution plan..."
        plan = await self.planner.create_plan(perception)
        yield f"📝 The plan has {len(plan.steps)} steps"

        result = None
        for i, step in enumerate(plan.steps, start=1):
            yield f"🔧 Running step {i}/{len(plan.steps)}: {step.description}"
            result = await self.executor.execute(step)
            yield f"✅ Step {i} finished"

        if result is None:
            yield "⚠️ The plan contained no steps"
            return

        yield "✨ Task finished, generating the result..."
        yield await self.generate_response(perception, plan, result)

    def is_task_complete(self, plan, result) -> bool:
        """Decide whether the task is complete."""
        return result.success and result.final_output is not None

    async def generate_response(self, perception, plan, result) -> str:
        """Generate the final response."""
        # Plan.summary is a string field, not a method
        prompt = f"""Based on the execution result below, write a user-friendly response:

User intent: {perception.intent}
Execution plan: {plan.summary}
Execution result: {result.output}

Summarize the process and the result in clear, concise language."""
        return await self.nlu.llm.generate(prompt)
```
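The execution loop in `run` compares elapsed time against `timeout` only between steps, so one slow tool call can still overrun the budget. A possible way to bound each call is the standard library's `asyncio.wait_for`. This is a sketch under assumptions: the helper name `execute_with_deadline` and the `fake_tool` coroutine are hypothetical and not part of the listing above.

```python
import asyncio
from typing import Any, Awaitable, Optional, Tuple


async def execute_with_deadline(call: Awaitable[Any], seconds: float) -> Optional[Any]:
    """Await `call`; cancel it and return None once `seconds` have elapsed."""
    try:
        return await asyncio.wait_for(call, timeout=seconds)
    except asyncio.TimeoutError:
        return None


async def fake_tool(delay: float) -> str:
    # Hypothetical stand-in for executor.execute(step)
    await asyncio.sleep(delay)
    return "ok"


async def demo() -> Tuple[Optional[str], Optional[str]]:
    fast = await execute_with_deadline(fake_tool(0.01), seconds=1.0)
    slow = await execute_with_deadline(fake_tool(1.0), seconds=0.05)
    return fast, slow


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Returning `None` on timeout keeps the sketch short, but it cannot be told apart from a tool that legitimately returns `None`; a real executor would more likely return a result object with an explicit error field.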
```python
from typing import List

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings, loaded from environment variables and .env."""

    # pydantic-settings (pydantic v2) style; replaces the nested `class Config`
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=True)

    # Application
    APP_NAME: str = "Manus Agent"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False

    # LLM
    LLM_PROVIDER: str = "openai"
    LLM_MODEL: str = "gpt-4"
    LLM_TEMPERATURE: float = 0.7
    LLM_MAX_TOKENS: int = 4096
    OPENAI_API_KEY: str = ""
    ANTHROPIC_API_KEY: str = ""

    # Agent
    MAX_ITERATIONS: int = 10
    EXECUTION_TIMEOUT: int = 300
    ENABLE_REFLECTION: bool = True

    # Memory system
    MEMORY_TYPE: str = "chroma"  # chroma, pinecone, weaviate
    CHROMA_DB_PATH: str = "./data/chroma"
    PINECONE_API_KEY: str = ""

    # Sandbox
    SANDBOX_TYPE: str = "docker"  # docker, microvm, local
    SANDBOX_TIMEOUT: int = 60
    SANDBOX_MEMORY_LIMIT: str = "512m"

    # Server
    HOST: str = "0.0.0.0"
    PORT: int = 8000

    # API keys accepted by the service
    API_KEYS: List[str] = []


settings = Settings()
```
The Manus planning engine uses a hybrid strategy that combines several planning algorithms:
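| Algorithm | Description | Typical use |
|---|---|---|
| CoT (Chain of Thought) | Step-by-step reasoning that exposes the thinking process | Logical reasoning, arithmetic |
| ToT (Tree of Thoughts) | Explores several paths and picks the best one | Complex decisions, creative generation |
| ReAct (Reasoning + Acting) | Alternates reasoning and acting | Tool use, task execution |
| Plan-and-Solve | Plans first, then executes | Multi-step tasks |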
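To make the ReAct row concrete, here is a minimal sketch of the thought, action, observation loop. It is illustrative only: `scripted_llm` is a hard-coded stand-in for a model call, and the `TOOLS` table and the `finish` action name are assumptions, not part of Manus.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical tool table; real tools would perform I/O.
TOOLS: Dict[str, Callable[[str], str]] = {
    "calculator": lambda expr: str(sum(int(x) for x in expr.split("+"))),
}


def scripted_llm(history: List[str]) -> Tuple[str, str, str]:
    """Stand-in for a model call: returns (thought, action, action_input)."""
    observations = [line for line in history if line.startswith("Observation:")]
    if not observations:
        return ("I need to add the numbers.", "calculator", "2+3")
    answer = observations[-1].split(":", 1)[1].strip()
    return ("I have the result.", "finish", answer)


def react(question: str, llm=scripted_llm, max_turns: int = 5) -> Tuple[str, List[str]]:
    """Alternate reasoning and acting until the model chooses 'finish'."""
    history = [f"Question: {question}"]
    for _ in range(max_turns):
        thought, action, action_input = llm(history)
        history.append(f"Thought: {thought}")
        if action == "finish":
            return action_input, history
        # Act, then feed the observation back into the next reasoning turn
        observation = TOOLS[action](action_input)
        history.append(f"Action: {action}[{action_input}]")
        history.append(f"Observation: {observation}")
    raise RuntimeError("No answer within max_turns")


answer, trace = react("What is 2 + 3?")
print(answer)
```

The key property is that each observation is appended to the history before the next model call, which is what distinguishes ReAct from planning everything up front as in Plan-and-Solve.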
```python
import json
from typing import List

from pydantic import BaseModel

from ..perception.nlu import Perception


class PlanStep(BaseModel):
    """A single step in a plan."""
    id: int
    description: str
    tool: str
    params: dict
    dependencies: List[int] = []
    status: str = "pending"  # pending, running, success, failed


class Plan(BaseModel):
    """An execution plan."""
    steps: List[PlanStep]
    goal: str
    summary: str

    def __str__(self) -> str:
        return self.summary


class Planner:
    """Task planner."""

    def __init__(self, llm_model: str = "gpt-4"):
        self.llm_model = llm_model
        # _init_llm is not shown in this listing; it is expected to return a
        # client that exposes `async generate(prompt) -> str`.
        self.llm = self._init_llm()

    async def create_plan(self, perception: Perception) -> Plan:
        """Create an execution plan."""
        prompt = self._build_plan_prompt(perception)

        # Ask the LLM for a plan, then parse it
        response = await self.llm.generate(prompt)
        plan_data = self._parse_plan_response(response)

        return Plan(
            steps=plan_data["steps"],
            goal=perception.intent,
            summary=plan_data["summary"],
        )

    async def adjust_plan(self, current_plan: Plan, step_result) -> Plan:
        """Adjust the plan based on an execution result."""
        prompt = f"""The current plan ran into the following situation:

Original plan: {current_plan.summary}
Execution result: {step_result.output}
Problem encountered: {step_result.error}

Adjust the remaining steps so that the problem is resolved and the task completes.
Return the adjusted plan steps."""
        response = await self.llm.generate(prompt)
        adjusted_steps = self._parse_plan_response(response)["steps"]

        # Keep the completed steps and append the adjusted ones
        completed_steps = [s for s in current_plan.steps if s.status == "success"]
        return Plan(
            steps=completed_steps + adjusted_steps,
            goal=current_plan.goal,
            summary=f"Adjusted plan: {len(completed_steps) + len(adjusted_steps)} steps",
        )

    def _build_plan_prompt(self, perception: Perception) -> str:
        """Build the planning prompt."""
        return f"""You are a professional task-planning assistant. Analyze the user request below and produce a detailed execution plan.

User request: {perception.intent}
Context: {perception.context}
Available tools: web_search, code_executor, file_reader, api_caller, ...

Return the plan as JSON:
{{
  "steps": [
    {{"id": 1, "description": "...", "tool": "...", "params": {{}}, "dependencies": []}},
    ...
  ],
  "summary": "plan overview"
}}

Requirements:
1. Keep steps clear, one concrete operation per step
2. State the tool and parameters each step needs
3. State the dependencies between steps
4. Consider exception handling and fallback options"""

    def _parse_plan_response(self, response: str) -> dict:
        """Parse the plan returned by the LLM."""
        # Extract the outermost JSON object from the response text
        json_start = response.find("{")
        json_end = response.rfind("}") + 1
        if json_start == -1 or json_end == 0:
            raise ValueError("Failed to parse plan: no JSON object in response")
        try:
            return json.loads(response[json_start:json_end])
        except json.JSONDecodeError as e:
            raise ValueError(f"Failed to parse plan: {e}") from e
```
```python
from app.core.planner import Plan, PlanStep

# User input
# User: "Analyze Tesla's recent stock price trend and generate a report"

# Generated plan
plan = Plan(
    goal="Analyze Tesla's stock price trend and generate a report",
    steps=[
        PlanStep(
            id=1,
            description="Search for the latest Tesla stock price information",
            tool="web_search",
            params={"query": "Tesla stock price today"},
            dependencies=[],
        ),
        PlanStep(
            id=2,
            description="Fetch Tesla's historical price data",
            tool="api_caller",
            params={"api": "yahoo_finance", "symbol": "TSLA", "period": "1y"},
            dependencies=[1],
        ),
        PlanStep(
            id=3,
            description="Analyze the price trend",
            tool="code_executor",
            params={"language": "python", "code": "..."},
            dependencies=[2],
        ),
        PlanStep(
            id=4,
            description="Generate the analysis report",
            tool="code_executor",
            params={"language": "python", "code": "..."},
            dependencies=[3],
        ),
    ],
    summary="4-step plan: search → fetch data → analyze trend → generate report",
)
```
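The `dependencies` field is what lets an executor decide which steps may run, and which could run in parallel. As a hedged sketch (the article does not say how Manus orders steps), the standard library's `graphlib.TopologicalSorter` can turn the dependency map of the example plan into batches; the function name `execution_batches` is hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Dict, List

# Step id -> ids it depends on, copied from the example plan.
dependencies: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}


def execution_batches(deps: Dict[int, List[int]]) -> List[List[int]]:
    """Group steps into batches; every step in a batch has all dependencies met."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)
    return batches


print(execution_batches(dependencies))
```

The example plan is a straight chain, so every batch holds one step. If steps 1 and 2 were independent and step 3 depended on both, the first batch would contain both of them and they could be dispatched concurrently.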
The Manus tool system uses a plugin architecture that supports registering and discovering tools dynamically.
```python
from abc import ABC, abstractmethod
from typing import Any, Optional, Type

from pydantic import BaseModel


class ToolInput(BaseModel):
    """Base class for tool inputs."""


class ToolOutput(BaseModel):
    """Base class for tool outputs."""
    success: bool
    data: Any = None
    error: Optional[str] = None


class BaseTool(ABC):
    """Base class for tools."""
    name: str
    description: str
    input_schema: Type[ToolInput]

    @abstractmethod
    async def execute(self, input: ToolInput) -> ToolOutput:
        """Run the tool."""

    def to_schema(self) -> dict:
        """Describe the tool as a JSON Schema based definition."""
        return {
            "name": self.name,
            "description": self.description,
            # pydantic v2 API; on pydantic v1 this would be .schema()
            "parameters": self.input_schema.model_json_schema(),
        }
```
```python
from typing import Dict, List

from .base import BaseTool


class ToolRegistry:
    """Tool registry (singleton with class-level storage)."""
    _instance = None
    _tools: Dict[str, BaseTool] = {}

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def register(cls, tool: BaseTool) -> None:
        """Register a tool under its name."""
        cls._tools[tool.name] = tool

    @classmethod
    def get(cls, name: str) -> BaseTool:
        """Look up a tool by name."""
        if name not in cls._tools:
            raise KeyError(f"Tool not found: {name}")
        return cls._tools[name]

    @classmethod
    def list_tools(cls) -> List[dict]:
        """List the schemas of all registered tools."""
        return [tool.to_schema() for tool in cls._tools.values()]

    @classmethod
    def clear(cls) -> None:
        """Empty the registry."""
        cls._tools.clear()


def register_tool(tool_class):
    """Class decorator that instantiates a tool and registers it."""
    ToolRegistry.register(tool_class())
    return tool_class
```
```python
import httpx
from pydantic import Field

from .base import BaseTool, ToolInput, ToolOutput
from .registry import register_tool


class WebSearchInput(ToolInput):
    query: str = Field(..., description="Search query")
    num_results: int = Field(default=5, description="Number of results")


@register_tool
class WebSearchTool(BaseTool):
    name = "web_search"
    description = "Search the web for up-to-date information"
    input_schema = WebSearchInput

    async def execute(self, input: WebSearchInput) -> ToolOutput:
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(
                    "https://api.search.com/search",
                    params={"q": input.query, "limit": input.num_results},
                )
                # Treat HTTP 4xx/5xx as failures instead of parsing an error body
                response.raise_for_status()
                results = response.json()
            return ToolOutput(success=True, data=results)
        except Exception as e:
            return ToolOutput(success=False, error=str(e))
```
```python
from pydantic import Field

from .base import BaseTool, ToolInput, ToolOutput
from .registry import register_tool
from ..utils.sandbox import DockerSandbox


class CodeExecutorInput(ToolInput):
    language: str = Field(..., description="Programming language")
    code: str = Field(..., description="Source code to run")
    timeout: int = Field(default=60, description="Timeout in seconds")


@register_tool
class CodeExecutorTool(BaseTool):
    name = "code_executor"
    description = "Run code inside a secure sandbox"
    input_schema = CodeExecutorInput

    async def execute(self, input: CodeExecutorInput) -> ToolOutput:
        # Note: input.language is not passed to the sandbox in this listing
        sandbox = DockerSandbox()
        try:
            await sandbox.create()
            exit_code, stdout, stderr = await sandbox.execute(
                input.code, timeout=input.timeout
            )
            if exit_code == 0:
                return ToolOutput(success=True, data={"output": stdout})
            return ToolOutput(success=False, error=stderr)
        except Exception as e:
            return ToolOutput(success=False, error=str(e))
        finally:
            # Always tear the sandbox down, even after a failure
            await sandbox.destroy()
```
The Manus memory system has two layers: short-term memory and long-term memory.
```python
from collections import deque
from typing import Deque, Dict, List, Optional

from pydantic import BaseModel


class Message(BaseModel):
    role: str  # user, assistant, system
    content: str
    timestamp: float


class ShortTermMemory:
    """Short-term memory: a bounded message history per session."""

    def __init__(self, max_length: int = 50):
        self.max_length = max_length
        self.sessions: Dict[str, Deque[Message]] = {}

    def add(self, session_id: str, message: Message) -> None:
        """Append a message; the oldest is dropped once max_length is reached."""
        if session_id not in self.sessions:
            self.sessions[session_id] = deque(maxlen=self.max_length)
        self.sessions[session_id].append(message)

    def get(self, session_id: str, limit: Optional[int] = None) -> List[Message]:
        """Return the message history, optionally only the last `limit` messages."""
        if session_id not in self.sessions:
            return []
        messages = list(self.sessions[session_id])
        if limit:
            return messages[-limit:]
        return messages

    def clear(self, session_id: str) -> None:
        """Remove a session."""
        if session_id in self.sessions:
            del self.sessions[session_id]
```
```python
from typing import List

from .vector_store import VectorStore


class MemoryRecord:
    """A single memory record."""

    def __init__(self, session_id, perception, plan, result, reflection):
        self.session_id = session_id
        self.perception = perception
        self.plan = plan
        self.result = result
        self.reflection = reflection
        self.embedding = None


class LongTermMemory:
    """Long-term memory backed by a vector store."""

    def __init__(self, vector_store: VectorStore):
        self.vector_store = vector_store

    async def store(self, session_id, perception, plan, result, reflection) -> None:
        """Store a memory."""
        record = MemoryRecord(session_id, perception, plan, result, reflection)

        # Generate the embedding vector
        text = f"{perception.intent} {plan.summary} {result.output}"
        record.embedding = await self.vector_store.embed(text)

        # Write to the vector database
        await self.vector_store.add(record)

    async def retrieve(self, query: str, limit: int = 5) -> List[MemoryRecord]:
        """Retrieve memories relevant to the query."""
        query_embedding = await self.vector_store.embed(query)
        return await self.vector_store.search(query_embedding, limit=limit)

    async def get_similar_tasks(self, intent: str) -> List[MemoryRecord]:
        """Return past records of similar tasks."""
        return await self.retrieve(intent, limit=10)
```
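`LongTermMemory` relies on three `VectorStore` calls (`embed`, `add`, `search`) whose implementation is not shown. The sketch below is an in-memory stand-in with the same call shapes, useful for tests. It is an assumption, not the article's `vector_store.py`: the class name `InMemoryVectorStore` is hypothetical, and the hashed bag-of-words embedding is a toy that captures word overlap rather than meaning.

```python
import asyncio
import math
from types import SimpleNamespace
from typing import Any, List


class InMemoryVectorStore:
    """Toy store exposing the embed/add/search calls that LongTermMemory makes."""

    def __init__(self, dim: int = 64):
        self.dim = dim
        self.records: List[Any] = []

    async def embed(self, text: str) -> List[float]:
        # Toy embedding: hashed bag of words, L2-normalized. Not semantic.
        vec = [0.0] * self.dim
        for word in text.lower().split():
            vec[sum(word.encode("utf-8")) % self.dim] += 1.0
        norm = math.sqrt(sum(v * v for v in vec)) or 1.0
        return [v / norm for v in vec]

    async def add(self, record: Any) -> None:
        self.records.append(record)

    async def search(self, query_embedding: List[float], limit: int = 5) -> List[Any]:
        # Vectors are unit length, so the dot product equals cosine similarity.
        def score(record: Any) -> float:
            return sum(a * b for a, b in zip(query_embedding, record.embedding))

        return sorted(self.records, key=score, reverse=True)[:limit]


async def demo() -> str:
    store = InMemoryVectorStore()
    for text in ["tesla stock analysis", "weather forecast paris"]:
        await store.add(SimpleNamespace(text=text, embedding=await store.embed(text)))
    hits = await store.search(await store.embed("tesla stock"), limit=1)
    return hits[0].text


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A production store would delegate `embed` to an embedding model and `search` to an approximate index; the linear scan here is only reasonable for a handful of records.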
```shell
# LLM
LLM_PROVIDER=openai
LLM_MODEL=gpt-4
LLM_TEMPERATURE=0.7
OPENAI_API_KEY=sk-...

# Agent
MAX_ITERATIONS=10
EXECUTION_TIMEOUT=300
ENABLE_REFLECTION=true

# Memory system
MEMORY_TYPE=chroma
CHROMA_DB_PATH=./data/chroma

# Sandbox
SANDBOX_TYPE=docker
SANDBOX_TIMEOUT=60
SANDBOX_MEMORY_LIMIT=512m

# Server
HOST=0.0.0.0
PORT=8000
DEBUG=false
```
```shell
# Install dependencies
$ pip install -r requirements.txt

# Start the service
$ python -m app.main

# Example API call
$ curl -X POST http://localhost:8000/api/v1/agent/run \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer your-api-key" \
    -d '{
      "message": "Analyze Tesla stock price trend over the past year and generate a report",
      "stream": false
    }'

# Streaming call
$ curl -X POST http://localhost:8000/api/v1/agent/run \
    -H "Content-Type: application/json" \
    -d '{"message": "...", "stream": true}'
```
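```text
🤖 Manus Agent starting...
📝 Received user request: analyze Tesla's stock price trend
🧠 Understanding intent...
   - Intent: stock_analysis
   - Entities: {"company": "Tesla", "symbol": "TSLA", "period": "1y"}
   - Task type: data analysis + report generation
📋 Building plan...
   Step 1/4: Search for the latest Tesla stock price information [web_search]
   Step 2/4: Fetch historical price data [api_caller]
   Step 3/4: Analyze the price trend [code_executor]
   Step 4/4: Generate the analysis report [code_executor]
🔧 Running step 1/4...
✅ Search finished, 5 relevant results found
🔧 Running step 2/4...
✅ Retrieved 1 year of history, 252 trading days
🔧 Running step 3/4...
✅ Trend analysis finished
   - High: $299.29
   - Low: $101.81
   - Change: +101.5%
🔧 Running step 4/4...
✅ Report generated
💡 Reflecting on the run...
   - Execution succeeded
   - Total time: 45 seconds
   - Tool calls: 4
✨ Task complete!
📊 Analysis report:
   Tesla (TSLA) performed strongly over the past year...
   [full report content]
```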
```yaml
version: '3.8'

services:
  manus-agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - CHROMA_DB_PATH=/data/chroma
    volumes:
      - ./data:/data
    depends_on:
      - chromadb
      - redis
    restart: unless-stopped

  chromadb:
    image: chromadb/chroma:latest
    volumes:
      - ./data/chroma:/chroma/chroma

  redis:
    image: redis:7-alpine
    volumes:
      - ./data/redis:/data
```
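| Area | Measures | Effect |
|---|---|---|
| LLM calls | Response caching, batching | 50% fewer API calls |
| Vector retrieval | HNSW index, approximate search | 10x faster retrieval |
| Code execution | Container pool, pre-warmed sandboxes | 80% shorter startup time |
| Concurrency | Async I/O, task queue | 100+ concurrent requests supported |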
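As an illustration of the "response caching" row, the sketch below wraps any async `generate` function and memoizes responses by a hash of the prompt. It is an assumption about one way to do this, not the article's implementation: `CachedLLM` and `fake_backend` are hypothetical names, and the cache is an unbounded in-process dictionary.

```python
import asyncio
import hashlib
from typing import Awaitable, Callable, Dict, List


class CachedLLM:
    """Wraps an async generate function and memoizes responses by prompt hash."""

    def __init__(self, generate: Callable[[str], Awaitable[str]]):
        self._generate = generate
        self._cache: Dict[str, str] = {}
        self.hits = 0
        self.misses = 0

    async def generate(self, prompt: str) -> str:
        key = hashlib.sha256(prompt.encode("utf-8")).hexdigest()
        if key in self._cache:
            self.hits += 1
            return self._cache[key]
        # Cache miss: call the backend once and remember the answer
        self.misses += 1
        response = await self._generate(prompt)
        self._cache[key] = response
        return response


backend_calls: List[str] = []


async def fake_backend(prompt: str) -> str:
    # Hypothetical stand-in for a real LLM client
    backend_calls.append(prompt)
    return prompt.upper()


async def demo() -> tuple:
    llm = CachedLLM(fake_backend)
    first = await llm.generate("hi")
    second = await llm.generate("hi")  # served from the cache
    third = await llm.generate("yo")
    return first, second, third, llm.hits, llm.misses


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Exact-match caching only pays off when identical prompts recur, and it makes sampling deterministic for repeated prompts; with a non-zero temperature such as the 0.7 default in the configuration, that changes behavior and should be a deliberate choice.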
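"The Manus general-purpose agent points to where AI agent technology is heading: from task-specific to general-purpose, and from predefined workflows to autonomous planning. Building such a system takes deep technical groundwork and continuous iteration, but the value it delivers will be revolutionary."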