多智能体通信协议完整实现
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import math
import hashlib
import json
from collections import defaultdict, deque
import asyncio
import uuid
class Performative(Enum):
"""言语行为类型 (FIPA-ACL)"""
INFORM = "inform" # 告知
REQUEST = "request" # 请求
AGREE = "agree" # 同意
REFUSE = "refuse" # 拒绝
PROPOSE = "propose" # 提议
ACCEPT_PROPOSAL = "accept-proposal"
REJECT_PROPOSAL = "reject-proposal"
CFP = "cfp" # 招标
QUERY_IF = "query-if" # 查询
QUERY_REF = "query-ref"
NOT_UNDERSTOOD = "not-understood"
FAILURE = "failure"
class Protocol(Enum):
"""通信协议"""
FIPA_ACL = "fipa_acl"
KQML = "kqml"
MCP = "mcp"
ACP = "acp"
CUSTOM = "custom"
class MessageStatus(Enum):
"""消息状态"""
PENDING = "pending"
SENT = "sent"
DELIVERED = "delivered"
READ = "read"
FAILED = "failed"
@dataclass
class ACLMessage:
"""FIPA-ACL 消息"""
id: str
performative: Performative
sender: str
receivers: List[str]
content: Any
language: str = "SL"
ontology: str = "default"
protocol: str = "FIPA-Contract-Net"
reply_with: str = None
in_reply_to: str = None
reply_by: datetime = None
timestamp: datetime = field(default_factory=datetime.now)
status: MessageStatus = MessageStatus.PENDING
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"id": self.id,
"performative": self.performative.value,
"sender": self.sender,
"receivers": self.receivers,
"content": self.content,
"language": self.language,
"ontology": self.ontology,
"protocol": self.protocol,
"reply_with": self.reply_with,
"in_reply_to": self.in_reply_to,
"reply_by": self.reply_by.isoformat() if self.reply_by else None,
"timestamp": self.timestamp.isoformat(),
"status": self.status.value,
"metadata": self.metadata
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ACLMessage':
"""从字典创建"""
return cls(
id=data["id"],
performative=Performative(data["performative"]),
sender=data["sender"],
receivers=data["receivers"],
content=data["content"],
language=data.get("language", "SL"),
ontology=data.get("ontology", "default"),
protocol=data.get("protocol", "FIPA-Contract-Net"),
reply_with=data.get("reply_with"),
in_reply_to=data.get("in_reply_to"),
reply_by=datetime.fromisoformat(data["reply_by"]) if data.get("reply_by") else None,
timestamp=datetime.fromisoformat(data["timestamp"]),
status=MessageStatus(data.get("status", "pending")),
metadata=data.get("metadata", {})
)
class MessageQueue:
"""
消息队列
支持:
1. 消息入队/出队
2. 优先级队列
3. 消息持久化
4. 队列管理
"""
def __init__(self, max_size: int = 10000):
self.max_size = max_size
self.queue: deque = deque()
self.priority_queue: List[Tuple[int, ACLMessage]] = []
self.message_store: Dict[str, ACLMessage] = {}
def enqueue(self, message: ACLMessage, priority: int = 0):
"""消息入队"""
if len(self.queue) >= self.max_size:
raise Exception("队列已满")
self.queue.append(message)
self.message_store[message.id] = message
if priority > 0:
# 添加到优先级队列
heapq.heappush(self.priority_queue, (-priority, message))
def dequeue(self) -> Optional[ACLMessage]:
"""消息出队"""
if not self.queue:
return None
message = self.queue.popleft()
message.status = MessageStatus.DELIVERED
return message
def dequeue_priority(self) -> Optional[ACLMessage]:
"""优先级出队"""
if not self.priority_queue:
return self.dequeue()
_, message = heapq.heappop(self.priority_queue)
message.status = MessageStatus.DELIVERED
return message
def peek(self) -> Optional[ACLMessage]:
"""查看队首消息"""
return self.queue[0] if self.queue else None
def size(self) -> int:
"""队列大小"""
return len(self.queue)
def is_empty(self) -> bool:
"""队列是否为空"""
return len(self.queue) == 0
class CommunicationChannel:
"""
通信信道
支持:
1. 点对点通信
2. 广播
3. 组播
4. 消息路由
"""
def __init__(self):
self.agent_queues: Dict[str, MessageQueue] = defaultdict(MessageQueue)
self.subscriptions: Dict[str, Set[str]] = defaultdict(set) # topic -> agents
self.message_log: List[ACLMessage] = []
self.routes: Dict[str, str] = {} # agent_id -> channel
def send(self, message: ACLMessage, priority: int = 0):
"""发送消息"""
for receiver in message.receivers:
if receiver in self.agent_queues:
self.agent_queues[receiver].enqueue(message, priority)
message.status = MessageStatus.SENT
# 记录日志
self.message_log.append(message)
def broadcast(self, sender: str, content: Any,
performative: Performative = Performative.INFORM):
"""广播消息"""
message = ACLMessage(
id=str(uuid.uuid4())[:16],
performative=performative,
sender=sender,
receivers=list(self.agent_queues.keys()),
content=content
)
self.send(message)
def publish(self, topic: str, sender: str, content: Any):
"""发布到主题"""
subscribers = self.subscriptions.get(topic, set())
if not subscribers:
return
message = ACLMessage(
id=str(uuid.uuid4())[:16],
performative=Performative.INFORM,
sender=sender,
receivers=list(subscribers),
content=content,
metadata={"topic": topic}
)
self.send(message)
def subscribe(self, agent_id: str, topic: str):
"""订阅主题"""
self.subscriptions[topic].add(agent_id)
# 确保 agent 有队列
if agent_id not in self.agent_queues:
self.agent_queues[agent_id] = MessageQueue()
def receive(self, agent_id: str) -> Optional[ACLMessage]:
"""接收消息"""
if agent_id not in self.agent_queues:
return None
message = self.agent_queues[agent_id].dequeue()
if message:
message.status = MessageStatus.READ
return message
def get_stats(self) -> Dict[str, Any]:
"""获取统计"""
return {
"total_agents": len(self.agent_queues),
"total_messages": len(self.message_log),
"topics": len(self.subscriptions),
"queue_sizes": {aid: q.size() for aid, q in self.agent_queues.items()}
}
class OntologyService:
"""
本体论服务
支持:
1. 本体注册
2. 语义映射
3. 术语解释
4. 本体对齐
"""
def __init__(self):
self.ontologies: Dict[str, Dict[str, Any]] = {}
self.mappings: Dict[str, Dict[str, str]] = {} # (onto1, onto2) -> term_mapping
def register_ontology(self, name: str, ontology: Dict[str, Any]):
"""注册本体"""
self.ontologies[name] = ontology
def add_mapping(self, from_ontology: str, to_ontology: str,
from_term: str, to_term: str):
"""添加语义映射"""
key = f"{from_ontology}:{to_ontology}"
if key not in self.mappings:
self.mappings[key] = {}
self.mappings[key][from_term] = to_term
def translate(self, content: Any, from_ontology: str, to_ontology: str) -> Any:
"""翻译内容"""
if from_ontology == to_ontology:
return content
key = f"{from_ontology}:{to_ontology}"
mapping = self.mappings.get(key, {})
if isinstance(content, str):
return mapping.get(content, content)
elif isinstance(content, dict):
return {mapping.get(k, k): v for k, v in content.items()}
return content
def get_concept(self, ontology_name: str, concept: str) -> Optional[Dict[str, Any]]:
"""获取概念定义"""
ontology = self.ontologies.get(ontology_name, {})
return ontology.get("concepts", {}).get(concept)
class CommunicationProtocolManager:
"""
通信协议管理器
整合:
1. 多协议支持
2. 协议转换
3. 消息验证
4. 通信统计
"""
def __init__(self):
self.channel = CommunicationChannel()
self.ontology_service = OntologyService()
self.supported_protocols: Set[Protocol] = set()
self.protocol_handlers: Dict[Protocol, Any] = {}
# 统计
self.stats = {
"total_messages": 0,
"messages_by_performative": defaultdict(int),
"messages_by_protocol": defaultdict(int),
"failed_messages": 0
}
def register_protocol(self, protocol: Protocol, handler: Any):
"""注册协议处理器"""
self.supported_protocols.add(protocol)
self.protocol_handlers[protocol] = handler
def send_message(self, message: ACLMessage) -> bool:
"""发送消息"""
try:
# 验证消息
if not self._validate_message(message):
self.stats["failed_messages"] += 1
return False
# 语义转换
if message.ontology != "default":
# 可能需要转换
pass
# 发送
self.channel.send(message)
# 更新统计
self.stats["total_messages"] += 1
self.stats["messages_by_performative"][message.performative.value] += 1
self.stats["messages_by_protocol"][Protocol.FIPA_ACL.value] += 1
return True
except Exception as e:
self.stats["failed_messages"] += 1
return False
def _validate_message(self, message: ACLMessage) -> bool:
"""验证消息"""
# 检查必填字段
if not message.id or not message.sender or not message.receivers:
return False
# 检查言语行为
if message.performative not in Performative:
return False
# 检查内容
if message.content is None:
return False
return True
def create_inform(self, sender: str, receiver: str, content: Any,
ontology: str = "default") -> ACLMessage:
"""创建告知消息"""
return ACLMessage(
id=str(uuid.uuid4())[:16],
performative=Performative.INFORM,
sender=sender,
receivers=[receiver],
content=content,
ontology=ontology
)
def create_request(self, sender: str, receiver: str, content: Any,
reply_by: datetime = None) -> ACLMessage:
"""创建请求消息"""
return ACLMessage(
id=str(uuid.uuid4())[:16],
performative=Performative.REQUEST,
sender=sender,
receivers=[receiver],
content=content,
reply_by=reply_by
)
def create_cfp(self, sender: str, receivers: List[str],
task_description: Any) -> ACLMessage:
"""创建招标消息"""
return ACLMessage(
id=str(uuid.uuid4())[:16],
performative=Performative.CFP,
sender=sender,
receivers=receivers,
content=task_description,
protocol="FIPA-Contract-Net"
)
def get_communication_stats(self) -> Dict[str, Any]:
"""获取通信统计"""
return {
**self.stats,
"channel_stats": self.channel.get_stats(),
"supported_protocols": [p.value for p in self.supported_protocols]
}
# 使用示例
if __name__ == "__main__":
print("=== 多智能体通信协议与消息机制 ===\n")
# 创建协议管理器
manager = CommunicationProtocolManager()
print("=== 注册智能体 ===")
# 注册智能体(创建队列)
agents = ["agent1", "agent2", "agent3", "coordinator"]
for agent in agents:
manager.channel.agent_queues[agent] = MessageQueue()
print(f"注册智能体:{agent}")
print(f"\n=== 订阅主题 ===")
# 订阅主题
manager.channel.subscribe("agent2", "task_updates")
manager.channel.subscribe("agent3", "task_updates")
print("agent2, agent3 订阅 task_updates 主题")
print(f"\n=== 发送消息 ===")
# 创建并发送告知消息
msg1 = manager.create_inform(
sender="coordinator",
receiver="agent1",
content={"task": "inspection", "location": "area_A"},
ontology="manufacturing"
)
success = manager.send_message(msg1)
print(f"发送告知消息:{success}")
# 创建并发送请求消息
reply_by = datetime.now()
msg2 = manager.create_request(
sender="agent1",
receiver="coordinator",
content={"action": "approve", "task_id": "T001"},
reply_by=reply_by
)
success = manager.send_message(msg2)
print(f"发送请求消息:{success}")
# 创建并发送招标消息
msg3 = manager.create_cfp(
sender="coordinator",
receivers=["agent1", "agent2", "agent3"],
task_description={"task": "transport", "from": "A", "to": "B", "deadline": "2h"}
)
success = manager.send_message(msg3)
print(f"发送招标消息:{success}")
print(f"\n=== 发布订阅 ===")
# 发布到主题
manager.channel.publish(
topic="task_updates",
sender="coordinator",
content={"update": "task_completed", "task_id": "T002"}
)
print("发布 task_updates 主题消息")
print(f"\n=== 接收消息 ===")
# agent1 接收消息
received = manager.channel.receive("agent1")
if received:
print(f"agent1 收到消息:")
print(f" 言语行为:{received.performative.value}")
print(f" 发送者:{received.sender}")
print(f" 内容:{received.content}")
# agent2 接收主题消息
received = manager.channel.receive("agent2")
if received:
print(f"\nagent2 收到主题消息:")
print(f" 主题:{received.metadata.get('topic')}")
print(f" 内容:{received.content}")
print(f"\n=== 通信统计 ===")
stats = manager.get_communication_stats()
print(f"总消息数:{stats['total_messages']}")
print(f"失败消息:{stats['failed_messages']}")
print(f"言语行为分布:{dict(stats['messages_by_performative'])}")
print(f"智能体数量:{stats['channel_stats']['total_agents']}")
print(f"主题数量:{stats['channel_stats']['topics']}")
print(f"\n关键观察:")
print("1. FIPA-ACL:标准化的言语行为(inform, request, cfp 等)")
print("2. 消息队列:保证可靠传递、支持优先级")
print("3. 发布订阅:解耦发送者和接收者")
print("4. 本体论:消除语义歧义、实现互操作")
print("5. 协议管理:多协议支持、协议转换")
print("\n通信的核心:标准化协议 + 可靠传递 + 语义互操作 = 智能体协作")