"""Full implementation of multi-agent competition, cooperation and emergent behavior."""
from __future__ import annotations

import math
import random
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from itertools import permutations
from typing import Any, Dict, List, Optional, Set, Tuple

import numpy as np
class GameType(Enum):
    """Game-theoretic interaction types supported by the framework."""
    ZERO_SUM = "zero_sum"                # one player's gain is another's loss
    NON_ZERO_SUM = "non_zero_sum"        # joint gains or losses are possible
    COOPERATIVE = "cooperative"          # players can form binding agreements
    NON_COOPERATIVE = "non_cooperative"  # players act independently
class StrategyType(Enum):
    """Iterated prisoner's dilemma strategies an Agent can play."""
    DEFECT = "defect"            # always defect
    COOPERATE = "cooperate"      # always cooperate
    TIT_FOR_TAT = "tit_for_tat"  # cooperate first, then mirror opponent's last move
    GENEROUS = "generous"        # tit-for-tat that occasionally forgives a defection
    GRIM = "grim"                # cooperate until first defection, then defect forever
@dataclass
class Agent:
    """A player: carries its strategy, accumulated payoff and move history."""
    id: str                       # unique agent identifier
    strategy: StrategyType        # behavioral strategy used by select_action
    payoff: float = 0.0           # cumulative payoff over all games played
    fitness: float = 1.0          # evolutionary fitness, updated by EmergenceSimulator
    history: List[str] = field(default_factory=list)  # past actions ("cooperate"/"defect")
    cooperation_rate: float = 0.5  # NOTE(review): never updated anywhere in this file — confirm intended use
@dataclass
class Coalition:
    """A coalition of agents with its value and per-member payoff allocation."""
    id: str                  # unique coalition identifier
    members: Set[str]        # ids of member agents
    value: float = 0.0       # coalition value v(S) from the characteristic function
    allocation: Dict[str, float] = field(default_factory=dict)  # per-member share (Shapley values)
    stability: float = 1.0   # mean member satisfaction, set by CoalitionFormation
