# Agent 隐私保护与数据安全完整实现 (Agent privacy protection and data security: complete implementation)
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import math
import random
from collections import defaultdict
import hashlib
import secrets
import re
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding, hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa, padding as asym_padding
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os
class DataClassification(Enum):
    """Classification labels that can be attached to a piece of data."""

    # Public data.
    PUBLIC = "public"

    # Internal data.
    INTERNAL = "internal"

    # Confidential data.
    CONFIDENTIAL = "confidential"

    # Personally identifiable information.
    PII = "pii"

    # Sensitive data.
    SENSITIVE = "sensitive"
class PrivacyPrinciple(Enum):
    """Privacy principles that a privacy policy can declare."""

    # Purpose limitation.
    PURPOSE_LIMITATION = "purpose_limitation"

    # Data minimization.
    DATA_MINIMIZATION = "data_minimization"

    # Accuracy.
    ACCURACY = "accuracy"

    # Storage limitation.
    STORAGE_LIMITATION = "storage_limitation"

    # Integrity and confidentiality.
    INTEGRITY_CONFIDENTIALITY = "integrity_confidentiality"

    # Accountability.
    ACCOUNTABILITY = "accountability"
@dataclass
class DataRecord:
    """A stored piece of data together with its privacy-relevant attributes."""

    # --- identity and payload ---
    id: str
    data: Dict[str, Any]

    # --- governance attributes ---
    classification: DataClassification
    # Compared against the data subject id when an erasure request is served.
    owner: str
    # An empty purpose is reported as a violation by the compliance check.
    purpose: str

    # --- lifecycle ---
    # Defaults to the (naive, local) time at which the record is created.
    created_at: datetime = field(default_factory=datetime.now)
    # None means no expiry is set for the record.
    expires_at: Optional[datetime] = None

    # --- protection flags ---
    consent_given: bool = False
    encrypted: bool = False
@dataclass
class PrivacyPolicy:
    """A named privacy policy: its principles, data types and retention."""

    # --- identity ---
    id: str
    name: str

    # --- scope ---
    principles: List[PrivacyPrinciple]
    data_types: List[DataClassification]

    # --- rules ---
    # Retention period in days; nothing visible in this file enforces it.
    retention_days: int
    consent_required: bool

    # Defaults to the (naive, local) time at which the policy is created.
    created_at: datetime = field(default_factory=datetime.now)
@dataclass
class ConsentRecord:
    """One consent decision made by a data subject for one policy."""

    # --- identity ---
    id: str
    data_subject_id: str
    policy_id: str

    # --- decision ---
    consent_given: bool
    # Defaults to the (naive, local) time at which the record is created.
    timestamp: datetime = field(default_factory=datetime.now)
    # Stays None until the consent is withdrawn.
    withdrawal_timestamp: Optional[datetime] = None
