# 多智能体系统完整实现 (complete implementation of a multi-agent system)
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import math
import hashlib
import json
from collections import defaultdict, deque
import heapq
import random
class AgentType(Enum):
    """Kinds of agent recognised by the system.

    Each member's value is the lowercase string form of its name.
    """

    REACTIVE = "reactive"    # reactive agent
    COGNITIVE = "cognitive"  # cognitive agent
    HYBRID = "hybrid"        # hybrid agent
    SOCIAL = "social"        # social agent
class CommunicationProtocol(Enum):
    """Protocol tag attached to every message exchanged between agents."""

    FIPA_ACL = "fipa_acl"  # FIPA ACL
    KQML = "kqml"          # KQML
    CUSTOM = "custom"      # user-defined protocol
class InteractionType(Enum):
    """Ways in which agents can interact with one another."""

    COOPERATION = "cooperation"    # cooperation
    COMPETITION = "competition"    # competition
    NEGOTIATION = "negotiation"    # negotiation
    COORDINATION = "coordination"  # coordination
@dataclass
class Message:
    """A single message sent from one agent to another.

    ``timestamp`` defaults to ``datetime.now()`` evaluated when the instance
    is created. ``in_reply_to`` defaults to ``None``.
    """

    id: str                          # identifier of this message
    sender_id: str                   # id of the sending agent
    receiver_id: str                 # id of the receiving agent
    content: Any                     # payload; no structure is enforced here
    protocol: CommunicationProtocol  # protocol tag carried by the message
    timestamp: datetime = field(default_factory=datetime.now)
    # The default is None, so the annotation must admit None as well as str.
    in_reply_to: Optional[str] = None
@dataclass
class Task:
    """A unit of work that can be assigned to an agent.

    A new task starts with ``status == "pending"`` and ``assigned_to`` set
    to ``None``.
    """

    id: str            # identifier of the task
    description: str   # free-text description
    difficulty: float  # difficulty score
    reward: float      # reward value
    deadline: datetime
    # The default is None, so the annotation must admit None as well as str.
    assigned_to: Optional[str] = None
    status: str = "pending"
@dataclass
class AgentState:
    """Mutable record describing one agent.

    ``messages`` defaults to a fresh empty list for every instance and
    ``utility`` defaults to ``0.0``.
    """

    id: str                        # identifier of the agent
    agent_type: AgentType          # kind of agent
    capabilities: Dict[str, float]  # capability name -> numeric value
    beliefs: Dict[str, Any]
    goals: List[str]
    plans: List[Dict[str, Any]]
    messages: List[Message] = field(default_factory=list)
    utility: float = 0.0
class Environment:
    """Shared world that agents perceive and act upon.

    Holds a free-form key/value ``state``, the registered agents, a
    dictionary of ``objects`` and a discrete ``time_step`` counter.
    """

    def __init__(self, grid_size: int = 10):
        """Create an empty environment.

        Args:
            grid_size: Stored on the instance; this class does not use it
                for anything else.
        """
        self.grid_size = grid_size
        self.state: Dict[str, Any] = {}
        self.agents: Dict[str, AgentState] = {}
        self.objects: Dict[str, Any] = {}
        self.time_step: int = 0

    def register_agent(self, agent: AgentState) -> None:
        """Register ``agent`` under its ``id``, replacing any previous entry."""
        self.agents[agent.id] = agent

    def perceive(self, agent_id: str) -> Dict[str, Any]:
        """Return what the given agent observes.

        Simplified: the agent sees the global state rather than a local view.

        Args:
            agent_id: Id of the perceiving agent.

        Returns:
            An empty dict if ``agent_id`` is not registered. Otherwise a dict
            with the keys ``"time_step"``, ``"agents"`` (the capabilities of
            every *other* registered agent), ``"objects"`` and ``"state"``.
            ``"objects"`` and ``"state"`` are the environment's own
            dictionaries, not copies.
        """
        if agent_id not in self.agents:
            return {}
        others = {
            other_id: {"capabilities": other.capabilities}
            for other_id, other in self.agents.items()
            if other_id != agent_id
        }
        return {
            "time_step": self.time_step,
            "agents": others,
            "objects": self.objects,
            "state": self.state,
        }

    def execute_action(self, agent_id: str, action: Dict[str, Any]) -> bool:
        """Apply an agent's action to the environment.

        Args:
            agent_id: Id of the acting agent.
            action: Dict with a ``"type"`` entry. ``"modify"`` additionally
                reads ``"key"`` and ``"value"``.

        Returns:
            ``True`` if the action was accepted, ``False`` if the agent is
            not registered, the action type is not recognised, or a
            ``"modify"`` action carries no usable key.
        """
        if agent_id not in self.agents:
            return False

        action_type = action.get("type")
        if action_type in ("move", "communicate"):
            # Placeholders: accepted, but they do not change the environment.
            return True
        if action_type == "modify":
            key = action.get("key")
            if not key:
                # Nothing was written, so do not report success.
                return False
            self.state[key] = action.get("value")
            return True
        # Unrecognised action types used to be reported as successful.
        return False

    def step(self) -> None:
        """Advance the clock by one tick and apply random dynamics.

        With probability 0.3 a new ``state`` entry ``dynamic_<time_step>`` is
        added, holding a random float.
        """
        self.time_step += 1
        if random.random() < 0.3:
            self.state[f"dynamic_{self.time_step}"] = random.random()
