HTN 规划器实现
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass
from enum import Enum
class TaskType(Enum):
COMPOUND = "compound" # 复合任务(需分解)
PRIMITIVE = "primitive" # 原语任务(可执行)
@dataclass
class Task:
"""任务定义"""
name: str
task_type: TaskType
parameters: List[str]
preconditions: List[str] # 前置条件
effects: List[str] # 效果
@dataclass
class Method:
"""分解方法"""
name: str
task_name: str # 要分解的任务名
preconditions: List[str] # 方法适用条件
subtasks: List[Task] # 分解后的子任务
constraints: List[str] # 子任务间约束
@dataclass
class Operator:
"""原语操作符"""
name: str
parameters: List[str]
preconditions: List[str]
effects: List[str]
execute_fn: Callable # 执行函数
class HTNPlanner:
"""
HTN 规划器
支持任务分解、方法选择、约束检查
"""
def __init__(self):
"""初始化规划器"""
self.methods: Dict[str, List[Method]] = {} # task_name -> [methods]
self.operators: Dict[str, Operator] = {}
self.world_state: Dict[str, bool] = {}
def add_method(self, method: Method):
"""添加分解方法"""
if method.task_name not in self.methods:
self.methods[method.task_name] = []
self.methods[method.task_name].append(method)
print(f"✓ 添加方法:{method.name} (分解 {method.task_name})")
def add_operator(self, operator: Operator):
"""添加原语操作符"""
self.operators[operator.name] = operator
print(f"✓ 添加操作符:{operator.name}")
def check_preconditions(self, preconditions: List[str]) -> bool:
"""检查前置条件是否满足"""
for precondition in preconditions:
if precondition.startswith("not "):
cond = precondition[4:]
if self.world_state.get(cond, False):
return False
else:
if not self.world_state.get(precondition, False):
return False
return True
def decompose(self, task: Task, depth: int = 0) -> Optional[List[Task]]:
"""
分解任务
Args:
task: 待分解任务
depth: 当前深度
Returns:
subtasks: 分解后的子任务列表,或 None(无法分解)
"""
indent = " " * depth
if task.task_type == TaskType.PRIMITIVE:
# 原语任务,无需分解
print(f"{indent}✓ 原语任务:{task.name}")
return [task]
print(f"{indent}分解复合任务:{task.name}({', '.join(task.parameters)})")
# 检查任务前置条件
if not self.check_preconditions(task.preconditions):
print(f"{indent}✗ 前置条件不满足")
return None
# 查找适用的方法
if task.task_name not in self.methods:
print(f"{indent}✗ 无可用分解方法")
return None
applicable_methods = []
for method in self.methods[task.task_name]:
if self.check_preconditions(method.preconditions):
applicable_methods.append(method)
if not applicable_methods:
print(f"{indent}✗ 无适用方法(前置条件不满足)")
return None
# 尝试每个方法(简单策略:选择第一个)
for method in applicable_methods:
print(f"{indent}尝试方法:{method.name}")
# 递归分解子任务
all_subtasks = []
success = True
for subtask in method.subtasks:
decomposed = self.decompose(subtask, depth + 1)
if decomposed is None:
success = False
break
all_subtasks.extend(decomposed)
if success:
print(f"{indent}✓ 方法 {method.name} 成功")
return all_subtasks
print(f"{indent}✗ 所有方法都失败")
return None
def plan(self, initial_state: Dict[str, bool],
tasks: List[Task]) -> Optional[List[Task]]:
"""
生成规划
Args:
initial_state: 初始世界状态
tasks: 顶层任务列表
Returns:
plan: 原语任务序列,或 None(无解)
"""
# 设置初始状态
self.world_state = initial_state.copy()
print("开始 HTN 规划")
print("="*70 + "\n")
print(f"初始状态:{self.world_state}")
print(f"目标任务:{[t.name for t in tasks]}\n")
# 分解所有任务
plan = []
for task in tasks:
decomposed = self.decompose(task)
if decomposed is None:
print(f"\n规划失败:无法分解任务 {task.name}")
return None
plan.extend(decomposed)
print("\n" + "="*70)
print(f"\n规划成功!共{len(plan)}个原语动作:")
for i, task in enumerate(plan, 1):
print(f" {i}. {task.name}({', '.join(task.parameters)})")
return plan
# 使用示例:旅行规划
if __name__ == "__main__":
planner = HTNPlanner()
# 定义原语操作符
book_flight = Operator(
name="book_flight",
parameters=["from", "to"],
preconditions=["has_money"],
effects=["flight_booked"],
execute_fn=lambda: print(" 执行:预订机票")
)
book_hotel = Operator(
name="book_hotel",
parameters=["location"],
preconditions=["has_money"],
effects=["hotel_booked"],
execute_fn=lambda: print(" 执行:预订酒店")
)
pack_bags = Operator(
name="pack_bags",
parameters=[],
preconditions=[],
effects=["bags_packed"],
execute_fn=lambda: print(" 执行:收拾行李")
)
planner.add_operator(book_flight)
planner.add_operator(book_hotel)
planner.add_operator(pack_bags)
print("\n" + "="*70 + "\n")
# 定义分解方法
# 方法 1: 旅行 -> 订机票 + 订酒店 + 收拾行李
travel_method1 = Method(
name="travel_by_air",
task_name="travel",
preconditions=["far_distance"],
subtasks=[
Task("book_flight", TaskType.PRIMITIVE, ["from", "to"], [], []),
Task("book_hotel", TaskType.PRIMITIVE, ["to"], [], []),
Task("pack_bags", TaskType.PRIMITIVE, [], [], [])
],
constraints=["book_flight before pack_bags"]
)
# 方法 2: 旅行 -> 订酒店 + 收拾行李(短途)
travel_method2 = Method(
name="travel_by_car",
task_name="travel",
preconditions=["near_distance"],
subtasks=[
Task("book_hotel", TaskType.PRIMITIVE, ["to"], [], []),
Task("pack_bags", TaskType.PRIMITIVE, [], [], [])
],
constraints=[]
)
planner.add_method(travel_method1)
planner.add_method(travel_method2)
print("\n" + "="*70 + "\n")
# 规划示例 1: 长途旅行
print("【示例 1】长途旅行规划:")
print("-"*70)
initial_state = {
"has_money": True,
"far_distance": True,
"near_distance": False
}
travel_task = Task(
name="travel",
task_type=TaskType.COMPOUND,
parameters=["Beijing", "Paris"],
preconditions=[],
effects=[]
)
plan1 = planner.plan(initial_state, [travel_task])
print("\n" + "="*70 + "\n")
# 规划示例 2: 短途旅行
print("【示例 2】短途旅行规划:")
print("-"*70)
initial_state2 = {
"has_money": True,
"far_distance": False,
"near_distance": True
}
plan2 = planner.plan(initial_state2, [travel_task])