"""Federated learning with privacy-preserving collaboration: a complete implementation (联邦学习与隐私协同完整实现)."""
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import math
import random
from collections import defaultdict
import hashlib
class FLArchitecture(Enum):
    """Topology options for a federated-learning deployment."""

    # Centralized federation.
    CENTRALIZED = "centralized"
    # Decentralized federation.
    DECENTRALIZED = "decentralized"
    # Hybrid federation.
    HYBRID = "hybrid"
class PrivacyMechanism(Enum):
    """Privacy-protection mechanisms, keyed by a short string code."""

    # Differential privacy.
    DIFFERENTIAL_PRIVACY = "dp"
    # Homomorphic encryption.
    HOMOMORPHIC_ENCRYPTION = "he"
    # Secure aggregation.
    SECURE_AGGREGATION = "sa"
    # Secret sharing.
    SECRET_SHARING = "ss"
@dataclass
class Client:
    """State record for one federated-learning participant."""

    # Identifier of the client.
    id: str
    # Number of local samples the client reports.
    data_size: int
    # Local model weights; unset until something assigns them.
    model_weights: Optional[np.ndarray] = field(default=None)
    # Most recent local update / gradient; unset until something assigns it.
    gradient: Optional[np.ndarray] = field(default=None)
    # Per-client privacy budget (defaults to 1.0).
    privacy_budget: float = field(default=1.0)
    # Per-client reputation score (defaults to 1.0).
    reputation_score: float = field(default=1.0)
@dataclass
class GlobalModel:
    """Shared model state plus simple bookkeeping counters."""

    # Current parameter vector / tensor of the shared model.
    weights: np.ndarray
    # Version counter; starts at 0.
    version: int = field(default=0)
    # Number of aggregations applied so far; starts at 0.
    aggregation_count: int = field(default=0)
    # Metric name -> value; every instance gets its own fresh dict.
    performance_metrics: Dict[str, float] = field(default_factory=dict)
class DifferentialPrivacy:
    """Differential-privacy noise mechanisms with naive budget accounting.

    Provides:
      1. the Laplace mechanism,
      2. the Gaussian mechanism,
      3. tracking of the privacy budget spent so far,
      4. helpers to reason about the privacy/utility trade-off.

    Every noise call spends ``epsilon`` from ``total_budget`` (plain sequential
    composition).  Spending is only *tracked*: the noise methods never refuse
    to run once the budget is exhausted, so callers that want enforcement
    should consult :meth:`check_privacy_budget` first.
    """

    def __init__(self, epsilon: float = 1.0, delta: float = 1e-5,
                 total_budget: float = 1.0):
        """Create a mechanism.

        Args:
            epsilon: Privacy cost charged per noise call; must be > 0.
            delta: Failure probability used by the Gaussian mechanism;
                must satisfy ``0 <= delta < 1`` (and be > 0 to use the
                Gaussian mechanism).
            total_budget: Overall budget the accounting methods compare
                against.  Defaults to 1.0, the value that used to be
                hard-coded.

        Raises:
            ValueError: If any argument is outside its valid range.
        """
        # ``not x > 0`` also rejects NaN, which ``x <= 0`` would let through.
        if not epsilon > 0:
            raise ValueError(f"epsilon must be > 0, got {epsilon!r}")
        if not 0 <= delta < 1:
            raise ValueError(f"delta must satisfy 0 <= delta < 1, got {delta!r}")
        if not total_budget >= 0:
            raise ValueError(f"total_budget must be >= 0, got {total_budget!r}")
        self.epsilon = epsilon            # per-call privacy cost
        self.delta = delta                # failure probability
        self.total_budget = total_budget  # overall budget for the accounting
        self.spent_budget = 0.0

    def add_laplace_noise(self, value: np.ndarray, sensitivity: float = 1.0) -> np.ndarray:
        """Return ``value`` plus Laplace noise of scale ``sensitivity / epsilon``.

        Spends ``epsilon`` of the budget.  ``value`` may be anything
        ``np.asarray`` accepts.
        """
        value = np.asarray(value)
        scale = sensitivity / self.epsilon
        noisy_value = value + np.random.laplace(0, scale, value.shape)
        self.spent_budget += self.epsilon
        return noisy_value

    def add_gaussian_noise(self, value: np.ndarray, sensitivity: float = 1.0) -> np.ndarray:
        """Return ``value`` plus Gaussian noise calibrated for (epsilon, delta)-DP.

        Spends ``epsilon`` of the budget.

        Raises:
            ValueError: If the mechanism was created with ``delta == 0``.
        """
        if self.delta <= 0:
            raise ValueError("the Gaussian mechanism requires delta > 0")
        value = np.asarray(value)
        # Classic calibration sigma = S * sqrt(2 ln(1.25/delta)) / epsilon.
        # NOTE(review): the standard analysis of this formula assumes
        # epsilon < 1 -- confirm it is adequate for larger epsilon.
        sigma = sensitivity * math.sqrt(2 * math.log(1.25 / self.delta)) / self.epsilon
        noisy_value = value + np.random.normal(0, sigma, value.shape)
        self.spent_budget += self.epsilon
        return noisy_value

    def check_privacy_budget(self, required_budget: float) -> bool:
        """Return True if ``required_budget`` more can be spent within the total."""
        return (self.spent_budget + required_budget) <= self.total_budget

    def get_remaining_budget(self) -> float:
        """Return the unspent part of the total budget, floored at 0."""
        return max(0.0, self.total_budget - self.spent_budget)