class CompetitionGame:
    """
    Competitive game engine.

    Supports:
    1. Zero-sum games
    2. Non-zero-sum games
    3. Prisoner's dilemma
    4. Public goods game
    """

    def __init__(self, game_type: GameType = GameType.NON_COOPERATIVE):
        self.game_type = game_type
        self.agents: Dict[str, Agent] = {}
        self.payoff_matrix: Optional[np.ndarray] = None  # reserved; never populated here
        self.history: List[Dict[str, Any]] = []  # one results dict per round played

    def add_agent(self, agent: Agent):
        """Register an agent as a player in this game."""
        self.agents[agent.id] = agent

    def prisoner_dilemma_payoff(self, action1: str, action2: str) -> Tuple[float, float]:
        """Return the (player1, player2) payoffs for one prisoner's dilemma move pair.

        Canonical ordering T=5 (temptation) > R=3 (reward) > P=1 (punishment)
        > S=0 (sucker). Any action other than "cooperate" is treated as defect.
        """
        if action1 == "cooperate" and action2 == "cooperate":
            return (3, 3)  # R, R
        elif action1 == "cooperate" and action2 == "defect":
            return (0, 5)  # S, T
        elif action1 == "defect" and action2 == "cooperate":
            return (5, 0)  # T, S
        else:
            return (1, 1)  # P, P

    def public_goods_payoff(self, contributions: List[float]) -> List[float]:
        """Public goods game: the pot is multiplied by 1.5 and split evenly.

        Each payoff = endowment (10) - own contribution + equal share of the
        enhanced pot. Returns [] for an empty player list (the original
        divided by zero here).
        """
        if not contributions:  # guard: avoid ZeroDivisionError
            return []
        enhancement_factor = 1.5
        individual_return = sum(contributions) * enhancement_factor / len(contributions)
        return [10 - contribution + individual_return for contribution in contributions]

    def select_action(self, agent: Agent, opponent: Optional[Agent] = None) -> str:
        """Choose "cooperate" or "defect" for `agent` according to its strategy.

        Reactive strategies (TIT_FOR_TAT, GENEROUS, GRIM) read the opponent's
        move history; with no opponent (or no history) they cooperate.
        """
        if agent.strategy == StrategyType.DEFECT:
            return "defect"
        elif agent.strategy == StrategyType.COOPERATE:
            return "cooperate"
        elif agent.strategy == StrategyType.TIT_FOR_TAT:
            # Cooperate first, then mirror the opponent's previous move.
            if not opponent or not opponent.history:
                return "cooperate"
            return opponent.history[-1]
        elif agent.strategy == StrategyType.GENEROUS:
            # Tit-for-tat, but forgive a defection with 10% probability.
            if opponent and opponent.history and opponent.history[-1] == "defect":
                if random.random() < 0.1:
                    return "cooperate"
                return "defect"
            return "cooperate"
        elif agent.strategy == StrategyType.GRIM:
            # Grim trigger: defect forever once the opponent has ever defected.
            if opponent and "defect" in opponent.history:
                return "defect"
            return "cooperate"
        else:
            # Unknown strategy: fall back to a random move.
            return random.choice(["cooperate", "defect"])

    def run_round(self) -> Dict[str, Any]:
        """Play one round of the game.

        Fix over the original: the old implementation played only when the
        game had EXACTLY two agents, so tournaments with more participants
        silently played zero games (all payoffs stayed 0). Now every distinct
        pair plays one prisoner's dilemma match (round-robin); the two-agent
        case behaves exactly as before.

        Returns a dict with each agent's last action and summed round payoff;
        the same dict is appended to self.history.
        """
        results: Dict[str, Any] = {"actions": {}, "payoffs": {}}
        agent_ids = list(self.agents.keys())
        for i in range(len(agent_ids)):
            for j in range(i + 1, len(agent_ids)):
                agent1 = self.agents[agent_ids[i]]
                agent2 = self.agents[agent_ids[j]]
                action1 = self.select_action(agent1, agent2)
                action2 = self.select_action(agent2, agent1)
                payoff1, payoff2 = self.prisoner_dilemma_payoff(action1, action2)
                agent1.payoff += payoff1
                agent2.payoff += payoff2
                # History is global per agent (not per opponent), so reactive
                # strategies respond to the opponent's last move against ANYONE
                # — this matches the original data model.
                agent1.history.append(action1)
                agent2.history.append(action2)
                results["actions"][agent1.id] = action1
                results["actions"][agent2.id] = action2
                results["payoffs"][agent1.id] = results["payoffs"].get(agent1.id, 0.0) + payoff1
                results["payoffs"][agent2.id] = results["payoffs"].get(agent2.id, 0.0) + payoff2
        self.history.append(results)
        return results

    def run_tournament(self, rounds: int = 100) -> Dict[str, Any]:
        """Run a repeated tournament and summarize total/average payoffs.

        Guards against rounds <= 0 (division by zero) and an empty player set
        (max() over an empty dict) instead of raising.
        """
        for _ in range(rounds):
            self.run_round()
        avg_payoffs = {
            agent_id: (agent.payoff / rounds if rounds > 0 else 0.0)
            for agent_id, agent in self.agents.items()
        }
        return {
            "total_payoffs": {agent_id: agent.payoff for agent_id, agent in self.agents.items()},
            "avg_payoffs": avg_payoffs,
            "winner": max(avg_payoffs, key=avg_payoffs.get) if avg_payoffs else None,
        }
