"""Agent 标准化与协议完整实现 (Agent standardization and protocols: full implementation)."""
import time
import json
import math
import random
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from enum import Enum
import numpy as np
from collections import deque, defaultdict
import threading
import uuid
from abc import ABC, abstractmethod
import hashlib
import base64
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding
import aiohttp
import asyncio
class AgentCapability(Enum):
    """Kinds of capability an agent can advertise."""

    TEXT_GENERATION = "text_generation"
    CODE_GENERATION = "code_generation"
    DATA_ANALYSIS = "data_analysis"
    IMAGE_PROCESSING = "image_processing"
    VOICE_INTERACTION = "voice_interaction"
    TOOL_EXECUTION = "tool_execution"
    KNOWLEDGE_RETRIEVAL = "knowledge_retrieval"
    TASK_PLANNING = "task_planning"
    MULTI_MODAL = "multi_modal"
    AUTONOMOUS_ACTION = "autonomous_action"
class AgentStatus(Enum):
    """Availability states an agent can be in."""

    ONLINE = "online"
    OFFLINE = "offline"
    BUSY = "busy"
    MAINTENANCE = "maintenance"
class MessageType(Enum):
    """Kinds of message exchanged between agents."""

    REQUEST = "request"
    RESPONSE = "response"
    NOTIFICATION = "notification"
    ERROR = "error"
class SecurityLevel(Enum):
    """Security classification levels."""

    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
@dataclass
class AgentCapabilityDescription:
    """Description of a single capability offered by an agent."""

    capability_type: AgentCapability
    description: str
    # Schemas describing what the capability accepts and produces.
    input_schema: Dict[str, Any]
    output_schema: Dict[str, Any]
    # Metric name -> numeric value.
    performance_metrics: Dict[str, float]
    version: str
    supported_languages: List[str]
@dataclass
class AgentMetadata:
    """Metadata record describing one agent."""

    agent_id: str
    name: str
    version: str
    provider: str
    description: str
    capabilities: List[AgentCapabilityDescription]
    status: AgentStatus
    # Endpoint name -> URL string.
    endpoints: Dict[str, str]
    authentication_methods: List[str]
    security_level: SecurityLevel
    created_at: datetime
    updated_at: datetime
    tags: List[str]
@dataclass
class A2AMessage:
    """A single A2A protocol message."""

    message_id: str
    conversation_id: str
    sender_id: str
    receiver_id: str
    message_type: MessageType
    timestamp: datetime
    content: Dict[str, Any]
    metadata: Dict[str, Any]
    # Left as None until a signature is attached.
    signature: Optional[str] = None
@dataclass
class MCPResource:
    """A resource registered with the MCP layer."""

    resource_id: str
    name: str
    description: str
    resource_type: str  # file, database, api, tool
    uri: str
    schema: Dict[str, Any]
    access_control: Dict[str, Any]
    metadata: Dict[str, Any]
@dataclass
class MCPTool:
    """A tool registered with the MCP layer."""

    tool_id: str
    name: str
    description: str
    input_schema: Dict[str, Any]
    output_schema: Dict[str, Any]
    execution_endpoint: str
    authentication_required: bool
    # Optional limit settings, e.g. {'requests_per_minute': 60}.
    rate_limit: Optional[Dict[str, int]]
@dataclass
class AgentTask:
    """A unit of work tracked for one or more agents."""

    task_id: str
    description: str
    input_data: Dict[str, Any]
    expected_output: Dict[str, Any]
    # Ids of the agents chosen to work on this task.
    assigned_agents: List[str]
    # Free-form state label; the orchestrator in this file uses
    # 'pending', 'assigned', 'executing' and 'completed'.
    status: str
    created_at: datetime
    completed_at: Optional[datetime]
    result: Optional[Dict[str, Any]]