class EncryptionManager:
    """In-memory key store with AES-256-CBC, RSA-OAEP, PBKDF2 and key rotation.

    Symmetric and derived keys are stored as raw bytes in ``self.keys`` under
    their ``key_id``. RSA keys are stored as PEM bytes under
    ``"<key_id>_private"`` and ``"<key_id>_public"``. ``self.key_metadata``
    describes each key. Nothing is persisted outside the instance.
    """

    # AES block size in bytes; this is also the length of the CBC IV.
    _AES_BLOCK_BYTES = 16

    def __init__(self):
        self.keys: Dict[str, bytes] = {}
        self.key_metadata: Dict[str, Dict[str, Any]] = {}

    def generate_symmetric_key(self, key_id: str) -> bytes:
        """Create, store and return a random 256-bit key under ``key_id``.

        An existing key with the same id is replaced.
        """
        key = secrets.token_bytes(32)  # 256 bits
        now = datetime.now()
        self.keys[key_id] = key
        self.key_metadata[key_id] = {
            "type": "symmetric",
            "algorithm": "AES-256",
            "created_at": now,
            "last_rotated": now,
            "status": "active"
        }
        return key

    def generate_asymmetric_keypair(self, key_id: str) -> "Tuple[rsa.RSAPublicKey, rsa.RSAPrivateKey]":
        """Create an RSA-2048 key pair and store both halves as PEM.

        Returns:
            ``(public_key, private_key)`` key objects.
        """
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048,
            backend=default_backend()
        )
        public_key = private_key.public_key()
        # The private key is kept unencrypted; it only ever lives in memory here.
        self.keys[f"{key_id}_private"] = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )
        self.keys[f"{key_id}_public"] = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
        self.key_metadata[key_id] = {
            "type": "asymmetric",
            "algorithm": "RSA-2048",
            "created_at": datetime.now(),
            "status": "active"
        }
        return public_key, private_key

    def encrypt_aes(self, plaintext: bytes, key_id: str) -> bytes:
        """Encrypt ``plaintext`` with AES-CBC and PKCS7 padding.

        Returns:
            A fresh random 16-byte IV followed by the ciphertext.

        Raises:
            ValueError: if ``key_id`` is not in the key store.
        """
        if key_id not in self.keys:
            raise ValueError(f"Key {key_id} not found")
        key = self.keys[key_id]
        iv = secrets.token_bytes(self._AES_BLOCK_BYTES)
        # PKCS7 pads the plaintext up to a whole number of AES blocks.
        padder = padding.PKCS7(self._AES_BLOCK_BYTES * 8).padder()
        padded_data = padder.update(plaintext) + padder.finalize()
        # NOTE: CBC provides confidentiality only; no MAC or tag is attached,
        # so tampering with the ciphertext is not detected by this class.
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
        encryptor = cipher.encryptor()
        ciphertext = encryptor.update(padded_data) + encryptor.finalize()
        return iv + ciphertext

    def decrypt_aes(self, ciphertext: bytes, key_id: str) -> bytes:
        """Decrypt data produced by :meth:`encrypt_aes` (IV followed by ciphertext).

        Raises:
            ValueError: if ``key_id`` is unknown, if the input is too short or
                not block-aligned, or if the padding is invalid.
        """
        if key_id not in self.keys:
            raise ValueError(f"Key {key_id} not found")
        block = self._AES_BLOCK_BYTES
        # A valid message is an IV plus at least one padded block; rejecting
        # malformed input here gives a clear error before any crypto runs.
        if len(ciphertext) < 2 * block or len(ciphertext) % block != 0:
            raise ValueError(
                f"Ciphertext must be a {block}-byte IV followed by a positive "
                f"multiple of {block} bytes"
            )
        key = self.keys[key_id]
        iv = ciphertext[:block]
        actual_ciphertext = ciphertext[block:]
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
        decryptor = cipher.decryptor()
        padded_plaintext = decryptor.update(actual_ciphertext) + decryptor.finalize()
        # Strip the PKCS7 padding added during encryption.
        unpadder = padding.PKCS7(block * 8).unpadder()
        return unpadder.update(padded_plaintext) + unpadder.finalize()

    @staticmethod
    def _oaep_padding():
        """Return the OAEP/SHA-256 padding shared by RSA encrypt and decrypt."""
        return asym_padding.OAEP(
            mgf=asym_padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )

    def encrypt_rsa(self, plaintext: bytes, public_key_id: str) -> bytes:
        """Encrypt ``plaintext`` with the stored public key using RSA-OAEP.

        Raises:
            ValueError: if no public key is stored for ``public_key_id``.
        """
        if f"{public_key_id}_public" not in self.keys:
            raise ValueError(f"Public key {public_key_id} not found")
        public_key_pem = self.keys[f"{public_key_id}_public"]
        public_key = serialization.load_pem_public_key(public_key_pem, backend=default_backend())
        return public_key.encrypt(plaintext, self._oaep_padding())

    def decrypt_rsa(self, ciphertext: bytes, private_key_id: str) -> bytes:
        """Decrypt data produced by :meth:`encrypt_rsa` with the stored private key.

        Raises:
            ValueError: if no private key is stored for ``private_key_id``.
        """
        if f"{private_key_id}_private" not in self.keys:
            raise ValueError(f"Private key {private_key_id} not found")
        private_key_pem = self.keys[f"{private_key_id}_private"]
        private_key = serialization.load_pem_private_key(
            private_key_pem, password=None, backend=default_backend()
        )
        return private_key.decrypt(ciphertext, self._oaep_padding())

    def derive_key(self, password: str, salt: bytes, key_id: str,
                   iterations: int = 100000) -> bytes:
        """Derive a 32-byte key from ``password`` with PBKDF2-HMAC-SHA256.

        The derived key is stored under ``key_id`` and returned.

        Args:
            password: Password to stretch; encoded with the default ``str.encode``.
            salt: Salt passed to the KDF.
            key_id: Identifier under which the key is stored.
            iterations: PBKDF2 iteration count. The default matches the value
                this method has always used, so existing derivations still
                reproduce the same key.
        """
        # NOTE(review): consider a higher iteration count for new deployments.
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=iterations,
            backend=default_backend()
        )
        key = kdf.derive(password.encode())
        self.keys[key_id] = key
        self.key_metadata[key_id] = {
            "type": "derived",
            "algorithm": "PBKDF2-SHA256",
            "created_at": datetime.now(),
            "iterations": iterations,
            "status": "active"
        }
        return key

    def rotate_key(self, key_id: str) -> bytes:
        """Replace the key under ``key_id`` with a fresh random 256-bit key.

        The previous key is archived under ``"<key_id>_old_<YYYYmmddHHMMSS>"``
        with status ``"deprecated"`` so older ciphertexts can still be
        decrypted. If that archive id is already taken (several rotations in
        the same second), a numeric suffix is appended so that no archived key
        is overwritten.

        Raises:
            ValueError: if ``key_id`` is unknown or has no metadata entry.
        """
        if key_id not in self.keys:
            raise ValueError(f"Key {key_id} not found")
        # Check metadata before touching state so a failure leaves the store
        # unchanged (RSA halves, for example, have no metadata of their own).
        metadata = self.key_metadata.get(key_id)
        if metadata is None:
            raise ValueError(f"Key {key_id} has no metadata and cannot be rotated")

        now = datetime.now()
        base_archive_id = f"{key_id}_old_{now.strftime('%Y%m%d%H%M%S')}"
        old_key_id = base_archive_id
        suffix = 1
        while old_key_id in self.keys:
            old_key_id = f"{base_archive_id}_{suffix}"
            suffix += 1

        # Archive the outgoing key so historical data stays decryptable.
        self.keys[old_key_id] = self.keys[key_id]
        self.key_metadata[old_key_id] = {
            **metadata,
            "status": "deprecated",
            "replaced_at": now
        }

        new_key = secrets.token_bytes(32)
        self.keys[key_id] = new_key
        metadata["last_rotated"] = now
        return new_key
