# Value alignment, consensus building, and conflict resolution: complete implementation
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import math
import random
from collections import defaultdict
import copy
class AlignmentType(Enum):
    """Alignment type."""

    # Value alignment
    VALUE_ALIGNMENT = "value_alignment"
    # Preference alignment
    PREFERENCE_ALIGNMENT = "preference"
    # Ethical alignment
    ETHICAL_ALIGNMENT = "ethical"
    # Norm alignment
    NORM_ALIGNMENT = "norm"
class ConsensusType(Enum):
    """Consensus type."""

    # Unanimous agreement
    UNANIMOUS = "unanimous"
    # Majority rule
    MAJORITY = "majority"
    # Weighted voting
    WEIGHTED = "weighted"
    # Negotiated consensus
    CONSENSUS = "consensus"
class ConflictType(Enum):
    """Conflict type."""

    # Resource conflict
    RESOURCE_CONFLICT = "resource"
    # Goal conflict
    GOAL_CONFLICT = "goal"
    # Value conflict
    VALUE_CONFLICT = "value"
    # Scheduling conflict
    SCHEDULE_CONFLICT = "schedule"
@dataclass
class Agent:
    """An agent.

    Only ``id`` is required; every container field defaults to a fresh empty
    container and ``reputation`` defaults to 1.0.
    """

    # Identifier; used as the key wherever agents are stored or referenced.
    id: str
    # Value name -> weight.
    value_function: Dict[str, float] = field(default_factory=dict)
    # Preference name -> weight.
    preferences: Dict[str, float] = field(default_factory=dict)
    # Constraint strings (the demo in this file uses "forbid_harm").
    ethical_constraints: List[str] = field(default_factory=list)
    # Reputation score.
    reputation: float = 1.0
    # Resource name -> amount.
    resources: Dict[str, float] = field(default_factory=dict)
@dataclass
class Proposal:
    """A proposal.

    ``supporters`` and ``opponents`` start out as fresh empty sets.
    """

    # Proposal identifier.
    id: str
    # Identifier of the proposer.
    proposer: str
    # Free-text description.
    description: str
    # Key -> value share offered by this proposal.
    value_distribution: Dict[str, float]
    # Ids currently supporting the proposal.
    supporters: Set[str] = field(default_factory=set)
    # Ids currently opposing the proposal.
    opponents: Set[str] = field(default_factory=set)