class AgentRegistry:
    """In-memory agent registry.

    Supports:
    1. Agent registration and capability-based discovery
    2. Lookup by agent id
    3. Status updates
    4. Simulated health checks

    Every method takes the same internal lock, so reads never observe a
    half-applied registration. Load balancing is not implemented here.
    """

    def __init__(self):
        """Create an empty registry."""
        self.agents: Dict[str, AgentMetadata] = {}
        # capability type -> ids of agents offering it, in registration order
        self.capability_index: Dict[AgentCapability, List[str]] = defaultdict(list)
        # agent id -> result of the latest health check (True on registration)
        self.health_status: Dict[str, bool] = {}
        self._lock = threading.Lock()

    def register_agent(self, agent: AgentMetadata) -> bool:
        """Register an agent and index its capabilities.

        Args:
            agent: Metadata of the agent to add.

        Returns:
            True if the agent was added, False if its id is already registered.
        """
        with self._lock:
            if agent.agent_id in self.agents:
                return False
            self.agents[agent.agent_id] = agent
            self.health_status[agent.agent_id] = True
            for cap in agent.capabilities:
                bucket = self.capability_index[cap.capability_type]
                # An agent may describe the same capability type more than
                # once; index it only once so discovery has no duplicates.
                if agent.agent_id not in bucket:
                    bucket.append(agent.agent_id)
            return True

    def discover_agents(self,
                        capability: AgentCapability,
                        status: Optional[AgentStatus] = None) -> List[AgentMetadata]:
        """Find healthy agents that offer a capability.

        Args:
            capability: Capability type to look up.
            status: When given, only agents in exactly this status match.

        Returns:
            Matching agents in registration order; empty if none match.
        """
        with self._lock:
            # .get() keeps the defaultdict from growing on unknown capabilities.
            candidate_ids = self.capability_index.get(capability, ())
            matches = []
            for agent_id in candidate_ids:
                agent = self.agents.get(agent_id)
                if agent is None:
                    continue
                if status is not None and agent.status != status:
                    continue
                if self.health_status.get(agent_id, False):
                    matches.append(agent)
            return matches

    def get_agent(self, agent_id: str) -> Optional[AgentMetadata]:
        """Return the metadata for ``agent_id``, or None if it is unknown."""
        with self._lock:
            return self.agents.get(agent_id)

    def update_status(self, agent_id: str, status: AgentStatus) -> bool:
        """Set an agent's status and refresh its ``updated_at`` timestamp.

        Returns:
            True on success, False if the agent is not registered.
        """
        with self._lock:
            agent = self.agents.get(agent_id)
            if agent is None:
                return False
            agent.status = status
            agent.updated_at = datetime.now()
            return True

    def health_check(self, agent_id: str) -> bool:
        """Run a simulated health check and record the outcome.

        The check is a random draw that reports unhealthy roughly 5% of the
        time; no real probe is sent.

        Returns:
            The recorded health flag, or False for an unregistered agent
            (nothing is recorded in that case).
        """
        with self._lock:
            if agent_id not in self.agents:
                return False
            is_healthy = random.random() > 0.05
            self.health_status[agent_id] = is_healthy
            return is_healthy