class DifferentialPrivacy:
    """Differential-privacy helpers: Laplace/Gaussian noise and budget tracking.

    ``epsilon`` is the privacy budget used by a single mechanism call and
    ``delta`` the relaxation parameter used by the Gaussian mechanism.
    Noise is drawn from NumPy's global random state.
    """

    def __init__(self, epsilon: float = 1.0, delta: float = 1e-5):
        """Store the privacy parameters.

        Raises:
            ValueError: if ``epsilon`` is not strictly positive. ``delta`` is
                only validated when the Gaussian mechanism is used, so
                ``delta=0`` remains valid for Laplace-only use.
        """
        if not epsilon > 0:
            raise ValueError(f"epsilon must be positive, got {epsilon}")
        self.epsilon = epsilon  # privacy budget per mechanism call
        self.delta = delta  # relaxation parameter
        self.budget_tracker: Dict[str, float] = defaultdict(float)

    def _resolve_epsilon(self, epsilon: Optional[float]) -> float:
        """Return the epsilon to use for one call, validating it is positive."""
        eps = self.epsilon if epsilon is None else epsilon
        if not eps > 0:
            raise ValueError(f"epsilon must be positive, got {eps}")
        return eps

    @staticmethod
    def _check_sensitivity(sensitivity: float) -> None:
        """Reject negative sensitivities, which would give a negative noise scale."""
        if sensitivity < 0:
            raise ValueError(f"sensitivity must be non-negative, got {sensitivity}")

    def laplace_mechanism(self, value: float, sensitivity: float = 1.0,
                          epsilon: Optional[float] = None) -> float:
        """Return ``value`` plus Laplace noise of scale ``sensitivity / epsilon``.

        Args:
            value: The true query answer.
            sensitivity: Sensitivity of the query.
            epsilon: Budget for this call; defaults to ``self.epsilon``.

        Raises:
            ValueError: if ``sensitivity`` is negative or epsilon is not positive.
        """
        self._check_sensitivity(sensitivity)
        scale = sensitivity / self._resolve_epsilon(epsilon)
        return value + np.random.laplace(0, scale)

    def gaussian_mechanism(self, value: float, sensitivity: float = 1.0,
                           epsilon: Optional[float] = None) -> float:
        """Return ``value`` plus Gaussian noise.

        The noise uses
        ``sigma = sensitivity * sqrt(2 * ln(1.25 / delta)) / epsilon``.

        Raises:
            ValueError: if ``delta`` is not in (0, 1), ``sensitivity`` is
                negative, or epsilon is not positive.
        """
        self._check_sensitivity(sensitivity)
        if not 0 < self.delta < 1:
            raise ValueError(f"delta must be in (0, 1), got {self.delta}")
        eps = self._resolve_epsilon(epsilon)
        # NOTE(review): this calibration is usually stated for epsilon < 1.
        sigma = sensitivity * math.sqrt(2 * math.log(1.25 / self.delta)) / eps
        return value + np.random.normal(0, sigma)

    def privatize_count(self, count: int, sensitivity: float = 1.0,
                        epsilon: Optional[float] = None) -> int:
        """Return a Laplace-noised count, rounded and floored at zero."""
        noisy_count = self.laplace_mechanism(count, sensitivity, epsilon)
        return max(0, int(round(noisy_count)))

    def privatize_sum(self, values: List[float],
                      clip_bounds: Tuple[float, float],
                      epsilon: Optional[float] = None) -> float:
        """Return a Laplace-noised sum of ``values`` clipped to ``clip_bounds``.

        The sensitivity used is ``upper - lower``, i.e. the most the sum can
        change when one clipped value is replaced by another.

        Raises:
            ValueError: if ``clip_bounds`` has ``lower > upper``.
        """
        lower, upper = clip_bounds
        if lower > upper:
            raise ValueError(f"clip_bounds must satisfy lower <= upper, got {clip_bounds}")
        clipped_values = [max(lower, min(upper, v)) for v in values]
        sensitivity = upper - lower
        return self.laplace_mechanism(sum(clipped_values), sensitivity, epsilon)

    def privatize_mean(self, values: List[float],
                       clip_bounds: Tuple[float, float]) -> float:
        """Return a private mean as (noisy sum) / (noisy count).

        The sum and the count each consume half of ``self.epsilon``, so the
        whole query spends ``self.epsilon`` under sequential composition
        rather than twice that. The quotient is clamped into ``clip_bounds``;
        clamping is post-processing and costs no extra privacy.

        Returns:
            ``0.0`` when ``values`` is empty or the noisy count is zero,
            otherwise the clamped noisy mean.
        """
        n = len(values)
        if n == 0:
            return 0.0
        lower, upper = clip_bounds
        half_epsilon = self._resolve_epsilon(None) / 2.0
        private_sum = self.privatize_sum(values, clip_bounds, epsilon=half_epsilon)
        private_count = self.privatize_count(n, sensitivity=1.0, epsilon=half_epsilon)
        if private_count == 0:
            return 0.0
        return float(min(upper, max(lower, private_sum / private_count)))

    def track_budget(self, query_id: str, epsilon_spent: float):
        """Add ``epsilon_spent`` to the budget consumed under ``query_id``.

        Raises:
            ValueError: if ``epsilon_spent`` is negative, since that would
                silently refund budget.
        """
        if epsilon_spent < 0:
            raise ValueError(f"epsilon_spent must be non-negative, got {epsilon_spent}")
        self.budget_tracker[query_id] += epsilon_spent

    def get_remaining_budget(self, query_id: str, total_budget: float) -> float:
        """Return ``total_budget`` minus what ``query_id`` has spent, floored at 0."""
        # .get() avoids creating a defaultdict entry for ids never tracked.
        spent = self.budget_tracker.get(query_id, 0.0)
        return max(0.0, total_budget - spent)