class ValueAligner:
    """Value aligner.

    Provides:
      1. Preference learning from pairwise comparisons.
      2. A score-based value function inferred from those comparisons.
      3. Ethical-constraint checking ("forbid_" / "require_" rules).
      4. Social-norm bookkeeping.
    """

    def __init__(self):
        # agent id -> list of (preferred, less_preferred) pairs
        self.human_preferences: Dict[str, List[Tuple[str, str]]] = {}
        self.ethical_rules: List[str] = []
        # norm name -> compliance rate
        self.social_norms: Dict[str, float] = {}

    def learn_preferences(self, agent_id: str, comparisons: List[Tuple[str, str]]):
        """Append pairwise comparisons ``(preferred, less_preferred)`` for ``agent_id``."""
        self.human_preferences.setdefault(agent_id, []).extend(comparisons)

    def infer_value_function(self, agent_id: str,
                             options: List[str]) -> Dict[str, float]:
        """Infer a value for each option from the recorded comparisons.

        Without any recorded comparisons for ``agent_id`` every option gets the
        uniform value ``1 / len(options)``.  Otherwise each option scores +1
        per comparison it wins and -1 per comparison it loses, and its value is
        ``(score + 1) / (sum(|all scores|) + 1e-6 + len(options))``.

        Note that the result is not normalised to sum to 1 and is negative for
        an option whose score is below -1.

        Args:
            agent_id: Whose comparisons to use.
            options: Options to evaluate; options never compared score 0.

        Returns:
            Mapping from each entry of ``options`` to its inferred value.
        """
        if agent_id not in self.human_preferences:
            # No data: fall back to a uniform distribution.
            return {opt: 1.0 / len(options) for opt in options}

        tally = defaultdict(float)
        for winner, loser in self.human_preferences[agent_id]:
            tally[winner] += 1.0
            tally[loser] -= 1.0

        # The magnitude covers every compared item, including ones that are
        # not in `options`; the epsilon keeps the denominator non-zero.
        magnitude = sum(abs(score) for score in tally.values()) + 1e-6
        denominator = magnitude + len(options)
        return {opt: (tally.get(opt, 0.0) + 1) / denominator for opt in options}

    def check_ethical_constraint(self, action: str,
                                 constraints: List[str]) -> bool:
        """Return whether ``action`` satisfies every constraint.

        ``"forbid_<x>"`` fails when ``action == "<x>"``; ``"require_<x>"`` fails
        when ``action != "<x>"``.  Constraints with any other prefix are
        ignored.
        """
        forbid, require = "forbid_", "require_"
        for constraint in constraints:
            if constraint.startswith(forbid):
                if action == constraint[len(forbid):]:
                    return False
            elif constraint.startswith(require):
                if action != constraint[len(require):]:
                    return False
        return True

    def model_social_norm(self, norm_name: str,
                          compliance_rate: float):
        """Record (or overwrite) the compliance rate of a social norm."""
        self.social_norms[norm_name] = compliance_rate

    def align_agent(self, agent: "Agent", target_values: Dict[str, float],
                    *, learning_rate: float = 0.1) -> "Agent":
        """Move the agent's value function one step towards ``target_values``.

        Every key of ``target_values`` is updated as
        ``current + learning_rate * (target - current)`` (a missing key starts
        at 0.0); afterwards all values are divided by ``sum(values) + 1e-6``.

        Args:
            agent: Agent whose ``value_function`` is modified in place.
            target_values: Value name -> target weight.
            learning_rate: Fraction of the gap closed in this step.  The
                default of 0.1 matches the previously hard-coded step size.

        Returns:
            The same ``agent`` object.
        """
        values = agent.value_function
        for key, target in target_values.items():
            current = values.get(key, 0.0)
            values[key] = current + learning_rate * (target - current)

        # Renormalise; the epsilon guards against an all-zero value function.
        total = sum(values.values()) + 1e-6
        for key in values:
            values[key] /= total
        return agent