class A2AProtocol:
    """A2A (Agent-to-Agent) protocol helper.

    Supports:
    1. JSON encoding and decoding of messages
    2. RSA signing and signature verification
    3. HTTP delivery (one POST attempt, no retry)
    4. In-memory conversation history
    """

    def __init__(self, private_key: Optional[bytes] = None):
        """Initialise empty message stores and the signing key pair.

        Args:
            private_key: PEM-encoded, unencrypted private key. When omitted,
                a new 2048-bit RSA key is generated.
        """
        self.message_queue: Dict[str, deque] = defaultdict(deque)
        self.conversations: Dict[str, List[A2AMessage]] = defaultdict(list)
        if private_key is None:
            self.private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048
            )
        else:
            self.private_key = serialization.load_pem_private_key(
                private_key,
                password=None
            )
        self.public_key = self.private_key.public_key()

    @staticmethod
    def _unsigned_fields(message: A2AMessage) -> Dict[str, Any]:
        """Return every field of ``message`` except the signature, JSON-ready."""
        return {
            'message_id': message.message_id,
            'conversation_id': message.conversation_id,
            'sender_id': message.sender_id,
            'receiver_id': message.receiver_id,
            'message_type': message.message_type.value,
            'timestamp': message.timestamp.isoformat(),
            'content': message.content,
            'metadata': message.metadata
        }

    def _signing_payload(self, message: A2AMessage) -> bytes:
        """Return the canonical bytes that are signed and verified.

        Keys are sorted so signer and verifier produce identical bytes.
        """
        return json.dumps(self._unsigned_fields(message), sort_keys=True).encode('utf-8')

    def create_message(self,
                       sender_id: str,
                       receiver_id: str,
                       message_type: MessageType,
                       content: Dict[str, Any],
                       conversation_id: Optional[str] = None) -> A2AMessage:
        """Build a new message and sign it with this instance's private key.

        Args:
            sender_id: Id of the sending agent.
            receiver_id: Id of the receiving agent.
            message_type: Kind of message.
            content: JSON-serialisable payload.
            conversation_id: Existing conversation to join; a new id is
                generated when omitted.

        Returns:
            The signed message, timestamped with the current local time.
        """
        message_id = f"msg_{uuid.uuid4().hex[:16]}"
        if conversation_id is None:
            conversation_id = f"conv_{uuid.uuid4().hex[:16]}"
        message = A2AMessage(
            message_id=message_id,
            conversation_id=conversation_id,
            sender_id=sender_id,
            receiver_id=receiver_id,
            message_type=message_type,
            timestamp=datetime.now(),
            content=content,
            metadata={
                'protocol_version': '1.0',
                'encoding': 'utf-8'
            }
        )
        message.signature = self._sign_message(message)
        return message

    def _sign_message(self, message: A2AMessage) -> str:
        """Return the base64 RSA (PKCS#1 v1.5, SHA-256) signature of ``message``."""
        signature = self.private_key.sign(
            self._signing_payload(message),
            padding.PKCS1v15(),
            hashes.SHA256()
        )
        return base64.b64encode(signature).decode('utf-8')

    def verify_message(self, message: A2AMessage, public_key_pem: bytes) -> bool:
        """Check a message's signature against a PEM-encoded public key.

        Args:
            message: Message whose ``signature`` is to be checked.
            public_key_pem: PEM-encoded public key of the claimed signer.

        Returns:
            True only if the signature is present, well-formed and valid.
        """
        if not message.signature:
            return False
        public_key = serialization.load_pem_public_key(public_key_pem)
        message_bytes = self._signing_payload(message)
        try:
            # The signature travels with the message and is untrusted, so a
            # malformed base64 string must fail verification, not raise.
            signature = base64.b64decode(message.signature)
            public_key.verify(
                signature,
                message_bytes,
                padding.PKCS1v15(),
                hashes.SHA256()
            )
        except Exception:
            # Any failure to decode or verify means "not verified".
            return False
        return True

    def encode_message(self, message: A2AMessage) -> str:
        """Serialise a message, including its signature, to a JSON string."""
        payload = self._unsigned_fields(message)
        payload['signature'] = message.signature
        return json.dumps(payload, ensure_ascii=False)

    def decode_message(self, json_str: str) -> A2AMessage:
        """Rebuild a message from a JSON string made by ``encode_message``.

        Raises:
            KeyError: If a required field is missing.
            ValueError: If the JSON, message type or timestamp is invalid.
        """
        data = json.loads(json_str)
        return A2AMessage(
            message_id=data['message_id'],
            conversation_id=data['conversation_id'],
            sender_id=data['sender_id'],
            receiver_id=data['receiver_id'],
            message_type=MessageType(data['message_type']),
            timestamp=datetime.fromisoformat(data['timestamp']),
            content=data['content'],
            metadata=data.get('metadata', {}),
            signature=data.get('signature')
        )

    async def send_message(self,
                           message: A2AMessage,
                           endpoint: str,
                           timeout: int = 30) -> bool:
        """POST the encoded message to ``endpoint``.

        Args:
            message: Message to deliver.
            endpoint: URL to POST to.
            timeout: Total request timeout in seconds.

        Returns:
            True if the endpoint answered with HTTP 200, otherwise False
            (failures are printed, not raised).
        """
        encoded_message = self.encode_message(message)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    endpoint,
                    json={'message': encoded_message},
                    timeout=aiohttp.ClientTimeout(total=timeout)
                ) as response:
                    return response.status == 200
        except Exception as e:
            # Best-effort delivery: report the failure and signal it via False.
            print(f"Failed to send message: {e}")
            return False

    def add_to_conversation(self, message: A2AMessage):
        """Append ``message`` to the history of its conversation."""
        self.conversations[message.conversation_id].append(message)

    def get_conversation(self, conversation_id: str) -> List[A2AMessage]:
        """Return the recorded messages of a conversation (empty if unknown)."""
        # .get() avoids creating an entry for an unknown conversation id.
        return self.conversations.get(conversation_id, [])