class FederatedLearning:
    """Toy federated-averaging (FedAvg) simulation over a linear model.

    Clients are identified by the integers ``0 .. num_clients - 1``. Each
    client holds a copy of the global weight array, trains it locally with
    gradient descent, and the copies are then combined by a sample-weighted
    average.
    """

    def __init__(self, num_clients: int = 10):
        self.num_clients = num_clients
        self.global_model: Optional[np.ndarray] = None
        self.client_models: Dict[int, np.ndarray] = {}
        self.client_data_sizes: Dict[int, int] = {}

    def initialize_global_model(self, model_shape: Tuple):
        """Draw random global weights and hand a copy to every client."""
        self.global_model = np.random.randn(*model_shape)
        for client_id in range(self.num_clients):
            self.client_models[client_id] = self.global_model.copy()
            self.client_data_sizes[client_id] = 0

    def local_training(self, client_id: int, local_data: np.ndarray,
                       local_labels: np.ndarray, epochs: int = 1,
                       learning_rate: float = 0.01) -> np.ndarray:
        """Run ``epochs`` gradient-descent steps of a linear model on one client.

        Predictions are ``local_data @ model`` and the gradient is
        ``local_data.T @ (predictions - local_labels) / len(local_data)``.
        The client's sample count is recorded for later weighting.

        Returns:
            The client's updated weights. With empty ``local_data`` the
            weights are returned unchanged and the sample count is set to 0,
            instead of dividing by zero and producing NaNs.

        Raises:
            ValueError: if ``client_id`` has no model (not initialized).
        """
        if client_id not in self.client_models:
            raise ValueError(f"Client {client_id} not found")
        model = self.client_models[client_id]
        num_samples = len(local_data)
        self.client_data_sizes[client_id] = num_samples
        if num_samples == 0:
            return model
        for _ in range(epochs):
            predictions = np.dot(local_data, model)
            errors = predictions - local_labels
            gradient = np.dot(local_data.T, errors) / num_samples
            # Rebinding (not in-place) keeps any other reference to the old
            # weight array untouched.
            model = model - learning_rate * gradient
        self.client_models[client_id] = model
        return model

    def secure_aggregation(self, selected_clients: List[int]) -> np.ndarray:
        """Set the global model to the sample-weighted mean of client models.

        Despite the name, this is plain FedAvg: client weights are combined
        in the clear, with no masking or encryption.

        Returns:
            The new global model, or the current one unchanged when the
            selected clients hold zero samples in total.

        Raises:
            ValueError: if ``selected_clients`` is empty or the global model
                has not been initialized.
        """
        if not selected_clients:
            raise ValueError("No clients selected")
        if self.global_model is None:
            raise ValueError("Global model is not initialized")
        total_samples = sum(self.client_data_sizes[c] for c in selected_clients)
        if total_samples == 0:
            return self.global_model
        aggregated_model = np.zeros_like(self.global_model)
        for client_id in selected_clients:
            weight = self.client_data_sizes[client_id] / total_samples
            aggregated_model += weight * self.client_models[client_id]
        self.global_model = aggregated_model
        return aggregated_model

    def select_clients(self, fraction: float = 0.1) -> List[int]:
        """Randomly pick about ``fraction`` of the clients (at least one).

        The count is capped at ``num_clients`` so ``fraction > 1`` selects
        everyone instead of raising. An empty list is returned when there
        are no clients.
        """
        if self.num_clients <= 0:
            return []
        num_selected = min(self.num_clients, max(1, int(self.num_clients * fraction)))
        return random.sample(range(self.num_clients), num_selected)

    def federated_round(self, local_data_dict: Dict[int, Tuple[np.ndarray, np.ndarray]],
                        epochs: int = 1, fraction: float = 0.1) -> np.ndarray:
        """Run one round: select clients, train, aggregate, redistribute.

        Only clients that actually trained this round (selected and present
        in ``local_data_dict``) take part in the aggregation. Selected clients
        without data are skipped so that sample counts left over from earlier
        rounds cannot dilute the update.

        Args:
            local_data_dict: Maps client id to ``(data, labels)``.
            epochs: Local gradient steps per participating client.
            fraction: Share of clients to select.

        Returns:
            The global model after the round (unchanged if nobody trained).

        Raises:
            ValueError: if the global model has not been initialized.
        """
        if self.global_model is None:
            raise ValueError("Global model is not initialized")
        selected_clients = self.select_clients(fraction=fraction)
        trained_clients = []
        for client_id in selected_clients:
            if client_id in local_data_dict:
                data, labels = local_data_dict[client_id]
                self.local_training(client_id, data, labels, epochs)
                trained_clients.append(client_id)
        if trained_clients:
            updated_global_model = self.secure_aggregation(trained_clients)
        else:
            updated_global_model = self.global_model
        # Every client starts the next round from the same global weights.
        for client_id in range(self.num_clients):
            self.client_models[client_id] = updated_global_model.copy()
        return updated_global_model