class ConsensusBuilder:
    """Consensus builder.

    Keeps proposals and their ballots, tallies them according to the
    configured ``ConsensusType`` and offers a simplified negotiation loop.
    """

    def __init__(self, consensus_type: ConsensusType = ConsensusType.MAJORITY):
        self.consensus_type = consensus_type
        # proposal id -> proposal
        self.proposals: Dict[str, Proposal] = {}
        # One entry per accepted ballot, in the order the ballots were cast.
        self.voting_history: List[Dict[str, Any]] = []
        # proposal id -> {agent id -> weight given with that agent's latest ballot}
        self.vote_weights: Dict[str, Dict[str, float]] = {}

    def create_proposal(self, proposer: str, description: str,
                        value_distribution: Dict[str, float]) -> Proposal:
        """Create, register and return a proposal with id ``proposal_<n>``."""
        proposal = Proposal(
            id=f"proposal_{len(self.proposals)}",
            proposer=proposer,
            description=description,
            value_distribution=value_distribution,
        )
        self.proposals[proposal.id] = proposal
        return proposal

    def vote(self, agent_id: str, proposal_id: str,
             vote: str, weight: float = 1.0):
        """Cast or change a ballot.

        Args:
            agent_id: Voter id.
            proposal_id: Target proposal; unknown ids are ignored.
            vote: ``"support"`` or ``"oppose"``; any other value is ignored.
            weight: Weight of this ballot.  It is remembered per proposal and
                voter and used by WEIGHTED tallies when ``calculate_consensus``
                is not given an explicit weight for that voter.
        """
        proposal = self.proposals.get(proposal_id)
        if proposal is None:
            return

        if vote == "support":
            proposal.supporters.add(agent_id)
            proposal.opponents.discard(agent_id)
        elif vote == "oppose":
            proposal.opponents.add(agent_id)
            proposal.supporters.discard(agent_id)
        else:
            # Unrecognised ballots leave the proposal untouched.
            return

        self.vote_weights.setdefault(proposal_id, {})[agent_id] = weight
        self.voting_history.append({
            "agent_id": agent_id,
            "proposal_id": proposal_id,
            "vote": vote,
            "weight": weight,
        })

    def calculate_consensus(self, proposal_id: str,
                            agent_weights: Optional[Dict[str, float]] = None
                            ) -> Tuple[bool, float]:
        """Tally a proposal.

        Args:
            proposal_id: Proposal to tally.
            agent_weights: Optional voter id -> weight map for WEIGHTED mode.
                Voters missing from it fall back to the weight passed to
                ``vote`` (1.0 when none was given).

        Returns:
            ``(reached, support_rate)``.  UNANIMOUS needs a rate of exactly
            1.0; MAJORITY and WEIGHTED need a rate above 0.5.  The result is
            ``(False, 0.0)`` for an unknown proposal, when nothing has been
            voted, and for ``ConsensusType.CONSENSUS``, which has no tally
            rule here.
        """
        proposal = self.proposals.get(proposal_id)
        if proposal is None:
            return False, 0.0

        mode = self.consensus_type
        if mode in (ConsensusType.UNANIMOUS, ConsensusType.MAJORITY):
            # Plain head count.
            support = len(proposal.supporters)
            oppose = len(proposal.opponents)
        elif mode == ConsensusType.WEIGHTED:
            explicit = agent_weights or {}
            recorded = self.vote_weights.get(proposal_id, {})

            def weight_of(voter: str) -> float:
                return explicit.get(voter, recorded.get(voter, 1.0))

            support = sum(weight_of(voter) for voter in proposal.supporters)
            oppose = sum(weight_of(voter) for voter in proposal.opponents)
        else:
            return False, 0.0

        total = support + oppose
        if total == 0:
            return False, 0.0

        rate = support / total
        if mode == ConsensusType.UNANIMOUS:
            return rate == 1.0, rate
        return rate > 0.5, rate

    def negotiate(self, proposal_id: str, agents: List[Agent],
                  max_rounds: int = 10) -> Tuple[bool, Optional[Proposal]]:
        """Run a simplified negotiation on a proposal.

        Each round that has not yet reached consensus hands a total of 0.1 of
        extra value to the opponents that appear in the value distribution,
        renormalises the distribution, and with probability 0.3 converts one
        opponent into a supporter.

        Args:
            proposal_id: Proposal to negotiate.
            agents: Currently unused.
            max_rounds: Maximum number of rounds.

        Returns:
            ``(reached, proposal)``, or ``(False, None)`` for an unknown
            proposal id.
        """
        proposal = self.proposals.get(proposal_id)
        if proposal is None:
            return False, None

        for _ in range(max_rounds):
            reached, _rate = self.calculate_consensus(proposal_id)
            if reached:
                return True, proposal

            if not proposal.opponents:
                # Nobody left to persuade: further rounds cannot change the tally.
                break

            # Compromise: split a fixed concession evenly among the opponents.
            distribution = proposal.value_distribution
            concession = 0.1 / len(proposal.opponents)
            for opponent in proposal.opponents:
                if opponent in distribution:
                    distribution[opponent] += concession

            # Renormalise; skipped when the shares sum to zero, which would
            # otherwise raise ZeroDivisionError.
            total = sum(distribution.values())
            if total != 0:
                for key in distribution:
                    distribution[key] /= total

            # Simplified re-vote: occasionally one opponent changes sides.
            # Sorting makes the pick reproducible under a seeded RNG.
            if random.random() < 0.3:
                convert = random.choice(sorted(proposal.opponents))
                proposal.supporters.add(convert)
                proposal.opponents.remove(convert)

        reached, _rate = self.calculate_consensus(proposal_id)
        return reached, proposal