class CommunicationSystem:
    """Mailbox-based message passing between agents.

    Keeps one pending-message queue per receiver, a history of every message
    ever sent, and the set of protocols seen so far.
    """

    def __init__(self):
        """Create a system with no queues, no history and no protocols."""
        self.message_queues: Dict[str, List[Message]] = defaultdict(list)
        self.message_history: List[Message] = []
        self.protocols: Set[CommunicationProtocol] = set()

    def send_message(self, message: Message) -> None:
        """Queue ``message`` for its receiver and record it in the history."""
        self.message_queues[message.receiver_id].append(message)
        self.message_history.append(message)
        self.protocols.add(message.protocol)

    def receive_messages(self, agent_id: str) -> List[Message]:
        """Return and clear the pending messages of ``agent_id``.

        The agent's queue is left in place (empty), so an agent that has
        polled at least once is a known recipient for ``broadcast``.
        """
        pending = self.message_queues[agent_id]
        self.message_queues[agent_id] = []
        return pending

    def broadcast(self, sender_id: str, content: Any,
                  protocol: CommunicationProtocol = CommunicationProtocol.FIPA_ACL) -> None:
        """Send ``content`` to every known mailbox except the sender's.

        Only agents that already have a queue are reached, i.e. agents that
        have been sent a message or have called ``receive_messages``.

        Args:
            sender_id: Id of the broadcasting agent.
            content: Payload placed in every message.
            protocol: Protocol tag for the messages.
        """
        # Snapshot the recipients so that sending cannot disturb the iteration.
        recipients = [aid for aid in self.message_queues if aid != sender_id]
        stamp = datetime.now().isoformat()
        for receiver_id in recipients:
            # Sender + timestamp alone gives every message of one broadcast
            # the same id; the receiver and a running sequence number keep
            # the ids distinct.
            seed = f"{sender_id}|{receiver_id}|{len(self.message_history)}|{stamp}"
            self.send_message(Message(
                id=hashlib.md5(seed.encode()).hexdigest()[:16],
                sender_id=sender_id,
                receiver_id=receiver_id,
                content=content,
                protocol=protocol,
            ))

    def get_communication_stats(self) -> Dict[str, Any]:
        """Return message counts and the protocols seen so far.

        Returns:
            Dict with ``"total_messages"`` (length of the history),
            ``"active_protocols"`` (protocol values, sorted so the output is
            deterministic) and ``"pending_messages"`` (messages still queued).
        """
        return {
            "total_messages": len(self.message_history),
            "active_protocols": sorted(p.value for p in self.protocols),
            "pending_messages": sum(len(q) for q in self.message_queues.values()),
        }
