# Incremental replanning agent: complete implementation (增量式重规划 Agent 完整实现)
import openai
from typing import List, Dict, Optional, Set, Tuple
from dataclasses import dataclass
from enum import Enum
import json
import heapq
class PlanStatus(Enum):
    """Lifecycle states a subgoal can be in."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    BLOCKED = "blocked"
    FAILED = "failed"
@dataclass
class Subgoal:
    """One unit of work inside a plan."""

    id: str  # identifier used in dependency lists and execution orders
    description: str  # free-text description of what has to be done
    dependencies: List[str]  # IDs of the subgoals this one depends on
    status: PlanStatus = PlanStatus.PENDING
    result: Optional[str] = None  # stays None until a result is recorded
    cost: float = 0.0  # estimated cost/difficulty
@dataclass
class Plan:
    """A goal broken down into subgoals plus the order to run them in."""

    goal: str  # the overall objective
    subgoals: List[Subgoal]
    execution_order: List[str]  # subgoal IDs in the order they should run
    total_cost: float = 0.0
class IncrementalReplanner:
    """Replan only the part of an existing plan that a failure invalidates.

    An LLM decomposes a goal into subgoals. The subgoals are executed in
    order, and when one fails only the failed subgoal, the subgoals whose
    description mentions a known obstacle, and everything depending on them
    are regenerated. The rest of the plan is kept as it is.
    """

    def __init__(self, model: str = "gpt-4",
                 max_replan_attempts: int = 3):
        """Create a replanner.

        Args:
            model: Name of the LLM model passed to the completion call.
            max_replan_attempts: Upper bound on replanning rounds per run.
        """
        self.model = model
        self.max_replan_attempts = max_replan_attempts
        self.current_plan: Optional["Plan"] = None
        self.execution_history: List[Dict] = []
        self.obstacles: Set[str] = set()  # obstacles seen in failed results

    def _call_llm(self, prompt: str, temperature: float = 0.7) -> str:
        """Send a single-message chat request and return the reply text."""
        # NOTE(review): ChatCompletion.create is the pre-1.0 openai SDK
        # interface; confirm the installed SDK version still provides it.
        response = openai.ChatCompletion.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature
        )
        return response.choices[0].message.content

    @staticmethod
    def _parse_json_response(response_text: str) -> Dict:
        """Parse an LLM reply as JSON, tolerating code fences and chatter.

        The reply is parsed as-is first. If that fails, the text between the
        first "{" and the last "}" is parsed instead, which covers replies
        wrapped in a Markdown fence or surrounded by prose.

        Raises:
            json.JSONDecodeError: If no parseable JSON object is found.
        """
        text = response_text.strip()
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            start = text.find("{")
            end = text.rfind("}")
            if start == -1 or end <= start:
                raise
            return json.loads(text[start:end + 1])

    @staticmethod
    def _build_subgoals(items: List[Dict]) -> List["Subgoal"]:
        """Turn the LLM's subgoal dicts into Subgoal objects."""
        return [
            Subgoal(
                id=item["id"],
                description=item["description"],
                # A missing or null dependency list means "no dependencies".
                dependencies=list(item.get("dependencies") or []),
                cost=item.get("estimated_cost", 1.0),
            )
            for item in items
        ]

    @staticmethod
    def _merge_execution_order(old_order: List[str], affected_ids: Set[str],
                               replacement_ids: List[str],
                               valid_ids: Set[str]) -> List[str]:
        """Splice replacement IDs into an existing order.

        Unaffected IDs keep their relative position. The replacement IDs are
        inserted where the first affected ID used to be, or appended when no
        affected ID occurs in the old order. IDs outside ``valid_ids`` and
        duplicates are dropped.
        """
        merged: List[str] = []
        seen: Set[str] = set()

        def push(subgoal_id: str) -> None:
            if subgoal_id in valid_ids and subgoal_id not in seen:
                seen.add(subgoal_id)
                merged.append(subgoal_id)

        spliced = False
        for subgoal_id in old_order:
            if subgoal_id not in affected_ids:
                push(subgoal_id)
            elif not spliced:
                for new_id in replacement_ids:
                    push(new_id)
                spliced = True
        if not spliced:
            for new_id in replacement_ids:
                push(new_id)
        return merged

    def _completed_results(self) -> Dict:
        """Map subgoal ID to result for every completed history entry."""
        return {
            item["subgoal_id"]: item["result"]
            for item in self.execution_history
            if item["status"] == "completed"
        }

    def _with_dependents(self, seed_ids: Set[str]) -> Set[str]:
        """Return ``seed_ids`` plus every subgoal that depends on them.

        The dependency relation is followed transitively and independently of
        the order in which subgoals are listed in the current plan.
        """
        closure = set(seed_ids)
        if not self.current_plan:
            return closure
        grew = bool(closure)
        while grew:
            grew = False
            for subgoal in self.current_plan.subgoals:
                if subgoal.id in closure:
                    continue
                if any(dep in closure for dep in subgoal.dependencies):
                    closure.add(subgoal.id)
                    grew = True
        return closure

    def create_initial_plan(self, goal: str, context: str = "") -> "Plan":
        """Ask the LLM to decompose ``goal`` and store the resulting plan.

        Args:
            goal: The overall objective.
            context: Extra information inserted into the prompt.

        Returns:
            The new plan, which is also stored in ``self.current_plan``.

        Raises:
            json.JSONDecodeError: If the reply contains no parseable JSON.
            KeyError: If the reply lacks "subgoals" or a subgoal lacks
                "id"/"description".
        """
        prompt = f"""
目标:{goal}
上下文:{context}
请将这个长程目标分解为可执行的子目标。
输出格式(JSON):
{{
"subgoals": [
{{
"id": "sub_1",
"description": "子目标描述",
"dependencies": [], // 依赖的子目标 ID 列表
"estimated_cost": 1.0 // 预估成本/难度
}}
],
"execution_order": ["sub_1", "sub_2", ...] // 推荐的执行顺序
}}
考虑:
1. 子目标之间的依赖关系
2. 执行顺序的合理性
3. 每个子目标的可执行性
"""
        response_text = self._call_llm(prompt, temperature=0.7)
        plan_data = self._parse_json_response(response_text)
        subgoals = self._build_subgoals(plan_data["subgoals"])
        # Fall back to listing order when the reply carries no usable order.
        execution_order = list(
            plan_data.get("execution_order") or [sg.id for sg in subgoals]
        )
        plan = Plan(
            goal=goal,
            subgoals=subgoals,
            execution_order=execution_order,
            total_cost=sum(sg.cost for sg in subgoals)
        )
        self.current_plan = plan
        return plan

    def detect_changes(self, execution_result: Dict) -> Set[str]:
        """Derive change markers from one execution result.

        A failed result contributes ``"obstacle:<name>"`` and records the
        obstacle in ``self.obstacles``. Every entry of ``"new_constraints"``
        contributes ``"constraint:<name>"``.

        Args:
            execution_result: Result dict with a "status" key and optional
                "obstacle" and "new_constraints" keys.

        Returns:
            The set of change markers.
        """
        changes: Set[str] = set()
        if execution_result["status"] == "failed":
            # A present-but-null obstacle would otherwise be stored as None
            # and break the later substring tests.
            obstacle = str(execution_result.get("obstacle") or "unknown")
            changes.add(f"obstacle:{obstacle}")
            self.obstacles.add(obstacle)
        for constraint in execution_result.get("new_constraints") or ():
            changes.add(f"constraint:{constraint}")
        return changes

    def identify_affected_subgoals(self, changes: Set[str]) -> Set[str]:
        """Find the subgoals touched by ``changes``.

        A subgoal is affected when its description contains a change marker,
        the part of a marker after the "obstacle:"/"constraint:" prefix, or
        any obstacle recorded so far. Subgoals that depend, directly or
        indirectly, on an affected subgoal are affected too.

        Args:
            changes: Change markers as produced by ``detect_changes``.

        Returns:
            IDs of the affected subgoals; empty without a plan or changes.
        """
        affected: Set[str] = set()
        if not self.current_plan or not changes:
            return affected
        keywords = set(self.obstacles)
        for change in changes:
            keywords.add(change)
            _, separator, payload = change.partition(":")
            if separator:
                keywords.add(payload)
        # An empty keyword is a substring of every description; drop it.
        keywords = {kw for kw in keywords if isinstance(kw, str) and kw}
        for subgoal in self.current_plan.subgoals:
            if any(kw in subgoal.description for kw in keywords):
                affected.add(subgoal.id)
        return self._with_dependents(affected)

    def incremental_replan(self, affected_ids: Set[str],
                           execution_context: Dict) -> "Plan":
        """Regenerate the affected subgoals and merge them into the plan.

        Args:
            affected_ids: IDs of the subgoals to replace.
            execution_context: Optional extra context. Its "completed"
                mapping (subgoal ID to result) is merged over the results
                found in ``self.execution_history``.

        Returns:
            The adjusted plan, which is also stored in ``self.current_plan``.

        Raises:
            ValueError: If there is no current plan.
            json.JSONDecodeError: If the reply contains no parseable JSON.
            KeyError: If the reply lacks "new_subgoals" or a subgoal lacks
                "id"/"description".
        """
        if not self.current_plan:
            raise ValueError("No current plan to replan")
        print(f"开始增量式重规划,影响 {len(affected_ids)} 个子目标")
        current = self.current_plan
        # 1. Split the plan into the part to keep and the part to replace.
        unchanged_subgoals = [
            sg for sg in current.subgoals if sg.id not in affected_ids
        ]
        affected_subgoals = [
            sg for sg in current.subgoals if sg.id in affected_ids
        ]
        # 2. Collect the results of the work that is already done.
        completed_results = self._completed_results()
        completed_results.update(
            (execution_context or {}).get("completed") or {}
        )
        # 3. Build the replanning prompt.
        affected_desc = "\n".join(
            f"- {sg.id}: {sg.description}" for sg in affected_subgoals
        )
        obstacles_text = "\n".join(f"- {obs}" for obs in self.obstacles)
        # ensure_ascii=False keeps non-ASCII results readable in the prompt
        # instead of turning them into \uXXXX escapes.
        completed_json = json.dumps(
            completed_results, indent=2, ensure_ascii=False
        )
        prompt = f"""
原目标:{current.goal}
受影响的子目标:
{affected_desc}
当前障碍/约束:
{obstacles_text}
已完成子目标的结果:
{completed_json}
请为重规划区域生成新的子目标方案。
输出格式(JSON):
{{
"new_subgoals": [
{{
"id": "new_sub_1",
"description": "新子目标描述(避开障碍)",
"dependencies": [], // 依赖关系
"estimated_cost": 1.0
}}
],
"execution_order": ["new_sub_1", ...],
"integration_strategy": "如何与未受影响的子目标整合"
}}
"""
        response_text = self._call_llm(prompt, temperature=0.7)
        replan_data = self._parse_json_response(response_text)
        # 4. Create the replacement subgoals.
        new_subgoals = self._build_subgoals(replan_data["new_subgoals"])
        # 5. Merge: unchanged subgoals stay in the order, and the new ones
        # take the slot of the subgoals they replace. New IDs the reply left
        # out of its own order are appended after the ordered ones.
        all_subgoals = unchanged_subgoals + new_subgoals
        replacement_ids = list(replan_data.get("execution_order") or [])
        replacement_ids.extend(sg.id for sg in new_subgoals)
        new_execution_order = self._merge_execution_order(
            current.execution_order,
            affected_ids,
            replacement_ids,
            {sg.id for sg in all_subgoals},
        )
        new_plan = Plan(
            goal=current.goal,
            subgoals=all_subgoals,
            execution_order=new_execution_order,
            total_cost=sum(sg.cost for sg in all_subgoals)
        )
        self.current_plan = new_plan
        return new_plan

    def execute_with_replanning(self, goal: str, context: str = "") -> Dict:
        """Plan ``goal``, execute it, and replan around failures.

        After every replanning round the remaining work is taken from the new
        plan's execution order, so replacement subgoals are actually run. A
        failed subgoal is always part of the replanned region.

        Args:
            goal: The overall objective.
            context: Extra information for the initial planning prompt.

        Returns:
            On success a dict with "status" == "completed", "goal",
            "replan_count" and "execution_history". When a failure occurs
            after ``max_replan_attempts`` rounds, a dict with
            "status" == "failed", "reason" and "progress".
        """
        print(f"开始长程任务:{goal[:50]}...")
        print("="*70 + "\n")
        # Create the initial plan.
        plan = self.create_initial_plan(goal, context)
        print(f"初始计划:{len(plan.subgoals)} 个子目标")
        print(f"执行顺序:{' → '.join(plan.execution_order)}")
        print()
        replan_count = 0
        step = 0
        completed_ids: Set[str] = set()
        queue = list(plan.execution_order)
        while queue:
            subgoal_id = queue.pop(0)
            step += 1
            # Steps taken so far plus what is still queued; never zero.
            total_steps = step + len(queue)
            print(f"步骤 {step}/{total_steps}: 执行 {subgoal_id}")
            subgoal = next(
                (sg for sg in plan.subgoals if sg.id == subgoal_id),
                None
            )
            if not subgoal:
                print(f"⚠️ 子目标 {subgoal_id} 不存在,跳过")
                continue
            # Simulated execution (a real agent would call tools/APIs here).
            execution_result = self._execute_subgoal(subgoal)
            self.execution_history.append({
                "step": step,
                "subgoal_id": subgoal_id,
                "status": execution_result["status"],
                "result": execution_result.get("result"),
                "obstacle": execution_result.get("obstacle")
            })
            if execution_result["status"] != "failed":
                print(f"✓ 执行成功")
                subgoal.status = PlanStatus.COMPLETED
                subgoal.result = execution_result.get("result")
                completed_ids.add(subgoal_id)
                print()
                continue
            print(f"✗ 执行失败:{execution_result.get('reason', '未知原因')}")
            subgoal.status = PlanStatus.FAILED
            changes = self.detect_changes(execution_result)
            print(f"检测到变化:{changes}")
            # The failed subgoal is always replanned, even when no description
            # mentions the obstacle; finished work is never thrown away.
            affected = self.identify_affected_subgoals(changes)
            affected.add(subgoal_id)
            affected = self._with_dependents(affected) - completed_ids
            print(f"影响 {len(affected)} 个子目标:{affected}")
            if replan_count >= self.max_replan_attempts:
                print("⚠️ 达到最大重规划次数,任务失败")
                return {
                    "status": "failed",
                    "reason": "max_replan_attempts_exceeded",
                    "progress": step / total_steps
                }
            print(f"\n触发增量式重规划 (尝试 {replan_count + 1}/{self.max_replan_attempts})")
            plan = self.incremental_replan(affected, {
                "completed": self._completed_results()
            })
            replan_count += 1
            # Continue with the new plan, skipping what is already done.
            queue = [
                sid for sid in plan.execution_order
                if sid not in completed_ids
            ]
            print(f"✓ 重规划完成,新计划:{len(plan.subgoals)} 个子目标")
            print()
        print("="*70)
        print(f"任务完成,重规划 {replan_count} 次")
        return {
            "status": "completed",
            "goal": goal,
            "replan_count": replan_count,
            "execution_history": self.execution_history
        }

    def _execute_subgoal(self, subgoal: "Subgoal") -> Dict:
        """Simulate executing a subgoal.

        Succeeds with probability 0.8; otherwise fails with one randomly
        chosen obstacle, reported as both "reason" and "obstacle". A real
        application would call concrete tools/APIs here.
        """
        import random
        if random.random() < 0.8:
            return {
                "status": "completed",
                "result": f"完成:{subgoal.description}"
            }
        obstacles = ["资源不足", "依赖缺失", "环境变化", "约束冲突"]
        # Draw once so the reported reason and obstacle cannot disagree.
        obstacle = random.choice(obstacles)
        return {
            "status": "failed",
            "reason": obstacle,
            "obstacle": obstacle
        }
# Usage example
if __name__ == "__main__":
    # Set up the replanner.
    replanner = IncrementalReplanner(max_replan_attempts=3)
    # Example task: a software development project.
    goal = "开发一个完整的 Web 应用,包括前端、后端、数据库和部署"
    context = """
技术栈:React + Node.js + PostgreSQL
团队规模:3 人
时间限制:2 个月
预算:中等
"""
    print("增量式重规划示例:软件开发项目")
    print("="*70 + "\n")
    print(f"目标:{goal}\n")
    # Run the task with replanning enabled.
    result = replanner.execute_with_replanning(goal, context)
    print("\n最终结果:")
    print("-"*70)
    print(f"状态:{result['status']}")
    # A failed run carries no replan count or history, hence the defaults.
    print(f"重规划次数:{result.get('replan_count', 0)}")
    print(f"执行步骤:{len(result.get('execution_history', []))}")
    print("-"*70)
    print("\n关键观察:")
    print("1. 增量式重规划仅调整受影响区域(高效)")
    print("2. 复用未受影响的计划部分(节省计算)")
    print("3. 支持多次重规划尝试(鲁棒性)")
    print("4. 实时响应环境变化(灵活性)")
    print("5. 适用于长程复杂任务(实用性)")