class PrivacyCompliance:
    """In-memory registry for privacy policies, consents, records and audit logs.

    Provides a simplified GDPR-style check per data record, consent
    recording and withdrawal, erasure of a subject's records, and a summary
    report. Consent actions and erasures are appended to ``self.audit_logs``.
    """

    def __init__(self):
        self.policies: Dict[str, PrivacyPolicy] = {}
        self.consent_records: Dict[str, ConsentRecord] = {}
        self.data_records: Dict[str, DataRecord] = {}
        self.audit_logs: List[Dict[str, Any]] = []

    def create_policy(self, name: str, principles: "List[PrivacyPrinciple]",
                      data_types: "List[DataClassification]",
                      retention_days: int,
                      consent_required: bool = True) -> "PrivacyPolicy":
        """Create a policy with a random id, register it and return it."""
        policy_id = f"policy_{secrets.token_hex(8)}"
        policy = PrivacyPolicy(
            id=policy_id,
            name=name,
            principles=principles,
            data_types=data_types,
            retention_days=retention_days,
            consent_required=consent_required
        )
        self.policies[policy_id] = policy
        return policy

    def record_consent(self, data_subject_id: str, policy_id: str,
                       consent_given: bool) -> "ConsentRecord":
        """Store a consent decision under a random id and audit-log it.

        ``policy_id`` is not checked against the registered policies.
        """
        consent_id = f"consent_{secrets.token_hex(16)}"
        consent = ConsentRecord(
            id=consent_id,
            data_subject_id=data_subject_id,
            policy_id=policy_id,
            consent_given=consent_given
        )
        self.consent_records[consent_id] = consent
        self.audit_logs.append({
            "timestamp": datetime.now().isoformat(),
            "action": "consent_recorded",
            "data_subject_id": data_subject_id,
            "policy_id": policy_id,
            "consent_given": consent_given
        })
        return consent

    def withdraw_consent(self, consent_id: str) -> None:
        """Mark a consent as withdrawn and audit-log the request.

        The withdrawal timestamp is set only on the first withdrawal. A
        repeated call still adds an audit entry but keeps the original
        timestamp, so the record continues to show when consent actually
        ended.

        Raises:
            ValueError: if ``consent_id`` is unknown.
        """
        if consent_id not in self.consent_records:
            raise ValueError(f"Consent {consent_id} not found")
        consent = self.consent_records[consent_id]
        if consent.withdrawal_timestamp is None:
            consent.withdrawal_timestamp = datetime.now()
        consent.consent_given = False
        self.audit_logs.append({
            "timestamp": datetime.now().isoformat(),
            "action": "consent_withdrawn",
            "consent_id": consent_id
        })

    def check_gdpr_compliance(self, data_record: "DataRecord") -> Tuple[bool, List[str]]:
        """Run the simplified compliance checks on one record.

        Checks, in order: consent flag set, retention not exceeded, purpose
        non-empty, and PII / SENSITIVE data marked as encrypted.

        Returns:
            ``(is_compliant, violations)`` where ``violations`` lists a
            message for each failed check.
        """
        violations = []
        if not data_record.consent_given:
            violations.append("Consent not given")

        expires_at = data_record.expires_at
        if expires_at is not None:
            # Compare like with like: an aware expiry needs an aware "now",
            # otherwise the comparison raises TypeError.
            is_aware = expires_at.tzinfo is not None and expires_at.utcoffset() is not None
            now = datetime.now(tz=expires_at.tzinfo) if is_aware else datetime.now()
            if now > expires_at:
                violations.append("Data retention period exceeded")

        # Simplified purpose limitation: only checks that a purpose is present.
        if not data_record.purpose:
            violations.append("Purpose not specified")

        needs_encryption = data_record.classification in (
            DataClassification.PII,
            DataClassification.SENSITIVE,
        )
        if needs_encryption and not data_record.encrypted:
            violations.append("Sensitive data not encrypted")

        return len(violations) == 0, violations

    def exercise_right_to_erasure(self, data_subject_id: str) -> int:
        """Delete every data record owned by ``data_subject_id``.

        Consent records are left in place. The erasure is audit-logged.

        Returns:
            The number of data records deleted.
        """
        # Collect ids first; the dict cannot be mutated while iterating it.
        records_to_delete = [
            record_id for record_id, record in self.data_records.items()
            if record.owner == data_subject_id
        ]
        for record_id in records_to_delete:
            del self.data_records[record_id]
        self.audit_logs.append({
            "timestamp": datetime.now().isoformat(),
            "action": "right_to_erasure",
            "data_subject_id": data_subject_id,
            "records_deleted": len(records_to_delete)
        })
        return len(records_to_delete)

    def generate_compliance_report(self) -> Dict[str, Any]:
        """Summarize records, consents, violations and audit-log size.

        ``violation_details`` holds at most the first 10 violation messages,
        while ``gdpr_violations`` counts all of them.
        """
        total_records = len(self.data_records)
        total_consents = len(self.consent_records)
        active_consents = sum(
            1 for c in self.consent_records.values()
            if c.consent_given and not c.withdrawal_timestamp
        )
        gdpr_violations = []
        for record in self.data_records.values():
            _, violations = self.check_gdpr_compliance(record)
            gdpr_violations.extend(violations)
        return {
            "total_data_records": total_records,
            "total_consents": total_consents,
            "active_consents": active_consents,
            "consent_rate": active_consents / total_consents if total_consents > 0 else 0.0,
            "gdpr_violations": len(gdpr_violations),
            "violation_details": gdpr_violations[:10],
            "audit_log_entries": len(self.audit_logs),
            "report_timestamp": datetime.now().isoformat()
        }
