Agent Adversarial Example Attacks and Defenses: A Complete Implementation
import time
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class AttackType(Enum):
    """Attack types"""
    FGSM = "fgsm"            # Fast Gradient Sign Method
    PGD = "pgd"              # Projected Gradient Descent
    BIM = "bim"              # Basic Iterative Method
    CW = "cw"                # Carlini-Wagner attack
    BLACK_BOX = "black_box"  # Black-box attack
    PHYSICAL = "physical"    # Physical-world attack
class DefenseType(Enum):
    """Defense types"""
    ADVERSARIAL_TRAINING = "adversarial_training"  # Adversarial training
    CERTIFIED_DEFENSE = "certified_defense"        # Certified defense
    INPUT_PREPROCESSING = "input_preprocessing"    # Input preprocessing
    MODEL_ARCHITECTURE = "model_architecture"      # Architectural defense
    DETECTION = "detection"                        # Detection-based defense
class ThreatModel(Enum):
    """Threat models"""
    WHITE_BOX = "white_box"  # White-box (full knowledge of the model)
    BLACK_BOX = "black_box"  # Black-box (query access to outputs only)
    GREY_BOX = "grey_box"    # Grey-box (partial knowledge)
class AttackGoal(Enum):
    """Attack goals"""
    UNTARGETED = "untargeted"  # Untargeted (any misclassification)
    TARGETED = "targeted"      # Targeted (a specific wrong class)
@dataclass
class AdversarialExample:
    """An adversarial example and its attack metadata"""
    original_input: np.ndarray
    adversarial_input: np.ndarray
    perturbation: np.ndarray
    true_label: int
    predicted_label: int
    adversarial_label: int
    attack_type: AttackType
    epsilon: float  # perturbation budget
    success: bool
    confidence: float
    created_at: datetime = field(default_factory=datetime.now)
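# Note: for the FGSM/PGD/BIM attacks implemented below, `perturbation` always satisfies
# the L-infinity constraint max(abs(perturbation)) <= epsilon, and the adversarial input
# additionally stays inside the valid pixel range [0, 1].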
@dataclass
class DefenseResult:
    """Result of applying a defense"""
    defense_type: DefenseType
    original_accuracy: float
    robust_accuracy: float
    improvement: float
    defense_time: float
    parameters: Dict[str, Any]
    timestamp: datetime = field(default_factory=datetime.now)
class NeuralNetworkMock:
    """
    Mock neural network (for demonstration).
    Supports:
    1. Forward pass
    2. Gradient computation
    3. Accuracy evaluation
    """
    def __init__(self, input_dim: int = 784, num_classes: int = 10):
        self.input_dim = input_dim
        self.num_classes = num_classes
        # Simplified model: a single linear (softmax-regression) layer
        self.weights = np.random.randn(input_dim, num_classes) * 0.01
        self.bias = np.zeros(num_classes)
        self.is_training = True
    def forward(self, x: np.ndarray) -> np.ndarray:
        """Forward pass"""
        logits = np.dot(x, self.weights) + self.bias
        return logits
    def predict(self, x: np.ndarray) -> np.ndarray:
        """Predict class labels"""
        logits = self.forward(x)
        return np.argmax(logits, axis=1)
    def compute_loss(self, x: np.ndarray, y: np.ndarray) -> float:
        """Cross-entropy loss"""
        logits = self.forward(x)
        # Softmax (shifted for numerical stability)
        exp_logits = np.exp(logits - np.max(logits, axis=1, keepdims=True))
        probs = exp_logits / np.sum(exp_logits, axis=1, keepdims=True)
        # Cross-entropy
        n_samples = x.shape[0]
        log_probs = -np.log(probs[np.arange(n_samples), y] + 1e-10)
        return np.mean(log_probs)
    def compute_gradient(self, x: np.ndarray, y: np.ndarray) -> np.ndarray:
        """Gradient of the loss w.r.t. the input (used by attacks)"""
        logits = self.forward(x)
        # Softmax
        exp_logits = np.exp(logits - np.max(logits, axis=1, keepdims=True))
        probs = exp_logits / np.sum(exp_logits, axis=1, keepdims=True)
        # Gradient w.r.t. the logits
        n_samples = x.shape[0]
        grad = probs.copy()
        grad[np.arange(n_samples), y] -= 1
        # Chain rule back to the input
        input_grad = np.dot(grad, self.weights.T)
        return input_grad
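    # Note: for softmax cross-entropy, dL/dlogits = softmax(logits) - one_hot(y), so the
    # input gradient above is (softmax(xW + b) - one_hot(y)) @ W.T. The 1/n factor of the
    # mean loss is omitted; the attacks below only use the sign or direction of this
    # gradient, so the scale does not matter.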
    def train_step(self, x: np.ndarray, y: np.ndarray, lr: float = 0.01):
        """One gradient-descent step"""
        logits = self.forward(x)
        # Softmax
        exp_logits = np.exp(logits - np.max(logits, axis=1, keepdims=True))
        probs = exp_logits / np.sum(exp_logits, axis=1, keepdims=True)
        # Gradient w.r.t. the logits
        n_samples = x.shape[0]
        grad = probs.copy()
        grad[np.arange(n_samples), y] -= 1
        grad = grad / n_samples
        # Update parameters
        weight_grad = np.dot(x.T, grad)
        bias_grad = np.sum(grad, axis=0)
        self.weights -= lr * weight_grad
        self.bias -= lr * bias_grad
    def evaluate(self, x: np.ndarray, y: np.ndarray) -> float:
        """Classification accuracy"""
        predictions = self.predict(x)
        accuracy = np.mean(predictions == y)
        return accuracy
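# Quick sanity check (illustrative only; data here is made up for the demo): with random
# weights and random data, accuracy should sit near chance (~1/num_classes), e.g.
#   m = NeuralNetworkMock()
#   xs, ys = np.random.rand(32, 784), np.random.randint(0, 10, 32)
#   print(m.evaluate(xs, ys))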
class AdversarialAttacker:
    """
    Adversarial attacker.
    Supports:
    1. FGSM attack
    2. PGD attack
    3. BIM attack
    4. Black-box (transfer) attack
    """
    def __init__(self, model: NeuralNetworkMock):
        self.model = model
    def fgsm_attack(self, x: np.ndarray, y: np.ndarray,
                    epsilon: float = 0.01) -> AdversarialExample:
        """FGSM (Fast Gradient Sign Method) attack"""
        # Gradient of the loss w.r.t. the input
        gradient = self.model.compute_gradient(x, y)
        # Sign of the gradient
        grad_sign = np.sign(gradient)
        # Single-step perturbation
        x_adv = x + epsilon * grad_sign
        # Clip back into the valid input range [0, 1]
        x_adv = np.clip(x_adv, 0, 1)
        # Resulting perturbation
        perturbation = x_adv - x
        # Evaluate the attack
        orig_pred = self.model.predict(x)[0]
        adv_pred = self.model.predict(x_adv)[0]
        success = orig_pred != adv_pred
        # Confidence of the adversarial prediction
        logits = self.model.forward(x_adv)
        probs = np.exp(logits - np.max(logits)) / np.sum(np.exp(logits - np.max(logits)))
        confidence = probs[0, adv_pred]
        return AdversarialExample(
            original_input=x.copy(),
            adversarial_input=x_adv,
            perturbation=perturbation,
            true_label=y[0],
            predicted_label=orig_pred,
            adversarial_label=adv_pred,
            attack_type=AttackType.FGSM,
            epsilon=epsilon,
            success=success,
            confidence=confidence
        )
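    # FGSM in one line: x_adv = clip(x + epsilon * sign(grad_x L(x, y)), 0, 1).
    # A targeted variant would instead *subtract* the gradient of the loss taken w.r.t.
    # the desired target label, pushing the input toward that class.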
    def pgd_attack(self, x: np.ndarray, y: np.ndarray,
                   epsilon: float = 0.01, steps: int = 10,
                   step_size: Optional[float] = None,
                   random_start: bool = True) -> AdversarialExample:
        """PGD (Projected Gradient Descent) attack"""
        if step_size is None:
            step_size = epsilon / steps
        x_adv = x.copy()
        if random_start:
            # PGD starts from a random point inside the epsilon ball
            x_adv = np.clip(x + np.random.uniform(-epsilon, epsilon, x.shape), 0, 1)
        for step in range(steps):
            # Gradient ascent on the loss
            gradient = self.model.compute_gradient(x_adv, y)
            x_adv = x_adv + step_size * np.sign(gradient)
            # Project back into the epsilon ball around x
            perturbation = np.clip(x_adv - x, -epsilon, epsilon)
            x_adv = x + perturbation
            # Clip back into the valid input range [0, 1]
            x_adv = np.clip(x_adv, 0, 1)
        # Final perturbation
        perturbation = x_adv - x
        # Evaluate the attack
        orig_pred = self.model.predict(x)[0]
        adv_pred = self.model.predict(x_adv)[0]
        success = orig_pred != adv_pred
        # Confidence of the adversarial prediction
        logits = self.model.forward(x_adv)
        probs = np.exp(logits - np.max(logits)) / np.sum(np.exp(logits - np.max(logits)))
        confidence = probs[0, adv_pred]
        return AdversarialExample(
            original_input=x.copy(),
            adversarial_input=x_adv,
            perturbation=perturbation,
            true_label=y[0],
            predicted_label=orig_pred,
            adversarial_label=adv_pred,
            attack_type=AttackType.PGD,
            epsilon=epsilon,
            success=success,
            confidence=confidence
        )
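    # Each PGD step computes x_{t+1} = clip(Proj_{||d||_inf <= eps}(x_t + alpha * sign(grad L)), 0, 1),
    # with step size alpha = epsilon / steps by default. The random start above is what
    # distinguishes PGD from BIM in this implementation.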
    def bim_attack(self, x: np.ndarray, y: np.ndarray,
                   epsilon: float = 0.01, steps: int = 10,
                   step_size: Optional[float] = None) -> AdversarialExample:
        """BIM (Basic Iterative Method) attack"""
        # BIM is iterative FGSM: the same loop as PGD, but without the random start
        result = self.pgd_attack(x, y, epsilon, steps, step_size, random_start=False)
        result.attack_type = AttackType.BIM
        return result
    def black_box_attack(self, x: np.ndarray, y: np.ndarray,
                         substitute_model: NeuralNetworkMock,
                         epsilon: float = 0.01) -> AdversarialExample:
        """Black-box (transfer) attack using a substitute model"""
        # Craft the adversarial example against the substitute model
        attacker = AdversarialAttacker(substitute_model)
        adv_example = attacker.fgsm_attack(x, y, epsilon)
        # Test whether the example transfers to the target model
        target_clean_pred = self.model.predict(x)[0]
        target_adv_pred = self.model.predict(adv_example.adversarial_input)[0]
        adv_example.predicted_label = target_clean_pred
        adv_example.adversarial_label = target_adv_pred
        adv_example.success = target_clean_pred != target_adv_pred
        adv_example.attack_type = AttackType.BLACK_BOX
        return adv_example
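# A minimal sketch (not part of the original walkthrough) of how such a substitute model
# could be obtained in a true black-box setting: label unlabeled query data with the
# target model's predicted classes, then fit the substitute on those pseudo-labels.
def train_substitute_model(target: NeuralNetworkMock, x_query: np.ndarray,
                           epochs: int = 5, lr: float = 0.01) -> NeuralNetworkMock:
    substitute = NeuralNetworkMock(target.input_dim, target.num_classes)
    pseudo_labels = target.predict(x_query)  # only the target's output labels are observed
    for _ in range(epochs):
        for i in np.random.permutation(len(x_query)):
            substitute.train_step(x_query[i:i + 1], pseudo_labels[i:i + 1], lr)
    return substitute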
class AdversarialDefender:
    """
    Adversarial defender.
    Supports:
    1. Adversarial training
    2. Input preprocessing
    3. Detection
    4. Robustness evaluation
    """
    def __init__(self, model: NeuralNetworkMock):
        self.model = model
        self.attacker = AdversarialAttacker(model)
        self.defense_history: List[DefenseResult] = []
    def adversarial_training(self, train_x: np.ndarray, train_y: np.ndarray,
                             epochs: int = 10, epsilon: float = 0.01,
                             lr: float = 0.01) -> DefenseResult:
        """Adversarial training: train on PGD examples generated against the current model"""
        start_time = time.time()
        original_accuracy = self.model.evaluate(train_x, train_y)
        robust_accuracy_before = self.evaluate_robustness(train_x, train_y, epsilon)
        for epoch in range(epochs):
            # Generate adversarial examples against the current model
            adversarial_examples = []
            for i in range(len(train_x)):
                x_sample = train_x[i:i+1]
                y_sample = train_y[i:i+1]
                adv_example = self.attacker.pgd_attack(x_sample, y_sample, epsilon, steps=5)
                adversarial_examples.append(adv_example.adversarial_input)
            # Mix clean and adversarial data
            x_adv = np.vstack(adversarial_examples)
            x_combined = np.vstack([train_x, x_adv])
            y_combined = np.hstack([train_y, train_y])
            # Train on the mixed data in a random order
            indices = np.random.permutation(len(x_combined))
            for idx in indices:
                self.model.train_step(x_combined[idx:idx+1],
                                      y_combined[idx:idx+1], lr)
        # Robust accuracy after training; improvement is measured against the robust
        # accuracy before training (an apples-to-apples comparison)
        robust_accuracy = self.evaluate_robustness(train_x, train_y, epsilon)
        improvement = robust_accuracy - robust_accuracy_before
        result = DefenseResult(
            defense_type=DefenseType.ADVERSARIAL_TRAINING,
            original_accuracy=original_accuracy,
            robust_accuracy=robust_accuracy,
            improvement=improvement,
            defense_time=time.time() - start_time,
            parameters={"epochs": epochs, "epsilon": epsilon, "lr": lr,
                        "robust_accuracy_before": robust_accuracy_before}
        )
        self.defense_history.append(result)
        return result
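    # Adversarial training approximately solves the min-max objective
    #   min_theta  E_(x,y) [ max_{||delta||_inf <= epsilon} L(x + delta, y; theta) ],
    # using PGD to approximate the inner maximization in each epoch (Madry et al.-style).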
    def input_preprocessing(self, x: np.ndarray,
                            method: str = "smoothing") -> np.ndarray:
        """Input-preprocessing defense (feature squeezing)"""
        if method == "smoothing":
            # Local mean smoothing along the flattened feature dimension
            kernel_size = 3
            pad = kernel_size // 2
            padded = np.pad(x, ((0, 0), (pad, pad)), mode='edge')
            smoothed = np.zeros_like(x)
            for j in range(x.shape[1]):
                smoothed[:, j] = np.mean(padded[:, j:j + kernel_size], axis=1)
            return smoothed
        elif method == "bit_reduction":
            # Bit-depth reduction: quantize inputs in [0, 1] to 2**bits levels
            bits = 4
            levels = (1 << bits) - 1
            return np.round(x * levels) / levels
        else:
            return x
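    # Usage sketch: run squeezed inputs through the model instead of (or alongside) the raw
    # ones, e.g. preds = self.model.predict(self.input_preprocessing(x, "bit_reduction")).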
    def detect_adversarial(self, x: np.ndarray,
                           threshold: float = 0.5) -> Tuple[bool, float]:
        """Detect adversarial inputs (gradient-norm heuristic)"""
        # Simplified detector: inputs sitting close to a decision boundary tend to have a
        # large loss gradient w.r.t. the model's own predicted label
        predicted = self.model.predict(x)
        gradient = self.model.compute_gradient(x, predicted)
        grad_norm = np.linalg.norm(gradient)
        # Flag the input as (possibly) adversarial when the norm exceeds the threshold
        is_adversarial = grad_norm > threshold
        confidence = min(1.0, grad_norm / threshold)
        return is_adversarial, confidence
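    # An alternative, often stronger heuristic (a sketch reusing the preprocessing above):
    # flag x as adversarial when its prediction changes after feature squeezing, e.g.
    #   squeezed = self.input_preprocessing(x, method="bit_reduction")
    #   is_adv = self.model.predict(x)[0] != self.model.predict(squeezed)[0]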
    def evaluate_robustness(self, test_x: np.ndarray, test_y: np.ndarray,
                            epsilon: float = 0.01,
                            attack_type: AttackType = AttackType.PGD) -> float:
        """Robust accuracy: fraction of samples still classified correctly under attack"""
        correct_count = 0
        for i in range(len(test_x)):
            x_sample = test_x[i:i+1]
            y_sample = test_y[i:i+1]
            # Generate an adversarial example for this sample
            if attack_type == AttackType.FGSM:
                adv_example = self.attacker.fgsm_attack(x_sample, y_sample, epsilon)
            else:
                adv_example = self.attacker.pgd_attack(x_sample, y_sample, epsilon, steps=10)
            # Count the sample as robust if the adversarial prediction still matches the true label
            if adv_example.adversarial_label == y_sample[0]:
                correct_count += 1
        robust_accuracy = correct_count / len(test_x)
        return robust_accuracy
    def generate_defense_report(self) -> Dict[str, Any]:
        """Generate a defense report"""
        if not self.defense_history:
            return {"message": "No defense results available"}
        latest = self.defense_history[-1]
        return {
            "defense_type": latest.defense_type.value,
            "original_accuracy": f"{latest.original_accuracy:.2%}",
            "robust_accuracy": f"{latest.robust_accuracy:.2%}",
            "improvement": f"{latest.improvement:+.2%}",
            "defense_time_seconds": f"{latest.defense_time:.1f}",
            "parameters": latest.parameters,
            "total_defenses": len(self.defense_history),
            "report_timestamp": datetime.now().isoformat()
        }
# Usage example
if __name__ == "__main__":
    print("=== Agent Adversarial Attack & Defense Mechanisms ===\n")
    print("=== Creating the neural network model ===")
    # Create the model
    model = NeuralNetworkMock(input_dim=784, num_classes=10)
    print(f"Model input dimension: {model.input_dim}")
    print(f"Number of classes: {model.num_classes}")
    # Generate synthetic data (random features and labels, so clean accuracy sits near chance)
    np.random.seed(42)
    n_samples = 100
    test_x = np.random.rand(n_samples, 784).astype(np.float32)
    test_y = np.random.randint(0, 10, n_samples)
    print(f"\nTest data: {n_samples} samples")
    # Evaluate clean accuracy
    original_accuracy = model.evaluate(test_x, test_y)
    print(f"Clean accuracy: {original_accuracy:.2%}")
print(f"\n=== FGSM 攻击 ===")
# 创建攻击器
attacker = AdversarialAttacker(model)
# FGSM 攻击
x_sample = test_x[0:1]
y_sample = test_y[0:1]
fgsm_result = attacker.fgsm_attack(x_sample, y_sample, epsilon=0.01)
print(f"FGSM 攻击 (ε=0.01):")
print(f" 原始预测:{fgsm_result.predicted_label}")
print(f" 对抗预测:{fgsm_result.adversarial_label}")
print(f" 真实标签:{fgsm_result.true_label}")
print(f" 攻击成功:{fgsm_result.success}")
print(f" 置信度:{fgsm_result.confidence:.2%}")
print(f" 扰动幅度:{np.max(np.abs(fgsm_result.perturbation)):.4f}")
print(f"\n=== PGD 攻击 ===")
# PGD 攻击
pgd_result = attacker.pgd_attack(x_sample, y_sample, epsilon=0.01, steps=10)
print(f"PGD 攻击 (ε=0.01, steps=10):")
print(f" 原始预测:{pgd_result.predicted_label}")
print(f" 对抗预测:{pgd_result.adversarial_label}")
print(f" 攻击成功:{pgd_result.success}")
print(f" 置信度:{pgd_result.confidence:.2%}")
print(f"\n=== 对抗训练防御 ===")
# 创建防御器
defender = AdversarialDefender(model)
# 生成训练数据
train_x = np.random.rand(500, 784).astype(np.float32)
train_y = np.random.randint(0, 10, 500)
# 对抗训练
defense_result = defender.adversarial_training(
train_x, train_y,
epochs=3,
epsilon=0.01,
lr=0.01
)
print(f"对抗训练结果:")
print(f" 防御类型:{defense_result.defense_type.value}")
print(f" 原始准确率:{defense_result.original_accuracy:.2%}")
print(f" 鲁棒准确率:{defense_result.robust_accuracy:.2%}")
print(f" 提升:{defense_result.improvement:+.2%}")
print(f"\n=== 鲁棒性评估 ===")
# 评估不同 epsilon 下的鲁棒性
epsilons = [0.005, 0.01, 0.02, 0.03]
print("鲁棒准确率 vs Epsilon:")
for eps in epsilons:
robust_acc = defender.evaluate_robustness(test_x, test_y, epsilon=eps)
print(f" ε={eps:.3f}: {robust_acc:.2%}")
print(f"\n=== 对抗样本检测 ===")
# 检测对抗样本
is_adv, confidence = defender.detect_adversarial(fgsm_result.adversarial_input, threshold=0.5)
print(f"对抗样本检测:")
print(f" 是否对抗样本:{is_adv}")
print(f" 检测置信度:{confidence:.2%}")
# 检测正常样本
is_adv_normal, conf_normal = defender.detect_adversarial(x_sample, threshold=0.5)
print(f"\n正常样本检测:")
print(f" 是否对抗样本:{is_adv_normal}")
print(f" 检测置信度:{conf_normal:.2%}")
print(f"\n=== 防御报告 ===")
# 生成防御报告
report = defender.generate_defense_report()
print(f"防御报告:")
for key, value in report.items():
print(f" {key}: {value}")
print(f"\n关键观察:")
print("1. 对抗攻击:FGSM/PGD 能有效欺骗模型")
print("2. 对抗训练:显著提升鲁棒性")
print("3. 鲁棒性评估:epsilon 越大,鲁棒准确率越低")
print("4. 检测防御:能识别部分对抗样本")
print("5. 鲁棒 AI:攻击 + 防御 + 检测 = 可信赖")
print("\n鲁棒 AI 的使命:让 AI 系统在对抗攻击下依然可靠运行")