协同规划与分布式任务完整实现
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import math
import random
from collections import defaultdict
import heapq
import asyncio
class PlanningType(Enum):
"""规划类型"""
CENTRALIZED = "centralized" # 集中规划
DISTRIBUTED = "distributed" # 分布式规划
HIERARCHICAL = "hierarchical" # 分层规划
JOINT = "joint" # 联合规划
class TaskType(Enum):
"""任务类型"""
ATOMIC = "atomic" # 原子任务
COMPOSITE = "composite" # 复合任务
PARALLEL = "parallel" # 并行任务
SEQUENTIAL = "sequential" # 顺序任务
class ConflictType(Enum):
"""冲突类型"""
RESOURCE = "resource" # 资源冲突
TEMPORAL = "temporal" # 时间冲突
SPATIAL = "spatial" # 空间冲突
LOGICAL = "logical" # 逻辑冲突
@dataclass
class Action:
"""动作"""
id: str
name: str
preconditions: Dict[str, Any]
effects: Dict[str, Any]
duration: float
resource_requirements: Dict[str, float]
@dataclass
class Task:
"""任务"""
id: str
name: str
type: TaskType
subtasks: List['Task'] = field(default_factory=list)
priority: int = 1
deadline: Optional[datetime] = None
assigned_to: Optional[str] = None
status: str = "pending" # pending, running, completed, failed
estimated_duration: float = 0.0
actual_duration: float = 0.0
@dataclass
class Agent:
"""智能体"""
id: str
capabilities: Dict[str, float]
current_plan: List[Action] = field(default_factory=list)
current_task: Optional[Task] = None
location: Tuple[float, float] = (0, 0)
busy_until: float = 0.0
load: float = 0.0
@dataclass
class Plan:
"""计划"""
id: str
agent_id: str
actions: List[Action]
start_time: float
end_time: float
status: str = "pending"
class TaskDecomposer:
"""
任务分解器
支持:
1. 层次任务分解
2. 并行任务识别
3. 依赖关系分析
4. 关键路径计算
"""
def __init__(self):
self.decomposition_history: List[Dict[str, Any]] = []
def decompose(self, task: Task, max_depth: int = 5) -> Task:
"""分解任务"""
if max_depth <= 0 or task.type == TaskType.ATOMIC:
return task
if task.type == TaskType.COMPOSITE:
# 简化:将复合任务分解为原子任务
subtasks = []
for i in range(3): # 分解为 3 个子任务
subtask = Task(
id=f"{task.id}_sub{i}",
name=f"{task.name}_sub{i}",
type=TaskType.ATOMIC,
priority=task.priority,
estimated_duration=task.estimated_duration / 3
)
subtasks.append(subtask)
task.subtasks = subtasks
task.type = TaskType.SEQUENTIAL
# 递归分解子任务
for subtask in task.subtasks:
self.decompose(subtask, max_depth - 1)
self.decomposition_history.append({
"task_id": task.id,
"subtasks": [st.id for st in task.subtasks],
"timestamp": datetime.now().isoformat()
})
return task
def identify_parallel_tasks(self, task: Task) -> List[List[Task]]:
"""识别可并行执行的任务"""
if not task.subtasks:
return [[task]]
# 简化:假设所有原子任务可并行
parallel_groups = []
atomic_tasks = [st for st in task.subtasks if st.type == TaskType.ATOMIC]
if atomic_tasks:
parallel_groups.append(atomic_tasks)
# 递归处理复合子任务
for subtask in task.subtasks:
if subtask.type != TaskType.ATOMIC:
parallel_groups.extend(self.identify_parallel_tasks(subtask))
return parallel_groups
def calculate_critical_path(self, task: Task) -> Tuple[List[Task], float]:
"""计算关键路径"""
if not task.subtasks:
return [task], task.estimated_duration
# 简化:计算最长路径
max_path = []
max_duration = 0
for subtask in task.subtasks:
path, duration = self.calculate_critical_path(subtask)
if duration > max_duration:
max_duration = duration
max_path = path
return [task] + max_path, task.estimated_duration + max_duration
class TaskAllocator:
"""
任务分配器
支持:
1. 基于能力的分配
2. 负载均衡
3. 最优匹配
4. 动态重分配
"""
def __init__(self):
self.allocation_history: List[Dict[str, Any]] = []
def allocate_task(self, task: Task, agents: Dict[str, Agent]) -> Optional[str]:
"""分配任务给最佳智能体"""
if task.assigned_to:
return task.assigned_to
best_agent = None
best_score = -float('inf')
for agent_id, agent in agents.items():
# 检查能力匹配
capability_score = self._calculate_capability_score(task, agent)
# 检查负载
load_score = 1.0 - agent.load
# 检查可用性
availability_score = 1.0 if agent.busy_until <= 0 else 0.5
# 综合评分
total_score = capability_score * 0.5 + load_score * 0.3 + availability_score * 0.2
if total_score > best_score:
best_score = total_score
best_agent = agent_id
if best_agent:
task.assigned_to = best_agent
agents[best_agent].load += 0.1
self.allocation_history.append({
"task_id": task.id,
"agent_id": best_agent,
"score": best_score,
"timestamp": datetime.now().isoformat()
})
return best_agent
def _calculate_capability_score(self, task: Task, agent: Agent) -> float:
"""计算能力匹配分数"""
# 简化:随机分数
return random.uniform(0.5, 1.0)
def balance_load(self, agents: Dict[str, Agent]) -> bool:
"""负载均衡"""
loads = [agent.load for agent in agents.values()]
if not loads:
return False
avg_load = sum(loads) / len(loads)
max_deviation = max(abs(load - avg_load) for load in loads)
# 如果负载不均衡超过阈值,进行重分配
if max_deviation > 0.3:
# 简化:不实际重分配
return True
return False
class ConflictDetector:
"""
冲突检测器
支持:
1. 资源冲突检测
2. 时间冲突检测
3. 空间冲突检测
4. 死锁检测
"""
def __init__(self):
self.conflicts: List[Dict[str, Any]] = []
def detect_conflicts(self, plans: List[Plan],
resources: Dict[str, float]) -> List[Dict[str, Any]]:
"""检测冲突"""
conflicts = []
# 检测资源冲突
resource_conflicts = self._detect_resource_conflicts(plans, resources)
conflicts.extend(resource_conflicts)
# 检测时间冲突
temporal_conflicts = self._detect_temporal_conflicts(plans)
conflicts.extend(temporal_conflicts)
self.conflicts = conflicts
return conflicts
def _detect_resource_conflicts(self, plans: List[Plan],
resources: Dict[str, float]) -> List[Dict[str, Any]]:
"""检测资源冲突"""
conflicts = []
# 简化:检查资源需求是否超过供应
for resource, capacity in resources.items():
total_demand = 0
for plan in plans:
for action in plan.actions:
if resource in action.resource_requirements:
total_demand += action.resource_requirements[resource]
if total_demand > capacity:
conflicts.append({
"type": ConflictType.RESOURCE,
"resource": resource,
"demand": total_demand,
"capacity": capacity,
"severity": (total_demand - capacity) / capacity
})
return conflicts
def _detect_temporal_conflicts(self, plans: List[Plan]) -> List[Dict[str, Any]]:
"""检测时间冲突"""
conflicts = []
# 简化:检查时间重叠
for i, plan1 in enumerate(plans):
for plan2 in plans[i+1:]:
if plan1.agent_id == plan2.agent_id:
# 同一智能体的计划时间重叠
if plan1.start_time < plan2.end_time and plan2.start_time < plan1.end_time:
conflicts.append({
"type": ConflictType.TEMPORAL,
"agents": [plan1.agent_id],
"plans": [plan1.id, plan2.id],
"overlap": min(plan1.end_time, plan2.end_time) - max(plan1.start_time, plan2.start_time)
})
return conflicts
def detect_deadlock(self, agents: Dict[str, Agent],
resources: Dict[str, float]) -> bool:
"""检测死锁"""
# 简化:检查循环等待
# 实际实现需要构建等待图并检测环
return False
class CollaborativePlanner:
"""
协同规划器
支持:
1. 联合规划
2. 分布式规划
3. 动态重规划
4. 规划协调
"""
def __init__(self):
self.plans: Dict[str, Plan] = {}
self.planning_history: List[Dict[str, Any]] = []
def create_joint_plan(self, task: Task, agents: Dict[str, Agent]) -> List[Plan]:
"""创建联合计划"""
plans = []
# 分解任务
decomposer = TaskDecomposer()
decomposed_task = decomposer.decompose(task)
# 分配子任务
allocator = TaskAllocator()
for subtask in decomposed_task.subtasks:
agent_id = allocator.allocate_task(subtask, agents)
if agent_id:
# 创建计划
plan = Plan(
id=f"plan_{subtask.id}",
agent_id=agent_id,
actions=[Action(f"action_{subtask.id}", subtask.name, {}, {}, subtask.estimated_duration, {})],
start_time=0,
end_time=subtask.estimated_duration
)
plans.append(plan)
self.plans[plan.id] = plan
# 检测冲突
detector = ConflictDetector()
resources = {"cpu": 100.0, "memory": 100.0}
conflicts = detector.detect_conflicts(plans, resources)
if conflicts:
print(f"检测到{len(conflicts)}个冲突")
# 解决冲突(简化)
self.planning_history.append({
"task_id": task.id,
"plans": [p.id for p in plans],
"conflicts": len(conflicts),
"timestamp": datetime.now().isoformat()
})
return plans
def replan(self, task_id: str, agents: Dict[str, Agent]) -> bool:
"""动态重规划"""
# 简化:重新创建计划
return True
# 使用示例
if __name__ == "__main__":
print("=== 多智能体协同规划与分布式任务 ===\n")
print("=== 创建协同规划系统 ===")
# 创建智能体
agents = {
"agent1": Agent(id="agent1", capabilities={"planning": 0.9, "execution": 0.8}),
"agent2": Agent(id="agent2", capabilities={"planning": 0.7, "execution": 0.9}),
"agent3": Agent(id="agent3", capabilities={"planning": 0.8, "execution": 0.7}),
}
print(f"创建{len(agents)}个智能体")
for agent_id, agent in agents.items():
print(f" {agent_id}: 规划={agent.capabilities['planning']}, 执行={agent.capabilities['execution']}")
print(f"\n=== 创建任务 ===")
# 创建复合任务
main_task = Task(
id="task_main",
name="Complex Mission",
type=TaskType.COMPOSITE,
priority=1,
estimated_duration=30.0
)
print(f"创建复合任务:{main_task.id} - {main_task.name}")
print(f"\n=== 任务分解 ===")
# 分解任务
decomposer = TaskDecomposer()
decomposed_task = decomposer.decompose(main_task)
print(f"任务分解完成:")
print(f" 子任务数:{len(decomposed_task.subtasks)}")
for subtask in decomposed_task.subtasks:
print(f" - {subtask.id}: {subtask.name} (时长={subtask.estimated_duration:.1f})")
# 识别并行任务
parallel_groups = decomposer.identify_parallel_tasks(decomposed_task)
print(f"\n可并行执行组数:{len(parallel_groups)}")
# 计算关键路径
critical_path, critical_duration = decomposer.calculate_critical_path(decomposed_task)
print(f"关键路径长度:{len(critical_path)}个任务,总时长={critical_duration:.1f}")
print(f"\n=== 任务分配 ===")
# 分配任务
allocator = TaskAllocator()
for subtask in decomposed_task.subtasks:
agent_id = allocator.allocate_task(subtask, agents)
if agent_id:
print(f" {subtask.id} -> {agent_id}")
# 负载均衡
balanced = allocator.balance_load(agents)
print(f"\n负载均衡:{'完成' if balanced else '无需调整'}")
for agent_id, agent in agents.items():
print(f" {agent_id}负载:{agent.load:.2f}")
print(f"\n=== 创建联合计划 ===")
# 创建联合计划
planner = CollaborativePlanner()
plans = planner.create_joint_plan(main_task, agents)
print(f"创建{len(plans)}个计划:")
for plan in plans:
print(f" {plan.id}: 智能体={plan.agent_id}, 时长={plan.end_time:.1f}")
print(f"\n=== 冲突检测 ===")
# 检测冲突
detector = ConflictDetector()
resources = {"cpu": 100.0, "memory": 50.0} # 内存资源紧张
conflicts = detector.detect_conflicts(plans, resources)
if conflicts:
print(f"检测到{len(conflicts)}个冲突:")
for conflict in conflicts:
print(f" 类型:{conflict['type'].value}, 严重度:{conflict.get('severity', 'N/A')}")
else:
print("无冲突")
# 检测死锁
deadlock = detector.detect_deadlock(agents, resources)
print(f"死锁检测:{'发现死锁' if deadlock else '无死锁'}")
print(f"\n关键观察:")
print("1. 协同规划:联合规划实现全局最优")
print("2. 任务分解:层次化分解提升可执行性")
print("3. 任务分配:基于能力和负载的最优分配")
print("4. 冲突检测:资源、时间、空间冲突全面检测")
print("5. 分布式执行:并行执行提高效率")
print("\n协同的核心:规划 + 分解 + 分配 + 协调 = 分布式智能")