class HomomorphicEncryption:
    """Textbook Paillier cryptosystem (additively homomorphic).

    Provides:
      1. homomorphic addition of two ciphertexts,
      2. homomorphic multiplication of a ciphertext by a plaintext scalar,
      3. key generation,
      4. encryption / decryption of integers modulo ``n``.

    This is an educational implementation: it is functionally correct
    (``decrypt(encrypt(m)) == m % n``) but has no padding, side-channel
    hardening or key-management story, so it must not be used to protect
    real data.
    """

    # Cheap trial-division filter applied before Miller-Rabin.
    _SMALL_PRIMES = (2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47)
    _MILLER_RABIN_ROUNDS = 40
    _MIN_KEY_SIZE = 16
    # OS-backed randomness rather than the seedable default generator.
    _rng = random.SystemRandom()

    def __init__(self, key_size: int = 1024):
        """Generate a fresh key pair.

        Args:
            key_size: Bit length of the modulus ``n``.  Each prime gets
                ``key_size // 2`` bits, so an odd value yields a modulus one
                bit shorter.

        Raises:
            ValueError: If ``key_size`` is below 16 bits.
        """
        if key_size < self._MIN_KEY_SIZE:
            raise ValueError(
                f"key_size must be at least {self._MIN_KEY_SIZE} bits, got {key_size!r}"
            )
        self.key_size = key_size
        self.public_key = None
        self.private_key = None
        self._generate_keys()

    @staticmethod
    def _l_function(u: int, n: int) -> int:
        """Paillier's L function: ``L(u) = (u - 1) // n``."""
        return (u - 1) // n

    @staticmethod
    def _mod_inverse(a: int, modulus: int) -> int:
        """Return ``a^-1 mod modulus`` via the extended Euclidean algorithm."""
        # Invariant: coeff * a == remainder (mod modulus) for both pairs.
        prev_rem, rem = a % modulus, modulus
        prev_coeff, coeff = 1, 0
        while rem:
            quotient = prev_rem // rem
            prev_rem, rem = rem, prev_rem - quotient * rem
            prev_coeff, coeff = coeff, prev_coeff - quotient * coeff
        if prev_rem != 1:
            raise ValueError("value is not invertible for this modulus")
        return prev_coeff % modulus

    def _is_probable_prime(self, candidate: int) -> bool:
        """Miller-Rabin probabilistic primality test."""
        if candidate < 2:
            return False
        for small in self._SMALL_PRIMES:
            if candidate == small:
                return True
            if candidate % small == 0:
                return False
        # Write candidate - 1 as odd_part * 2**twos.
        odd_part, twos = candidate - 1, 0
        while odd_part % 2 == 0:
            odd_part //= 2
            twos += 1
        for _ in range(self._MILLER_RABIN_ROUNDS):
            base = self._rng.randrange(2, candidate - 1)
            x = pow(base, odd_part, candidate)
            if x in (1, candidate - 1):
                continue
            for _ in range(twos - 1):
                x = pow(x, 2, candidate)
                if x == candidate - 1:
                    break
            else:
                return False  # base is a witness of compositeness
        return True

    def _random_prime(self, bits: int) -> int:
        """Return a random probable prime with exactly ``bits`` bits."""
        # Forcing the two top bits makes the product of two such primes have
        # exactly 2 * bits bits; forcing the low bit makes the candidate odd.
        forced_bits = (0b11 << (bits - 2)) | 1
        while True:
            candidate = self._rng.getrandbits(bits) | forced_bits
            if self._is_probable_prime(candidate):
                return candidate

    def _generate_keys(self):
        """Generate the key pair and store it in ``public_key`` / ``private_key``.

        Uses the common simplification ``g = n + 1``.  Because both primes
        have the same bit length, ``gcd(n, (p - 1)(q - 1)) == 1`` holds, which
        is what makes ``mu`` well defined.
        """
        half_bits = self.key_size // 2
        p = self._random_prime(half_bits)
        q = self._random_prime(half_bits)
        while q == p:
            q = self._random_prime(half_bits)
        n = p * q
        g = n + 1
        lam = (p - 1) * (q - 1) // math.gcd(p - 1, q - 1)  # lcm(p - 1, q - 1)
        mu = self._mod_inverse(self._l_function(pow(g, lam, n * n), n), n)
        self.public_key = {"n": n, "g": g}
        self.private_key = {"lambda": lam, "mu": mu}

    def encrypt(self, plaintext: int) -> int:
        """Encrypt an integer: ``c = g^m * r^n mod n^2``.

        The plaintext is reduced modulo ``n`` first, so negative values wrap
        around (``-5`` decrypts to ``n - 5``).  Encryption is randomised: the
        same plaintext gives a different ciphertext on each call.
        """
        n, g = self.public_key["n"], self.public_key["g"]
        n_squared = n * n
        # The blinding factor must be a unit modulo n.
        while True:
            r = self._rng.randrange(1, n)
            if math.gcd(r, n) == 1:
                break
        return (pow(g, plaintext % n, n_squared) * pow(r, n, n_squared)) % n_squared

    def decrypt(self, ciphertext: int) -> int:
        """Decrypt: ``m = L(c^lambda mod n^2) * mu mod n``; result is in ``[0, n)``."""
        n = self.public_key["n"]
        lam = self.private_key["lambda"]
        mu = self.private_key["mu"]
        u = pow(ciphertext, lam, n * n)
        return (self._l_function(u, n) * mu) % n

    def homomorphic_add(self, c1: int, c2: int) -> int:
        """Homomorphic addition: ``decrypt(c1 * c2 mod n^2) == (m1 + m2) mod n``."""
        n = self.public_key["n"]
        return (c1 * c2) % (n * n)

    def homomorphic_scalar_mult(self, ciphertext: int, scalar: int) -> int:
        """Homomorphic scalar product: ``decrypt(c^k mod n^2) == (k * m) mod n``.

        The scalar is reduced modulo ``n`` (which leaves the decrypted result
        unchanged) so that negative scalars are accepted too.
        """
        n = self.public_key["n"]
        return pow(ciphertext, scalar % n, n * n)