class CoalitionFormation:
    """
    Coalition formation.

    Supports:
    1. Coalition formation
    2. Payoff allocation
    3. Stability analysis
    4. Shapley value computation
    """

    def __init__(self):
        self.coalitions: List[Coalition] = []
        # Characteristic function v(S): maps a sorted member tuple to the
        # coalition's value. Key type fixed from Tuple[str] (a 1-tuple only)
        # to Tuple[str, ...] (tuples of any length, including ()).
        self.characteristic_function: Dict[Tuple[str, ...], float] = {}

    def set_characteristic_function(self, coalition_members: Tuple[str, ...], value: float):
        """Record v(coalition_members) = value; keys should be sorted tuples."""
        self.characteristic_function[coalition_members] = value

    def calculate_shapley_value(self, players: List[str]) -> Dict[str, float]:
        """Compute exact Shapley values.

        Averages each player's marginal contribution over all n! join orders,
        so it is exponential in n — fine for the small player sets used here.
        Coalitions missing from the characteristic function default to 0.
        """
        n = len(players)
        shapley_values = {player: 0.0 for player in players}
        for perm in permutations(players):
            coalition: Set[str] = set()
            for player in perm:
                # Marginal contribution: v(S ∪ {player}) - v(S).
                coalition_before = tuple(sorted(coalition))
                coalition_after = tuple(sorted(coalition | {player}))
                value_before = self.characteristic_function.get(coalition_before, 0.0)
                value_after = self.characteristic_function.get(coalition_after, 0.0)
                shapley_values[player] += value_after - value_before
                coalition.add(player)
        # Average over all orderings.
        factorial_n = math.factorial(n)
        return {player: value / factorial_n for player, value in shapley_values.items()}

    def form_coalition(self, agents: Dict[str, Agent],
                       coalition_members: List[str]) -> Coalition:
        """Form a coalition, allocate its value via Shapley values, and score
        its stability. `agents` is unused beyond stability analysis."""
        coalition = Coalition(
            id=f"coalition_{len(self.coalitions)}",
            members=set(coalition_members)
        )
        # Coalition value comes from the characteristic function (0 if unset).
        members_tuple = tuple(sorted(coalition_members))
        coalition.value = self.characteristic_function.get(members_tuple, 0.0)
        # Fair allocation via Shapley values.
        coalition.allocation = self.calculate_shapley_value(coalition_members)
        coalition.stability = self._calculate_stability(coalition, agents)
        self.coalitions.append(coalition)
        return coalition

    def _calculate_stability(self, coalition: Coalition,
                             agents: Dict[str, Agent]) -> float:
        """Stability proxy: mean ratio of allocated payoff to a fixed
        reservation utility of 1.0 per member. Simplified model — it does not
        check core membership or blocking coalitions."""
        if not coalition.allocation:
            return 0.0
        satisfaction = []
        for member in coalition.members:
            allocated = coalition.allocation.get(member, 0.0)
            reservation_utility = 1.0  # assumed identical outside option per member
            satisfaction.append(allocated / reservation_utility)
        return np.mean(satisfaction)