class TaskAllocator:
    """Assigns pending tasks to agents by capability match and utility."""

    def __init__(self):
        """Create an allocator with no tasks and no allocations."""
        self.tasks: Dict[str, Task] = {}
        self.allocations: Dict[str, str] = {}  # task_id -> agent_id

    def add_task(self, task: Task) -> None:
        """Register ``task`` under its ``id``, replacing any previous entry."""
        self.tasks[task.id] = task

    def allocate_tasks(self, agents: Dict[str, AgentState]) -> Dict[str, str]:
        """Assign every pending task to the agent with the highest utility.

        For each task whose status is ``"pending"``, an agent's capability
        score is the sum of the values of those capabilities whose name
        occurs (case-insensitively) as a substring of the task description.
        The utility is ``reward * capability_score - difficulty``. The agent
        with the highest utility wins; ties go to the agent that comes first
        in ``agents``. An agent with no matching capability still scores
        ``-difficulty`` and can therefore be chosen.

        Chosen tasks get ``assigned_to`` set and their status changed to
        ``"assigned"``. With no agents, tasks stay pending.

        Args:
            agents: Candidate agents keyed by agent id.

        Returns:
            Mapping ``task_id -> agent_id`` for the tasks assigned by this call.
        """
        allocations: Dict[str, str] = {}
        for task_id, task in self.tasks.items():
            if task.status != "pending":
                continue

            # Lower-case once per task instead of once per capability.
            description = task.description.lower()
            best_agent: Optional[str] = None
            best_score = float("-inf")
            for agent_id, agent in agents.items():
                capability_score = sum(
                    (value for cap, value in agent.capabilities.items()
                     if cap.lower() in description),
                    0.0,
                )
                utility = task.reward * capability_score - task.difficulty
                if utility > best_score:
                    best_score = utility
                    best_agent = agent_id

            # Compare against None so that a falsy agent id is not rejected.
            if best_agent is not None:
                allocations[task_id] = best_agent
                self.allocations[task_id] = best_agent
                task.assigned_to = best_agent
                task.status = "assigned"
        return allocations

    def get_allocation_stats(self) -> Dict[str, Any]:
        """Summarise how many tasks are assigned and completed.

        Returns:
            Dict with ``"total_tasks"``, ``"allocated"`` (status
            ``"assigned"``), ``"completed"`` (status ``"completed"``),
            ``"unallocated"`` (everything else) and ``"allocation_rate"``
            (share of tasks that are assigned or completed; 0 when there are
            no tasks).
        """
        total = len(self.tasks)
        allocated = sum(1 for t in self.tasks.values() if t.status == "assigned")
        completed = sum(1 for t in self.tasks.values() if t.status == "completed")
        # A completed task was allocated at some point, so it must not be
        # counted as unallocated or lower the allocation rate.
        handled = allocated + completed
        return {
            "total_tasks": total,
            "allocated": allocated,
            "unallocated": total - handled,
            "completed": completed,
            "allocation_rate": handled / total if total > 0 else 0,
        }
class MultiAgentSystem:
    """Ties together the environment, communication, task allocation and agents."""

    def __init__(self, grid_size: int = 10):
        """Create an empty system.

        Args:
            grid_size: Passed through to ``Environment``.
        """
        self.environment = Environment(grid_size)
        self.communication = CommunicationSystem()
        self.task_allocator = TaskAllocator()
        self.agents: Dict[str, AgentState] = {}
        # Running counters, updated by add_task and run_step.
        self.stats = {
            "total_steps": 0,
            "total_messages": 0,
            "total_tasks": 0,
            "collaborations": 0,
        }

    def add_agent(self, agent: AgentState) -> None:
        """Add ``agent`` to the system and register it with the environment."""
        self.agents[agent.id] = agent
        self.environment.register_agent(agent)

    def add_task(self, task: Task) -> None:
        """Hand ``task`` to the allocator and count it in the statistics."""
        self.task_allocator.add_task(task)
        self.stats["total_tasks"] += 1

    def _next_message_id(self, sender_id: str) -> str:
        """Build a message id that is unique within this system."""
        # Sender + timestamp alone can repeat when two messages fall into the
        # same clock tick; the running message counter rules that out.
        seed = f"{sender_id}|{self.stats['total_messages']}|{datetime.now().isoformat()}"
        return hashlib.md5(seed.encode()).hexdigest()[:16]

    def run_step(self) -> None:
        """Run one simulation tick.

        Order: allocate tasks, let every agent perceive, let every agent
        receive its messages and act, advance the environment, update the
        statistics. The agent policy is a random placeholder: with
        probability 0.5 an agent writes a random value into the environment
        state, and with probability 0.3 it sends a coordination message to
        one randomly chosen other agent.
        """
        self.stats["total_steps"] += 1

        # 1. Task allocation
        allocations = self.task_allocator.allocate_tasks(self.agents)

        # 2. Perception
        # NOTE: the placeholder policy below does not consult these yet.
        perceptions = {
            agent_id: self.environment.perceive(agent_id)
            for agent_id in self.agents
        }

        # 3. Decision and action (simplified)
        for agent_id, agent in self.agents.items():
            agent.messages.extend(self.communication.receive_messages(agent_id))

            if random.random() < 0.5:
                action = {"type": "modify", "key": f"action_{agent_id}", "value": random.random()}
                self.environment.execute_action(agent_id, action)

            # The random draw is made even for a lone agent, so the sequence
            # of random numbers does not depend on the number of agents.
            if random.random() < 0.3:
                peers = [aid for aid in self.agents if aid != agent_id]
                if peers:
                    msg = Message(
                        id=self._next_message_id(agent_id),
                        sender_id=agent_id,
                        receiver_id=random.choice(peers),
                        content={"type": "coordination", "data": "let's collaborate"},
                        protocol=CommunicationProtocol.FIPA_ACL,
                    )
                    self.communication.send_message(msg)
                    self.stats["total_messages"] += 1

        # 4. Advance the environment
        self.environment.step()

        # 5. A step counts as a collaboration when it assigned at least one task.
        if allocations:
            self.stats["collaborations"] += 1

    def run_simulation(self, steps: int = 100) -> None:
        """Call ``run_step`` ``steps`` times."""
        for _ in range(steps):
            self.run_step()

    def get_system_stats(self) -> Dict[str, Any]:
        """Return the running counters plus sub-system statistics.

        Returns:
            The entries of ``self.stats`` together with ``"num_agents"``,
            ``"communication"`` and ``"task_allocation"``.
        """
        return {
            **self.stats,
            "num_agents": len(self.agents),
            "communication": self.communication.get_communication_stats(),
            "task_allocation": self.task_allocator.get_allocation_stats(),
        }