class SecureAggregation:
    """Mask-based secure aggregation (single-process simulation).

    Intended feature set:
      1. secret sharing,
      2. secure summation,
      3. masking,
      4. collusion resistance.

    Only additive masking is implemented here.  All masks live in this one
    object, so the class demonstrates the data flow of a secure-aggregation
    protocol rather than providing its security properties.
    """

    def __init__(self, num_clients: int, threshold: Optional[int] = None):
        """Create an aggregator.

        Args:
            num_clients: Number of participating clients.
            threshold: Reconstruction threshold; defaults to a strict
                majority (``num_clients // 2 + 1``) when omitted.  An
                explicit value, including 0, is kept as given.
        """
        self.num_clients = num_clients
        # ``is None`` rather than ``or`` so an explicit 0 is not overridden.
        self.threshold = (num_clients // 2 + 1) if threshold is None else threshold
        self.masks: Dict[str, np.ndarray] = {}

    def generate_mask(self, client_id: str, shape: Tuple) -> np.ndarray:
        """Draw a standard-normal mask of ``shape``, store it for the client, return it.

        Any mask previously stored for ``client_id`` is replaced.
        """
        # asarray keeps the result an ndarray even for shape ().
        mask = np.asarray(np.random.randn(*shape))
        self.masks[client_id] = mask
        return mask

    def mask_weights(self, client_id: str, weights: np.ndarray) -> np.ndarray:
        """Return ``weights`` plus the client's mask, creating the mask on first use.

        Raises:
            ValueError: If a mask already stored for the client has a
                different shape than ``weights``; adding it would silently
                broadcast to a wrongly shaped result.
        """
        mask = self.masks.get(client_id)
        if mask is None:
            mask = self.generate_mask(client_id, np.shape(weights))
        elif np.shape(mask) != np.shape(weights):
            raise ValueError(
                f"mask for client {client_id!r} has shape {np.shape(mask)}, "
                f"but weights have shape {np.shape(weights)}"
            )
        return weights + mask

    def unmask_aggregate(self, masked_aggregates: List[np.ndarray],
                         client_ids: List[str]) -> np.ndarray:
        """Sum the masked inputs and subtract the masks of ``client_ids``.

        A real protocol would recover the mask sum through secret sharing;
        here the stored masks are read directly.

        Raises:
            KeyError: If any id in ``client_ids`` has no stored mask.
        """
        unknown = [cid for cid in client_ids if cid not in self.masks]
        if unknown:
            raise KeyError(f"no mask stored for client(s): {unknown}")
        aggregate = np.sum(masked_aggregates, axis=0)
        mask_sum = np.sum([self.masks[cid] for cid in client_ids], axis=0)
        return aggregate - mask_sum

    def secure_sum(self, client_updates: Dict[str, np.ndarray]) -> np.ndarray:
        """Return the element-wise sum of all client updates.

        Simplified: this is a plain sum; a real implementation would run a
        secret-sharing protocol.
        """
        return np.sum(list(client_updates.values()), axis=0)
class FederatedLearning:
    """Federated-learning coordinator for a linear model trained with MSE loss.

    Provides:
      1. the FedAvg algorithm,
      2. optional differential-privacy noise on client gradients,
      3. a hook for secure aggregation,
      4. support for heterogeneous (per-client) datasets.
    """

    def __init__(self, architecture: FLArchitecture = FLArchitecture.CENTRALIZED):
        """Create an empty system; the model and privacy options are set up later."""
        self.architecture = architecture
        self.clients: Dict[str, Client] = {}
        self.global_model: Optional[GlobalModel] = None
        self.dp_mechanism: Optional[DifferentialPrivacy] = None
        self.he_mechanism: Optional[HomomorphicEncryption] = None
        self.secure_agg: Optional[SecureAggregation] = None
        self.training_history: List[Dict[str, Any]] = []

    def register_client(self, client: Client):
        """Register a client under its ``id``, replacing any previous entry."""
        self.clients[client.id] = client

    def initialize_global_model(self, model_shape: Tuple):
        """Create the global model with small random weights of ``model_shape``."""
        self.global_model = GlobalModel(
            weights=np.random.randn(*model_shape) * 0.01
        )

    def enable_privacy(self, epsilon: float = 1.0, delta: float = 1e-5):
        """Enable differential privacy for all subsequent client updates."""
        self.dp_mechanism = DifferentialPrivacy(epsilon, delta)

    def enable_secure_aggregation(self):
        """Enable secure aggregation, sized for the clients registered so far."""
        self.secure_agg = SecureAggregation(len(self.clients))

    def _require_global_model(self) -> GlobalModel:
        """Return the global model, failing clearly if it was never initialised."""
        if self.global_model is None:
            raise RuntimeError(
                "global model is not initialized; call initialize_global_model() first"
            )
        return self.global_model

    def client_update(self, client_id: str, local_data: Tuple[np.ndarray, np.ndarray],
                      epochs: int = 1, learning_rate: float = 0.01) -> np.ndarray:
        """Run local full-batch gradient descent for one client.

        Args:
            client_id: Id of a registered client.
            local_data: ``(X, y)`` pair; predictions are computed as ``X @ weights``.
            epochs: Number of gradient steps.
            learning_rate: Step size.

        Returns:
            The weight delta ``local_weights - global_weights``.  It is also
            stored on the client's ``gradient`` attribute.

        Raises:
            KeyError: If ``client_id`` is not registered.
            RuntimeError: If the global model has not been initialised.
            ValueError: If the local dataset has no samples.
        """
        client = self.clients[client_id]
        features, targets = local_data
        if len(targets) == 0:
            # Dividing by zero samples would fill the update with NaN.
            raise ValueError(f"client {client_id!r} has an empty local dataset")
        global_weights = self._require_global_model().weights

        # Start from the current global model.
        local_weights = global_weights.copy()
        for _ in range(epochs):
            residual = features @ local_weights - targets
            gradient = features.T @ residual / len(targets)  # gradient of the MSE loss
            if self.dp_mechanism is not None:
                # NOTE(review): the gradient is not clipped, so the default
                # sensitivity of 1.0 assumed by the noise call is not enforced.
                gradient = self.dp_mechanism.add_gaussian_noise(gradient)
            local_weights -= learning_rate * gradient

        update = local_weights - global_weights
        client.gradient = update
        return update

    def aggregate_updates(self, updates: Dict[str, np.ndarray],
                          data_sizes: Dict[str, int]) -> np.ndarray:
        """Combine client updates with FedAvg (data-size-weighted average).

        Weights are normalised over the clients present in ``updates`` so
        they always sum to 1, even if ``data_sizes`` lists additional clients.

        Raises:
            ValueError: If ``updates`` is empty or the participating data
                sizes do not add up to a positive number.
            KeyError: If a client in ``updates`` is missing from ``data_sizes``.
        """
        if not updates:
            raise ValueError("no client updates to aggregate")
        total_size = sum(data_sizes[client_id] for client_id in updates)
        if total_size <= 0:
            raise ValueError("total data size of participating clients must be positive")

        weighted_sum = np.zeros_like(next(iter(updates.values())))
        for client_id, update in updates.items():
            weighted_sum += (data_sizes[client_id] / total_size) * update

        # NOTE: even when secure aggregation is enabled the updates are
        # averaged in the clear; a real deployment would run the protocol here.
        return weighted_sum

    def train_round(self, selected_clients: List[str],
                    local_datasets: Dict[str, Tuple[np.ndarray, np.ndarray]],
                    epochs: int = 1, learning_rate: float = 0.01) -> Dict[str, Any]:
        """Run one federated round and apply the aggregated update in place.

        Selected clients without an entry in ``local_datasets`` are skipped.

        Returns:
            A dict with keys ``round``, ``num_clients`` (clients that actually
            trained), ``privacy_budget`` (remaining budget, or None when DP is
            disabled) and ``timestamp``.  The same dict is appended to
            ``training_history``.

        Raises:
            RuntimeError: If the global model has not been initialised.
            ValueError: If none of the selected clients has a local dataset.
        """
        model = self._require_global_model()
        updates: Dict[str, np.ndarray] = {}
        data_sizes: Dict[str, int] = {}

        for client_id in selected_clients:
            if client_id not in local_datasets:
                continue
            updates[client_id] = self.client_update(
                client_id, local_datasets[client_id], epochs, learning_rate
            )
            data_sizes[client_id] = self.clients[client_id].data_size

        if not updates:
            raise ValueError("none of the selected clients has a local dataset")

        model.weights += self.aggregate_updates(updates, data_sizes)
        model.version += 1
        model.aggregation_count += 1

        remaining = (
            self.dp_mechanism.get_remaining_budget()
            if self.dp_mechanism is not None else None
        )
        round_info = {
            "round": model.version,
            # Count the clients that trained, not merely those selected.
            "num_clients": len(updates),
            "privacy_budget": remaining,
            "timestamp": datetime.now().isoformat(),
        }
        self.training_history.append(round_info)
        return round_info

    def evaluate_model(self, test_data: Tuple[np.ndarray, np.ndarray]) -> Dict[str, float]:
        """Evaluate the global model on ``(X, y)``.

        Returns:
            A dict with ``mse``, ``mae`` and ``r2`` as plain floats; it is also
            stored in ``global_model.performance_metrics``.  For a constant
            target, ``r2`` is 1.0 on a perfect fit and 0.0 otherwise instead
            of dividing by zero.

        Raises:
            RuntimeError: If the global model has not been initialised.
        """
        model = self._require_global_model()
        features, targets = test_data
        residual = features @ model.weights - targets

        ss_res = float(np.sum(residual ** 2))
        ss_tot = float(np.sum((targets - np.mean(targets)) ** 2))
        if ss_tot > 0:
            r2 = 1 - ss_res / ss_tot
        else:
            r2 = 1.0 if ss_res == 0 else 0.0

        metrics = {
            "mse": float(np.mean(residual ** 2)),
            "mae": float(np.mean(np.abs(residual))),
            "r2": r2,
        }
        model.performance_metrics = metrics
        return metrics
# Usage example
def main() -> None:
    """Run an end-to-end demo: set up, train for ten rounds, evaluate, report."""
    print("=== 多智能体联邦学习与隐私协同 ===\n")
    print("=== 创建联邦学习系统 ===")
    fl_system = FederatedLearning(FLArchitecture.CENTRALIZED)

    # Register the clients.
    num_clients = 5
    for i in range(num_clients):
        fl_system.register_client(
            Client(
                id=f"client_{i}",
                data_size=random.randint(100, 500),
                privacy_budget=1.0,
            )
        )
    print(f"注册{num_clients}个联邦客户端")
    for client_id, client in fl_system.clients.items():
        print(f" {client_id}: 数据量={client.data_size}, 隐私预算={client.privacy_budget}")

    print("\n=== 初始化全局模型 ===")
    model_shape = (10,)  # 10 features
    n_features = model_shape[0]
    fl_system.initialize_global_model(model_shape)
    print(f"全局模型初始化:{model_shape}维权重")
    print(f"初始权重范数:{np.linalg.norm(fl_system.global_model.weights):.4f}")

    print("\n=== 启用隐私保护 ===")
    fl_system.enable_privacy(epsilon=0.5, delta=1e-5)
    print("差分隐私启用:ε=0.5, δ=1e-5")
    print(f"剩余隐私预算:{fl_system.dp_mechanism.get_remaining_budget():.2f}")
    fl_system.enable_secure_aggregation()
    print("安全聚合启用")

    print("\n=== 生成模拟数据 ===")
    # One ground-truth weight vector shared by every client and by the test
    # set.  Clients still differ in their feature distribution (non-IID), but
    # they now learn the same target function, so the evaluation is meaningful.
    true_weights = np.random.randn(n_features)
    local_datasets = {}
    for client_id, client in fl_system.clients.items():
        n_samples = client.data_size
        offset = random.uniform(-2, 2)  # per-client feature shift
        X = np.random.randn(n_samples, n_features) + offset
        y = X @ true_weights + np.random.randn(n_samples) * 0.1
        local_datasets[client_id] = (X, y)
    print(f"为{len(local_datasets)}个客户端生成非 IID 数据")

    print("\n=== 联邦训练 ===")
    num_rounds = 10
    for round_num in range(num_rounds):
        # Sample three clients at random for this round.
        selected_clients = random.sample(list(fl_system.clients.keys()), k=3)
        round_info = fl_system.train_round(
            selected_clients=selected_clients,
            local_datasets=local_datasets,
            epochs=3,
            learning_rate=0.01,
        )
        if round_num % 2 == 0:
            print(f"轮次 {round_num+1}/{num_rounds}:")
            print(f" 参与客户端:{len(selected_clients)}")
            print(f" 剩余隐私预算:{round_info['privacy_budget']:.2f}")

    print("\n=== 模型评估 ===")
    n_test = 200
    X_test = np.random.randn(n_test, n_features)
    y_test = X_test @ true_weights + np.random.randn(n_test) * 0.1
    metrics = fl_system.evaluate_model((X_test, y_test))
    print("全局模型性能:")
    print(f" MSE: {metrics['mse']:.4f}")
    print(f" MAE: {metrics['mae']:.4f}")
    print(f" R²: {metrics['r2']:.4f}")

    print("\n=== 训练历史 ===")
    print(f"总训练轮次:{len(fl_system.training_history)}")
    print(f"最终模型版本:v{fl_system.global_model.version}")
    print(f"总聚合次数:{fl_system.global_model.aggregation_count}")

    print("\n关键观察:")
    print("1. 联邦学习:数据不出域,保护隐私")
    print("2. 差分隐私:添加噪声,数学保证隐私")
    print("3. 安全聚合:保护中间更新,防止泄露")
    print("4. 非 IID 数据:各客户端数据分布不同")
    print("5. 隐私 - 效用权衡:隐私预算影响模型性能")
    print("\n联邦的核心:数据不动模型动,隐私保护协同训练")


if __name__ == "__main__":
    main()