# Usage example
def _demo_encryption():
    """Generate keys, then round-trip AES and run one RSA encryption."""
    print("=== 创建加密管理器 ===")
    encrypt_mgr = EncryptionManager()

    # Symmetric key.
    key_id = "data_encryption_key"
    symmetric_key = encrypt_mgr.generate_symmetric_key(key_id)
    print(f"生成对称密钥 (AES-256): {key_id}")
    print(f"密钥长度:{len(symmetric_key) * 8} bits")

    # Asymmetric key pair (the returned key objects are not needed here).
    keypair_id = "rsa_keypair"
    encrypt_mgr.generate_asymmetric_keypair(keypair_id)
    print(f"\n生成非对称密钥对 (RSA-2048): {keypair_id}")

    # AES round trip.
    print(f"\n=== 测试 AES 加密 ===")
    plaintext = b"Sensitive personal data: John Doe, SSN: 123-45-6789"
    ciphertext = encrypt_mgr.encrypt_aes(plaintext, key_id)
    decrypted = encrypt_mgr.decrypt_aes(ciphertext, key_id)
    print(f"明文:{plaintext.decode()}")
    print(f"密文长度:{len(ciphertext)} bytes")
    print(f"解密:{decrypted.decode()}")
    print(f"加密解密一致:{plaintext == decrypted}")

    # RSA encryption of a short message.
    print(f"\n=== 测试 RSA 加密 ===")
    small_plaintext = b"Secret key"
    rsa_ciphertext = encrypt_mgr.encrypt_rsa(small_plaintext, keypair_id)
    print(f"明文:{small_plaintext.decode()}")
    print(f"RSA 密文长度:{len(rsa_ciphertext)} bytes")