# Usage example
if __name__ == "__main__":
    print("=== 多智能体系统 MAS 理论基础 ===\n")

    # Build the system.
    mas = MultiAgentSystem(grid_size=10)

    print("=== 添加智能体 ===")
    agents_data = [
        {"id": "agent1", "type": AgentType.COGNITIVE, "capabilities": {"planning": 0.9, "communication": 0.8}},
        {"id": "agent2", "type": AgentType.REACTIVE, "capabilities": {"perception": 0.95, "action": 0.9}},
        {"id": "agent3", "type": AgentType.SOCIAL, "capabilities": {"negotiation": 0.85, "cooperation": 0.9}},
    ]
    for data in agents_data:
        agent = AgentState(
            id=data["id"],
            agent_type=data["type"],
            capabilities=data["capabilities"],
            beliefs={},
            goals=["maximize_utility"],
            plans=[],
        )
        mas.add_agent(agent)
        print(f"添加智能体:{data['id']} ({data['type'].value})")

    print("\n=== 添加任务 ===")
    tasks_data = [
        {"id": "task1", "desc": "planning intensive task", "difficulty": 0.7, "reward": 10.0},
        {"id": "task2", "desc": "perception task", "difficulty": 0.5, "reward": 8.0},
        {"id": "task3", "desc": "negotiation task", "difficulty": 0.8, "reward": 12.0},
    ]
    for data in tasks_data:
        task = Task(
            id=data["id"],
            description=data["desc"],
            difficulty=data["difficulty"],
            reward=data["reward"],
            deadline=datetime.now(),
        )
        mas.add_task(task)
        print(f"添加任务:{data['id']} - {data['desc'][:20]}...")

    print("\n=== 运行仿真 ===")
    # Keep the step count in one place so the report cannot drift from the run.
    num_steps = 50
    mas.run_simulation(steps=num_steps)
    print(f"仿真完成:{num_steps} 个时间步")

    print("\n=== 系统统计 ===")
    stats = mas.get_system_stats()
    print(f"智能体数量:{stats['num_agents']}")
    print(f"总时间步:{stats['total_steps']}")
    print(f"总消息数:{stats['total_messages']}")
    print(f"总任务数:{stats['total_tasks']}")
    print(f"协作次数:{stats['collaborations']}")

    print("\n通信统计:")
    comm_stats = stats['communication']
    print(f" 总消息:{comm_stats['total_messages']}")
    print(f" 活跃协议:{comm_stats['active_protocols']}")

    print("\n任务分配统计:")
    task_stats = stats['task_allocation']
    print(f" 总任务:{task_stats['total_tasks']}")
    print(f" 已分配:{task_stats['allocated']}")
    print(f" 分配率:{task_stats['allocation_rate']:.2%}")

    print("\n关键观察:")
    print("1. 多智能体:多个自主智能体协同工作")
    print("2. 通信系统:标准化协议实现信息交换")
    print("3. 任务分配:基于能力匹配的优化分配")
    print("4. 环境交互:感知 - 决策 - 行动循环")
    print("5. 群体智能:通过协作实现超越个体的能力")
    print("\nMAS 的核心:自主智能体 + 通信 + 协作 = 群体智能")