class MCPProtocol:
    """MCP (Model Context Protocol) helper backed by in-memory dicts.

    Supports:
    1. Resource registration and listing
    2. Tool registration and lookup
    3. Context create / update / read / delete

    Access-control data is stored on resources but not enforced here.
    """

    def __init__(self):
        """Create empty resource, tool and context stores."""
        self.resources: Dict[str, MCPResource] = {}
        self.tools: Dict[str, MCPTool] = {}
        # context id -> {'data', 'created_at', 'updated_at', 'access_count'}
        self.contexts: Dict[str, Dict[str, Any]] = {}

    def register_resource(self, resource: MCPResource) -> bool:
        """Register a resource; return False if its id is already taken."""
        if resource.resource_id in self.resources:
            return False
        self.resources[resource.resource_id] = resource
        return True

    def register_tool(self, tool: MCPTool) -> bool:
        """Register a tool; return False if its id is already taken."""
        if tool.tool_id in self.tools:
            return False
        self.tools[tool.tool_id] = tool
        return True

    def list_resources(self,
                       resource_type: Optional[str] = None) -> List[MCPResource]:
        """List registered resources, optionally restricted to one type.

        Args:
            resource_type: When given, only resources whose
                ``resource_type`` equals it are returned.
        """
        if resource_type is None:
            return list(self.resources.values())
        return [r for r in self.resources.values() if r.resource_type == resource_type]

    def list_tools(self) -> List[MCPTool]:
        """Return all registered tools."""
        return list(self.tools.values())

    def get_tool(self, tool_id: str) -> Optional[MCPTool]:
        """Return the tool with ``tool_id``, or None if it is unknown."""
        return self.tools.get(tool_id)

    def create_context(self,
                       context_id: str,
                       initial_data: Dict[str, Any]) -> bool:
        """Create a context seeded with ``initial_data``.

        The data is shallow-copied so later ``update_context`` calls do not
        mutate the caller's dictionary.

        Returns:
            True if created, False if ``context_id`` already exists.
        """
        if context_id in self.contexts:
            return False
        # One timestamp so a fresh context has created_at == updated_at.
        now = datetime.now()
        self.contexts[context_id] = {
            'data': dict(initial_data),
            'created_at': now,
            'updated_at': now,
            'access_count': 0
        }
        return True

    def update_context(self,
                       context_id: str,
                       updates: Dict[str, Any]) -> bool:
        """Merge ``updates`` into a context's data.

        Also refreshes ``updated_at`` and increments ``access_count``.

        Returns:
            True on success, False if the context does not exist.
        """
        context = self.contexts.get(context_id)
        if context is None:
            return False
        context['data'].update(updates)
        context['updated_at'] = datetime.now()
        context['access_count'] += 1
        return True

    def get_context(self, context_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored context record, or None if it does not exist.

        A read increments ``access_count`` but leaves ``updated_at`` alone,
        so that field keeps meaning "last modification". The returned dict is
        the live record, not a copy.
        """
        context = self.contexts.get(context_id)
        if context is not None:
            context['access_count'] += 1
        return context

    def delete_context(self, context_id: str) -> bool:
        """Delete a context; return False if it does not exist."""
        if context_id not in self.contexts:
            return False
        del self.contexts[context_id]
        return True
class AgentOrchestrator:
    """Agent orchestrator.

    Supports:
    1. Task creation
    2. Agent selection by required capability
    3. Dispatching a task to the selected agents (responses are simulated)
    4. Aggregating the per-agent results
    """

    def __init__(self, registry: AgentRegistry, a2a: A2AProtocol):
        """Bind the orchestrator to a registry and an A2A protocol instance."""
        self.registry = registry
        self.a2a = a2a
        self.tasks: Dict[str, AgentTask] = {}
        self.task_results: Dict[str, Dict[str, Any]] = {}

    def create_task(self,
                    description: str,
                    input_data: Dict[str, Any],
                    expected_output: Dict[str, Any]) -> AgentTask:
        """Create a task in the 'pending' state and store it.

        Returns:
            The new task, with a generated ``task_id`` and no agents assigned.
        """
        task_id = f"task_{uuid.uuid4().hex[:16]}"
        task = AgentTask(
            task_id=task_id,
            description=description,
            input_data=input_data,
            expected_output=expected_output,
            assigned_agents=[],
            status='pending',
            created_at=datetime.now(),
            completed_at=None,
            result=None
        )
        self.tasks[task_id] = task
        return task

    def assign_agents(self,
                      task_id: str,
                      required_capabilities: List[AgentCapability]) -> List[str]:
        """Pick one online agent per required capability for a task.

        An agent that covers several of the capabilities is assigned once, so
        ``execute_task`` does not send it the same task repeatedly.
        Capabilities with no available agent are skipped.

        Args:
            task_id: Id of a task created by ``create_task``.
            required_capabilities: Capabilities the task needs.

        Returns:
            Ids of the assigned agents in selection order; empty if the task
            is unknown or no agent was found. The task becomes 'assigned'
            only when at least one agent was selected, otherwise 'pending'.
        """
        task = self.tasks.get(task_id)
        if not task:
            return []
        assigned_agents: List[str] = []
        for capability in required_capabilities:
            agents = self.registry.discover_agents(capability, AgentStatus.ONLINE)
            if not agents:
                continue
            # Simplification: take the first match (no load balancing).
            selected_id = agents[0].agent_id
            if selected_id not in assigned_agents:
                assigned_agents.append(selected_id)
        task.assigned_agents = assigned_agents
        task.status = 'assigned' if assigned_agents else 'pending'
        return assigned_agents

    async def execute_task(self, task_id: str) -> Dict[str, Any]:
        """Dispatch a task to its assigned agents and aggregate the results.

        No network call is made: for each agent a request message is recorded
        in the conversation history and a 'completed' response is simulated.

        Returns:
            ``{'sub_results': [...]}`` with one entry per agent, or
            ``{'error': 'Task not found'}`` for an unknown task id.
        """
        task = self.tasks.get(task_id)
        if not task:
            return {'error': 'Task not found'}
        task.status = 'executing'
        results = []
        for agent_id in task.assigned_agents:
            agent = self.registry.get_agent(agent_id)
            if not agent:
                # The agent is no longer in the registry; skip it.
                continue
            message = self.a2a.create_message(
                sender_id='orchestrator',
                receiver_id=agent_id,
                message_type=MessageType.REQUEST,
                content={
                    'task_id': task_id,
                    'description': task.description,
                    'input_data': task.input_data
                }
            )
            # Simplification: record the request instead of sending it.
            self.a2a.add_to_conversation(message)
            # Simulated agent response, in the same conversation.
            response_message = self.a2a.create_message(
                sender_id=agent_id,
                receiver_id='orchestrator',
                message_type=MessageType.RESPONSE,
                content={
                    'task_id': task_id,
                    'status': 'completed',
                    'result': {'data': f'Result from {agent_id}'}
                },
                conversation_id=message.conversation_id
            )
            self.a2a.add_to_conversation(response_message)
            results.append(response_message.content)
        task.result = {'sub_results': results}
        task.status = 'completed'
        task.completed_at = datetime.now()
        return task.result

    def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return a JSON-friendly snapshot of a task, or None if unknown.

        Timestamps are rendered as ISO-8601 strings; ``completed_at`` is None
        until the task has finished.
        """
        task = self.tasks.get(task_id)
        if not task:
            return None
        completed_at = task.completed_at.isoformat() if task.completed_at else None
        return {
            'task_id': task.task_id,
            'description': task.description,
            'status': task.status,
            'assigned_agents': task.assigned_agents,
            'created_at': task.created_at.isoformat(),
            'completed_at': completed_at,
            'result': task.result
        }
# Usage example
def main() -> None:
    """Run the end-to-end demo and print each step.

    Steps: register two agents, discover them by capability, create / encode /
    decode / verify an A2A message, register MCP tools and a context, then
    create, assign and execute an orchestrated task.
    """
    print("=== Agent 标准化、协议与生态建设 ===\n")
    print("=== 创建 Agent 注册中心 ===")
    registry = AgentRegistry()

    print("\n=== 注册 Agent ===")
    # Build the agent metadata records.
    agent1 = AgentMetadata(
        agent_id="agent_001",
        name="DataAnalysisAgent",
        version="1.0.0",
        provider="TechCorp",
        description="专业数据分析 Agent",
        capabilities=[
            AgentCapabilityDescription(
                capability_type=AgentCapability.DATA_ANALYSIS,
                description="高级数据分析与可视化",
                input_schema={'type': 'object', 'properties': {'data': {'type': 'array'}}},
                output_schema={'type': 'object', 'properties': {'insights': {'type': 'array'}}},
                performance_metrics={'latency_ms': 150, 'accuracy': 0.95},
                version="1.0.0",
                supported_languages=['python', 'sql']
            ),
            AgentCapabilityDescription(
                capability_type=AgentCapability.TEXT_GENERATION,
                description="分析报告生成",
                input_schema={'type': 'object', 'properties': {'data': {'type': 'object'}}},
                output_schema={'type': 'object', 'properties': {'report': {'type': 'string'}}},
                performance_metrics={'latency_ms': 200, 'quality': 0.92},
                version="1.0.0",
                supported_languages=['en', 'zh']
            )
        ],
        status=AgentStatus.ONLINE,
        endpoints={
            'chat': 'https://api.techcorp.com/agent/001/chat',
            'execute': 'https://api.techcorp.com/agent/001/execute'
        },
        authentication_methods=['api_key', 'oauth2'],
        security_level=SecurityLevel.INTERNAL,
        created_at=datetime.now(),
        updated_at=datetime.now(),
        tags=['analytics', 'reporting', 'business-intelligence']
    )
    agent2 = AgentMetadata(
        agent_id="agent_002",
        name="CodeGenerationAgent",
        version="2.0.0",
        provider="DevAI",
        description="智能代码生成 Agent",
        capabilities=[
            AgentCapabilityDescription(
                capability_type=AgentCapability.CODE_GENERATION,
                description="多语言代码生成",
                input_schema={'type': 'object', 'properties': {'requirement': {'type': 'string'}}},
                output_schema={'type': 'object', 'properties': {'code': {'type': 'string'}}},
                performance_metrics={'latency_ms': 300, 'accuracy': 0.88},
                version="2.0.0",
                supported_languages=['python', 'javascript', 'java', 'go']
            )
        ],
        status=AgentStatus.ONLINE,
        endpoints={
            'generate': 'https://api.devai.com/agent/002/generate'
        },
        authentication_methods=['api_key'],
        security_level=SecurityLevel.INTERNAL,
        created_at=datetime.now(),
        updated_at=datetime.now(),
        tags=['coding', 'development', 'automation']
    )
    registry.register_agent(agent1)
    registry.register_agent(agent2)
    print(f"已注册 Agent: {agent1.name} ({agent1.agent_id})")
    print(f" 能力:{[c.capability_type.value for c in agent1.capabilities]}")
    print(f" 状态:{agent1.status.value}")
    print(f"\n已注册 Agent: {agent2.name} ({agent2.agent_id})")
    print(f" 能力:{[c.capability_type.value for c in agent2.capabilities]}")
    print(f" 状态:{agent2.status.value}")

    print("\n=== 发现 Agent ===")
    data_agents = registry.discover_agents(AgentCapability.DATA_ANALYSIS)
    print(f"数据分析 Agent 数量:{len(data_agents)}")
    for agent in data_agents:
        print(f" - {agent.name} ({agent.provider})")
    code_agents = registry.discover_agents(AgentCapability.CODE_GENERATION)
    print(f"代码生成 Agent 数量:{len(code_agents)}")
    for agent in code_agents:
        print(f" - {agent.name} ({agent.provider})")

    print("\n=== 创建 A2A 协议实例 ===")
    a2a = A2AProtocol()

    print("\n=== 创建 A2A 消息 ===")
    message = a2a.create_message(
        sender_id="agent_001",
        receiver_id="agent_002",
        message_type=MessageType.REQUEST,
        content={
            'action': 'generate_code',
            'requirement': 'Create a function to analyze sales data',
            'context': {'industry': 'retail', 'data_format': 'csv'}
        }
    )
    print(f"消息 ID: {message.message_id}")
    print(f"会话 ID: {message.conversation_id}")
    print(f"发送者:{message.sender_id}")
    print(f"接收者:{message.receiver_id}")
    print(f"类型:{message.message_type.value}")
    print(f"签名:{message.signature[:50]}...")

    print("\n=== 编码消息 ===")
    encoded = a2a.encode_message(message)
    print(f"编码后长度:{len(encoded)} 字符")
    print(f"编码消息预览:{encoded[:100]}...")

    print("\n=== 解码消息 ===")
    decoded = a2a.decode_message(encoded)
    print(f"解码验证:{decoded.message_id == message.message_id}")
    print(f"内容匹配:{decoded.content == message.content}")

    print("\n=== 验证签名 ===")
    # Export the public key as PEM for verification.
    public_key_pem = a2a.public_key.public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )
    is_valid = a2a.verify_message(message, public_key_pem)
    print(f"签名验证:{'通过 ✓' if is_valid else '失败 ✗'}")

    print("\n=== 创建 MCP 协议实例 ===")
    mcp = MCPProtocol()

    print("\n=== 注册 MCP 工具 ===")
    tool1 = MCPTool(
        tool_id="tool_analytics",
        name="DataAnalytics",
        description="高级数据分析工具",
        input_schema={'type': 'object', 'properties': {'dataset': {'type': 'string'}}},
        output_schema={'type': 'object', 'properties': {'insights': {'type': 'array'}}},
        execution_endpoint='https://api.techcorp.com/tools/analytics',
        authentication_required=True,
        rate_limit={'requests_per_minute': 60}
    )
    tool2 = MCPTool(
        tool_id="tool_code_gen",
        name="CodeGenerator",
        description="智能代码生成工具",
        input_schema={'type': 'object', 'properties': {'requirement': {'type': 'string'}}},
        output_schema={'type': 'object', 'properties': {'code': {'type': 'string'}}},
        execution_endpoint='https://api.devai.com/tools/code-gen',
        authentication_required=True,
        rate_limit={'requests_per_minute': 30}
    )
    mcp.register_tool(tool1)
    mcp.register_tool(tool2)
    print(f"已注册工具:{tool1.name} ({tool1.tool_id})")
    print(f"已注册工具:{tool2.name} ({tool2.tool_id})")

    print("\n=== 列出可用工具 ===")
    tools = mcp.list_tools()
    print(f"可用工具数量:{len(tools)}")
    for tool in tools:
        print(f" - {tool.name}: {tool.description}")

    print("\n=== 创建上下文 ===")
    mcp.create_context("ctx_001", {
        'project': 'sales_analysis',
        'user': 'analyst_001',
        'data_source': 'sales_db'
    })
    print("上下文创建成功")

    print("\n=== 创建 Agent 编排器 ===")
    orchestrator = AgentOrchestrator(registry, a2a)

    print("\n=== 创建任务 ===")
    task = orchestrator.create_task(
        description="分析销售数据并生成报告",
        input_data={
            'dataset': 'sales_2025_q1.csv',
            'metrics': ['revenue', 'growth', 'conversion']
        },
        expected_output={
            'report': 'string',
            'visualizations': 'array'
        }
    )
    print(f"任务 ID: {task.task_id}")
    print(f"描述:{task.description}")
    print(f"状态:{task.status}")

    print("\n=== 分配 Agent ===")
    assigned = orchestrator.assign_agents(
        task.task_id,
        [AgentCapability.DATA_ANALYSIS, AgentCapability.TEXT_GENERATION]
    )
    print(f"分配的 Agent: {assigned}")

    print("\n=== 执行任务 ===")
    # asyncio is already imported at the top of the file.
    result = asyncio.run(orchestrator.execute_task(task.task_id))
    print(f"任务结果:{json.dumps(result, indent=2, ensure_ascii=False)}")

    print("\n=== 获取任务状态 ===")
    status = orchestrator.get_task_status(task.task_id)
    print(f"任务状态:{json.dumps(status, indent=2, ensure_ascii=False)}")

    print("\n关键观察:")
    print("1. 标准化:统一架构描述 + 能力模型 + 安全规范")
    print("2. A2A 协议:跨 Agent 通信 + 签名验证 + 会话管理")
    print("3. MCP 协议:工具注册 + 资源管理 + 上下文管理")
    print("4. 编排器:任务分解 + Agent 选择 + 协作编排")
    print("5. 生态建设:注册中心 + 协议标准 + 编排机制 = 可信赖")
    print("\nAgent 生态的使命:让协作更简单、更安全、更高效")


if __name__ == "__main__":
    main()