def _demo_differential_privacy():
    """Show the Laplace mechanism, a private count and a private mean."""
    print(f"\n=== 差分隐私 ===")
    dp = DifferentialPrivacy(epsilon=1.0, delta=1e-5)

    # Laplace mechanism on a single value.
    true_value = 100.0
    noisy_value = dp.laplace_mechanism(true_value, sensitivity=1.0)
    print(f"真实值:{true_value}")
    print(f"加噪值 (Laplace): {noisy_value:.2f}")
    print(f"噪声大小:{abs(noisy_value - true_value):.2f}")

    # Private count.
    true_count = 1000
    private_count = dp.privatize_count(true_count)
    print(f"\n真实计数:{true_count}")
    print(f"私有化计数:{private_count}")

    # Private mean over 100 uniform samples.
    values = [random.uniform(0, 100) for _ in range(100)]
    true_mean = np.mean(values)
    private_mean = dp.privatize_mean(values, clip_bounds=(0, 100))
    print(f"\n真实均值:{true_mean:.2f}")
    print(f"私有化均值:{private_mean:.2f}")
    print(f"误差:{abs(private_mean - true_mean):.2f}")


def _demo_federated_learning():
    """Run three federated rounds over ten clients with random data."""
    print(f"\n=== 联邦学习 ===")
    fl = FederatedLearning(num_clients=10)
    fl.initialize_global_model(model_shape=(10,))
    print(f"初始化全局模型:{fl.global_model.shape}")

    # Simulated per-client data: features first, then labels.
    local_data_dict = {
        client_id: (np.random.randn(100, 10), np.random.randn(100))
        for client_id in range(10)
    }

    print(f"\n执行联邦学习...")
    for round_num in range(3):
        updated_model = fl.federated_round(local_data_dict, epochs=1)
        print(f"Round {round_num + 1}: 模型更新完成,全局模型均值:{np.mean(updated_model):.4f}")