class ConflictResolver:
    """Conflict resolver.

    Detects resource shortages, allocates scarce supply in proportion to
    reputation, ranks goals by a priority score and records resolutions.
    """

    def __init__(self):
        # Every conflict ever returned by detect_conflict, in detection order.
        self.conflicts: List[Dict[str, Any]] = []
        # Every resolution ever returned by resolve_conflict.
        self.resolution_history: List[Dict[str, Any]] = []

    def detect_conflict(self, agents: List["Agent"],
                        resources: Dict[str, float]) -> List[Dict[str, Any]]:
        """Detect resources whose total demand exceeds the supply.

        Demand is the sum of ``agent.resources[name]`` over all agents; a
        resource missing from ``resources`` has a supply of 0.0.

        Returns:
            One dict per over-subscribed resource with the keys ``type``,
            ``resource``, ``demand``, ``supply``, ``shortage`` and
            ``involved_agents``.  The same dicts are appended to
            ``self.conflicts``.
        """
        demand_by_resource = defaultdict(float)
        for agent in agents:
            for name, amount in agent.resources.items():
                demand_by_resource[name] += amount

        found = []
        for name, demand in demand_by_resource.items():
            supply = resources.get(name, 0.0)
            if demand > supply:
                found.append({
                    "type": ConflictType.RESOURCE_CONFLICT.value,
                    "resource": name,
                    "demand": demand,
                    "supply": supply,
                    "shortage": demand - supply,
                    "involved_agents": [a.id for a in agents if name in a.resources],
                })

        self.conflicts.extend(found)
        return found

    def resolve_resource_conflict(self, conflict: Dict[str, Any],
                                  agents: List["Agent"]) -> Dict[str, float]:
        """Split the available supply among the involved agents by reputation.

        Args:
            conflict: Needs the keys ``involved_agents`` and ``supply``.
            agents: Candidate agents; only those listed in
                ``conflict["involved_agents"]`` receive a share.

        Returns:
            Agent id -> allocated amount, or ``{}`` when none of ``agents`` is
            involved.
        """
        involved_ids = set(conflict["involved_agents"])
        claimants = [a for a in agents if a.id in involved_ids]
        if not claimants:
            return {}

        supply = conflict["supply"]
        # The epsilon keeps the division defined when every reputation is 0.
        denominator = sum(a.reputation for a in claimants) + 1e-6
        return {a.id: supply * (a.reputation / denominator) for a in claimants}

    def prioritize_goals(self, agents: List["Agent"],
                         goals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Score the goals and sort them by descending priority, in place.

        ``priority_score = 0.4 * urgency + 0.4 * importance
        + 0.2 * participation``, where ``urgency`` and ``importance`` default
        to 0.5 and ``participation`` is the number of ``involved_agents``
        relative to ``len(agents)`` (0.0 when ``agents`` is empty).

        Returns:
            The same ``goals`` list, each dict now carrying ``priority_score``.
        """
        population = len(agents)
        for goal in goals:
            urgency = goal.get("urgency", 0.5)
            importance = goal.get("importance", 0.5)
            involved = len(goal.get("involved_agents", []))
            # With no agents at all the ratio is undefined; dividing by the
            # bare epsilon would inflate the score by orders of magnitude.
            participation = involved / (population + 1e-6) if population else 0.0
            goal["priority_score"] = urgency * 0.4 + importance * 0.4 + participation * 0.2

        goals.sort(key=lambda g: g["priority_score"], reverse=True)
        return goals

    def resolve_conflict(self, conflict: Dict[str, Any],
                         agents: List["Agent"],
                         resources: Dict[str, float]) -> Dict[str, Any]:
        """Resolve one conflict and append the outcome to the history.

        Resource conflicts are resolved by reputation-based allocation; every
        other type is recorded as unresolved.  ``resources`` is currently
        unused.

        Returns:
            The resolution record.
        """
        conflict_type = conflict["type"]
        if conflict_type == ConflictType.RESOURCE_CONFLICT.value:
            allocation = self.resolve_resource_conflict(conflict, agents)
            resolution = {
                "timestamp": datetime.now().isoformat(),
                "conflict_type": conflict_type,
                "method": "reputation_based_allocation",
                "allocation": allocation,
                "resolved": True,
            }
        else:
            resolution = {
                "timestamp": datetime.now().isoformat(),
                "conflict_type": conflict_type,
                "method": "default",
                "resolved": False,
            }

        self.resolution_history.append(resolution)
        return resolution
class MultiAgentCoordinator:
    """Multi-agent coordinator.

    Ties together value alignment, consensus building, conflict resolution
    and task coordination for a registry of agents.
    """

    def __init__(self):
        # agent id -> agent
        self.agents: Dict[str, Agent] = {}
        self.value_aligner = ValueAligner()
        self.consensus_builder = ConsensusBuilder()
        self.conflict_resolver = ConflictResolver()
        # One result dict per task that got as far as a vote.
        self.coordination_history: List[Dict[str, Any]] = []

    def register_agent(self, agent: Agent):
        """Register an agent, replacing any agent with the same id."""
        self.agents[agent.id] = agent

    def align_all_agents(self, target_values: Dict[str, float]):
        """Apply one alignment step towards ``target_values`` to every agent."""
        for agent in self.agents.values():
            self.value_aligner.align_agent(agent, target_values)

    def propose_and_vote(self, proposer_id: str, description: str,
                         value_distribution: Dict[str, float],
                         support_threshold: float = 0.1) -> Tuple[bool, float]:
        """Create a proposal, let every registered agent vote, and tally.

        The proposer (when registered) supports its own proposal.  Every
        other registered agent supports it only when its entry in
        ``value_distribution`` is strictly greater than ``support_threshold``;
        agents missing from the distribution count as a share of 0.0.

        Args:
            proposer_id: Id of the proposing agent.
            description: Free-text description of the proposal.
            value_distribution: Agent id -> offered share.
            support_threshold: Minimum share (exclusive) an agent needs in
                order to support.  The default of 0.1 matches the previously
                hard-coded threshold.

        Returns:
            ``(reached, support_rate)`` from the consensus builder.
        """
        builder = self.consensus_builder
        proposal = builder.create_proposal(proposer_id, description, value_distribution)

        for agent_id in self.agents:
            if agent_id == proposer_id:
                ballot = "support"
            elif value_distribution.get(agent_id, 0.0) > support_threshold:
                ballot = "support"
            else:
                ballot = "oppose"
            builder.vote(agent_id, proposal.id, ballot)

        return builder.calculate_consensus(proposal.id)

    def check_and_resolve_conflicts(
        self, resources: Dict[str, float]
    ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Detect conflicts among all agents and resolve each one.

        Returns:
            ``(conflicts, resolutions)``, two lists of equal length.
        """
        agents = list(self.agents.values())
        conflicts = self.conflict_resolver.detect_conflict(agents, resources)
        resolutions = [
            self.conflict_resolver.resolve_conflict(conflict, agents, resources)
            for conflict in conflicts
        ]
        return conflicts, resolutions

    def coordinate_task(self, task_id: str, task_value: float,
                        required_agents: List[str]) -> Dict[str, Any]:
        """Distribute a task's value among its agents and put it to a vote.

        The value is split in proportion to reputation and proposed by the
        first required agent.

        Args:
            task_id: Task identifier.
            task_value: Total value to distribute.
            required_agents: Ids of the agents the task needs.

        Returns:
            On failure ``{"success": False, "reason": ...}`` (nothing is
            recorded).  Otherwise a dict with ``task_id``, ``success``,
            ``consensus_rate``, ``value_distribution`` and
            ``participating_agents``, which is also appended to
            ``coordination_history``.
        """
        if not required_agents:
            return {"success": False, "reason": "No agents required"}

        # Every required agent must be registered.
        available = [self.agents[agent_id] for agent_id in required_agents
                     if agent_id in self.agents]
        if len(available) < len(required_agents):
            return {"success": False, "reason": "Insufficient agents"}

        # Split the value by reputation; the epsilon avoids division by zero.
        denominator = sum(agent.reputation for agent in available) + 1e-6
        value_distribution = {}
        for agent in available:
            value_distribution[agent.id] = task_value * (agent.reputation / denominator)

        # NOTE(review): every registered agent votes here, including agents
        # that are not part of the task and therefore hold no share - confirm
        # that this is intended.
        reached, rate = self.propose_and_vote(
            required_agents[0], f"Task {task_id} coordination", value_distribution
        )

        result = {
            "task_id": task_id,
            "success": reached,
            "consensus_rate": rate,
            "value_distribution": value_distribution,
            # Copied so later changes to the caller's list cannot rewrite history.
            "participating_agents": list(required_agents),
        }
        self.coordination_history.append(result)
        return result
# Usage example
if __name__ == "__main__":
    print("=== 多智能体对齐、共识与冲突消解 ===\n")

    print("=== 创建协调系统 ===")

    # Build the coordinator and register the agents.
    coordinator = MultiAgentCoordinator()
    num_agents = 5
    for index in range(num_agents):
        coordinator.register_agent(
            Agent(
                id=f"agent_{index}",
                value_function={"cooperation": 0.5, "competition": 0.5},
                preferences={},
                ethical_constraints=["forbid_harm"],
                reputation=random.uniform(0.7, 1.0),
                resources={"energy": random.uniform(10, 50)},
            )
        )

    print(f"注册{num_agents}个智能体")

    print("\n=== 价值对齐 ===")

    # Learn preferences from pairwise comparisons.
    human_comparisons = [
        ("cooperation", "competition"),
        ("cooperation", "selfishness"),
        ("fairness", "inequality"),
    ]
    coordinator.value_aligner.learn_preferences("human_1", human_comparisons)

    # Infer the target value function.
    target_values = coordinator.value_aligner.infer_value_function(
        "human_1", ["cooperation", "competition", "fairness"]
    )
    print(f"目标价值函数:{target_values}")

    # Align every agent towards it.
    coordinator.align_all_agents(target_values)
    print("智能体价值对齐完成")
    for agent_id, agent in coordinator.agents.items():
        print(f" {agent_id}: cooperation={agent.value_function.get('cooperation', 0):.3f}")

    print("\n=== 共识构建 ===")

    # Propose an equal split and vote on it.
    value_dist = dict.fromkeys((f"agent_{index}" for index in range(num_agents)), 0.2)
    reached, rate = coordinator.propose_and_vote(
        "agent_0", "Resource sharing proposal", value_dist
    )
    print("提案结果:")
    print(f" 共识达成:{reached}")
    print(f" 共识率:{rate:.2%}")

    print("\n=== 冲突检测与消解 ===")

    # Resource limit.
    resources = {"energy": 100.0}

    # Raise every agent's demand: total demand 150 > supply 100.
    for agent in coordinator.agents.values():
        agent.resources["energy"] = 30.0

    # Detect and resolve the conflicts.
    conflicts, resolutions = coordinator.check_and_resolve_conflicts(resources)
    print(f"检测到{len(conflicts)}个冲突:")
    for conflict in conflicts:
        print(f" 类型:{conflict['type']}")
        print(f" 资源:{conflict['resource']}")
        print(f" 需求:{conflict['demand']:.1f}, 供应:{conflict['supply']:.1f}")
        print(f" 短缺:{conflict['shortage']:.1f}")

    print("\n消解方案:")
    for resolution in resolutions:
        if not resolution["resolved"]:
            continue
        print(f" 方法:{resolution['method']}")
        print(" 分配:")
        for agent_id, amount in resolution["allocation"].items():
            print(f" {agent_id}: {amount:.1f}")

    print("\n=== 任务协调 ===")

    # Coordinate a task.
    task_result = coordinator.coordinate_task(
        task_id="task_1",
        task_value=100.0,
        required_agents=["agent_0", "agent_1", "agent_2"],
    )
    print("任务协调结果:")
    print(f" 成功:{task_result['success']}")
    print(f" 共识率:{task_result['consensus_rate']:.2%}")
    print(" 价值分配:")
    for agent_id, value in task_result["value_distribution"].items():
        print(f" {agent_id}: {value:.1f}")

    print("\n关键观察:")
    for observation in (
        "1. 价值对齐:从偏好学习实现目标一致",
        "2. 共识达成:从投票协商实现决策统一",
        "3. 冲突消解:从优先级仲裁实现和谐共存",
        "4. 协调协作:从联盟形成实现群体涌现",
        "5. 集体智慧:对齐 + 共识 + 消解 + 协作 = 集体智慧",
    ):
        print(observation)
    print("\n协作的奇迹:个体理性 → 集体智慧")