class EmergenceSimulator:
    """
    Emergence simulator.

    Supports:
    1. Population behavior simulation
    2. Emergence detection
    3. Phase-transition analysis
    4. Complexity measurement
    """

    def __init__(self, num_agents: int = 100):
        self.num_agents = num_agents
        self.agents: List[Agent] = []                 # population, filled by initialize_agents
        self.global_state: Dict[str, Any] = {}        # last per-step snapshot
        self.emergence_metrics: Dict[str, float] = {}  # last detect_emergence result

    def initialize_agents(self, strategy_distribution: Dict[StrategyType, float]):
        """Create the population, sampling each agent's strategy from the
        given distribution. Proportions should sum to 1; any probability mass
        left over falls through to COOPERATE."""
        self.agents = []
        for i in range(self.num_agents):
            # Inverse-CDF sampling over the strategy proportions.
            rand = random.random()
            cumulative = 0.0
            selected_strategy = StrategyType.COOPERATE
            for strategy, proportion in strategy_distribution.items():
                cumulative += proportion
                if rand <= cumulative:
                    selected_strategy = strategy
                    break
            agent = Agent(id=f"agent_{i}", strategy=selected_strategy)
            self.agents.append(agent)

    def simulate_interaction(self, steps: int = 1000) -> List[Dict[str, Any]]:
        """Run `steps` rounds of randomly paired prisoner's dilemma games and
        return the per-step global-state trajectory."""
        history = []
        for step in range(steps):
            # Shuffle, then pair adjacent agents; with an odd count the last
            # agent sits this step out.
            random.shuffle(self.agents)
            step_payoffs = []
            for i in range(0, len(self.agents) - 1, 2):
                agent1 = self.agents[i]
                agent2 = self.agents[i + 1]
                # One prisoner's dilemma game between the pair (a throwaway
                # CompetitionGame is built per pairing).
                game = CompetitionGame()
                game.agents = {agent1.id: agent1, agent2.id: agent2}
                result = game.run_round()
                # NOTE(review): step_payoffs is collected but never used below.
                step_payoffs.extend(result["payoffs"].values())
            # Fitness = 1 + average payoff per step so far.
            for agent in self.agents:
                agent.fitness = 1.0 + agent.payoff / (step + 1)
            # Cooperation rate counts agents whose STRATEGY is cooperative
            # (COOPERATE/TFT/GENEROUS), not the actions actually played —
            # strategies never change here, so this rate is constant per run.
            cooperation_rate = sum(
                1 for agent in self.agents
                if agent.strategy in [StrategyType.COOPERATE, StrategyType.TIT_FOR_TAT, StrategyType.GENEROUS]
            ) / len(self.agents)
            global_state = {
                "step": step,
                "cooperation_rate": cooperation_rate,
                "avg_fitness": np.mean([agent.fitness for agent in self.agents]),
                "fitness_variance": np.var([agent.fitness for agent in self.agents])
            }
            history.append(global_state)
            self.global_state = global_state
        return history

    def detect_emergence(self, history: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Scan a simulation trajectory for a phase transition in the
        cooperation rate and compute summary emergence metrics."""
        if not history:
            return {"emergence_detected": False}
        # Look for a phase transition in the cooperation rate.
        cooperation_rates = [state["cooperation_rate"] for state in history]
        # First split point where the early/late means differ by > 0.3.
        phase_transition = False
        transition_point = None
        for i in range(10, len(cooperation_rates)):
            early_avg = np.mean(cooperation_rates[:i])
            late_avg = np.mean(cooperation_rates[i:])
            if abs(late_avg - early_avg) > 0.3:  # significant shift
                phase_transition = True
                transition_point = i
                break
        # Summary emergence metrics.
        emergence_metrics = {
            "emergence_detected": phase_transition,
            "transition_point": transition_point,
            "final_cooperation_rate": cooperation_rates[-1] if cooperation_rates else 0,
            # Stability = 1 - std-dev of the last 10 rates (0 if too short).
            "cooperation_stability": 1.0 - np.std(cooperation_rates[-10:]) if len(cooperation_rates) >= 10 else 0,
            # Ratio of final to initial average fitness.
            "fitness_improvement": history[-1]["avg_fitness"] / history[0]["avg_fitness"] if history else 1.0
        }
        self.emergence_metrics = emergence_metrics
        return emergence_metrics
# Usage example
if __name__ == "__main__":
    print("=== 多智能体竞争、合作与涌现行为 ===\n")
    print("=== 创建竞争博弈系统 ===")
    # Create agents with one of each strategy.
    agents = [
        Agent(id="always_defect", strategy=StrategyType.DEFECT),
        Agent(id="always_cooperate", strategy=StrategyType.COOPERATE),
        Agent(id="tit_for_tat", strategy=StrategyType.TIT_FOR_TAT),
        Agent(id="generous", strategy=StrategyType.GENEROUS),
        Agent(id="grim", strategy=StrategyType.GRIM),
    ]
    print(f"创建{len(agents)}个智能体,不同策略:")
    for agent in agents:
        print(f" {agent.id}: {agent.strategy.value}")
    print(f"\n=== 运行囚徒困境锦标赛 ===")
    # Build the game and register every agent.
    game = CompetitionGame(GameType.NON_COOPERATIVE)
    for agent in agents:
        game.add_agent(agent)
    # Run the tournament.
    tournament_result = game.run_tournament(rounds=100)
    print(f"\n锦标赛结果 (100 轮):")
    print(f"总收益:")
    for agent_id, payoff in tournament_result["total_payoffs"].items():
        print(f" {agent_id}: {payoff:.2f}")
    print(f"\n平均收益:")
    for agent_id, payoff in tournament_result["avg_payoffs"].items():
        print(f" {agent_id}: {payoff:.2f}")
    print(f"\n获胜者:{tournament_result['winner']}")
    print(f"\n关键观察:")
    print("1. 以牙还牙 (TFT) 通常在长期竞争中表现优异")
    print("2. 纯背叛策略短期获利但长期受损")
    print("3. 纯合作策略容易被利用")
    print("4. 宽容策略在某些环境下表现良好")
    print("5. 冷酷策略一旦遇到背叛就永远背叛")
    print(f"\n=== 联盟形成 ===")
    # Coalition formation with a simple (symmetric) characteristic function.
    coalition_system = CoalitionFormation()
    coalition_system.set_characteristic_function((), 0.0)
    coalition_system.set_characteristic_function(("A",), 5.0)
    coalition_system.set_characteristic_function(("B",), 5.0)
    coalition_system.set_characteristic_function(("C",), 5.0)
    coalition_system.set_characteristic_function(("A", "B"), 12.0)
    coalition_system.set_characteristic_function(("A", "C"), 12.0)
    coalition_system.set_characteristic_function(("B", "C"), 12.0)
    coalition_system.set_characteristic_function(("A", "B", "C"), 20.0)
    # Form the grand coalition of all three players.
    players = ["A", "B", "C"]
    grand_coalition = coalition_system.form_coalition({}, players)
    print(f"\n大联盟形成:")
    print(f" 联盟价值:{grand_coalition.value}")
    print(f" Shapley 值分配:")
    for player, value in grand_coalition.allocation.items():
        print(f" {player}: {value:.2f}")
    print(f" 稳定性:{grand_coalition.stability:.2f}")
    print(f"\n=== 涌现行为仿真 ===")
    # Emergence simulation over a mixed-strategy population.
    simulator = EmergenceSimulator(num_agents=100)
    strategy_dist = {
        StrategyType.DEFECT: 0.2,
        StrategyType.COOPERATE: 0.2,
        StrategyType.TIT_FOR_TAT: 0.4,
        StrategyType.GENEROUS: 0.2
    }
    simulator.initialize_agents(strategy_dist)
    print(f"初始化{simulator.num_agents}个智能体,策略分布:")
    for strategy, proportion in strategy_dist.items():
        count = sum(1 for agent in simulator.agents if agent.strategy == strategy)
        print(f" {strategy.value}: {count}个 ({proportion*100:.0f}%)")
    # Run the interaction simulation.
    print(f"\n运行群体交互仿真 (1000 步)...")
    history = simulator.simulate_interaction(steps=1000)
    # Detect emergence in the trajectory.
    emergence = simulator.detect_emergence(history)
    print(f"\n涌现检测结果:")
    print(f" 涌现现象:{'检测到' if emergence['emergence_detected'] else '未检测到'}")
    # Fix: explicit None check — the sentinel for "no transition" is None,
    # and a plain truthiness test would also hide a legitimate falsy index.
    if emergence['transition_point'] is not None:
        print(f" 相变点:第{emergence['transition_point']}步")
    print(f" 最终合作率:{emergence['final_cooperation_rate']*100:.1f}%")
    print(f" 合作稳定性:{emergence['cooperation_stability']*100:.1f}%")
    print(f" 适应度提升:{emergence['fitness_improvement']:.2f}x")
    print(f"\n关键洞察:")
    print("1. 竞争:优胜劣汰,驱动策略进化")
    print("2. 合作:联盟形成,实现优势互补")
    print("3. 涌现:群体智能,超越个体之和")
    print("4. 平衡:竞合平衡,实现系统和谐")
    print("5. 进化:协同进化,推动系统发展")
    print("\n涌现的奇迹:竞争 + 合作 + 平衡 = 群体智能")