def _demo_compliance():
    """Create a policy, a consent and a record, then check and report."""
    print(f"\n=== 隐私合规 ===")
    compliance = PrivacyCompliance()

    policy = compliance.create_policy(
        name="GDPR Compliance Policy",
        principles=[
            PrivacyPrinciple.PURPOSE_LIMITATION,
            PrivacyPrinciple.DATA_MINIMIZATION,
            PrivacyPrinciple.STORAGE_LIMITATION,
            PrivacyPrinciple.INTEGRITY_CONFIDENTIALITY
        ],
        data_types=[DataClassification.PII, DataClassification.SENSITIVE],
        retention_days=365,
        consent_required=True
    )
    print(f"创建隐私策略:{policy.name}")

    consent = compliance.record_consent(
        data_subject_id="user_123",
        policy_id=policy.id,
        consent_given=True
    )
    print(f"记录同意:{consent.data_subject_id} -> {consent.consent_given}")

    data_record = DataRecord(
        id="record_1",
        data={"name": "John Doe", "email": "john@example.com"},
        classification=DataClassification.PII,
        owner="user_123",
        purpose="Service provision",
        expires_at=datetime.now() + timedelta(days=365),
        consent_given=True,
        encrypted=True
    )
    compliance.data_records[data_record.id] = data_record

    is_compliant, violations = compliance.check_gdpr_compliance(data_record)
    print(f"\nGDPR 合规检查:{'通过' if is_compliant else '失败'}")
    if violations:
        print(f"违规项:{violations}")

    report = compliance.generate_compliance_report()
    print(f"\n合规报告:")
    print(f" 总数据记录:{report['total_data_records']}")
    print(f" 活跃同意:{report['active_consents']}")
    print(f" 同意率:{report['consent_rate']:.2%}")
    print(f" GDPR 违规:{report['gdpr_violations']}")
    print(f" 审计日志:{report['audit_log_entries']} 条")


def _print_key_observations():
    """Print the closing summary of the demo."""
    print(f"\n关键观察:")
    print("1. 隐私保护:隐私设计、数据最小化、目的限制")
    print("2. 数据加密:AES-256、RSA-2048、密钥管理")
    print("3. 隐私增强:差分隐私、联邦学习、安全多方计算")
    print("4. 合规管理:GDPR、同意管理、数据主体权利")
    print("5. 隐私优先 AI:隐私 + 加密 + PETs + 合规 = 可信赖")
    print("\n隐私保护的使命:让 AI 在保护隐私的前提下释放数据价值")


def main():
    """Run every demo section in the original order."""
    print("=== Agent 隐私保护与数据安全技术 ===\n")
    _demo_encryption()
    _demo_differential_privacy()
    _demo_federated_learning()
    _demo_compliance()
    _print_key_observations()


if __name__ == "__main__":
    main()