"""Agent 可解释性与决策链路溯源完整实现 (agent explainability and decision-trace provenance)."""
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import math
import random
from collections import defaultdict
import hashlib
import secrets
from scipy.special import softmax
import copy
class ExplanationType(Enum):
    """Kind of explanation produced for a prediction or a model."""
    GLOBAL = "global"  # model-wide explanation
    LOCAL = "local"  # explanation of a single prediction
    FEATURE_ATTRIBUTION = "feature_attribution"  # per-feature attribution scores
    COUNTERFACTUAL = "counterfactual"  # "what minimal change flips the outcome"
    EXAMPLE_BASED = "example_based"  # explanation via representative examples
class ModelType(Enum):
    """Supported model families (only LINEAR is actually trained below)."""
    LINEAR = "linear"  # linear model
    TREE = "tree"  # decision tree
    NEURAL_NETWORK = "neural_network"  # neural network
    ENSEMBLE = "ensemble"  # ensemble model
@dataclass
class FeatureImportance:
    """Ranked importance of a single feature."""
    feature_name: str
    importance: float  # magnitude; producers in this module store absolute values
    direction: str  # "positive" or "negative" (sign of the underlying contribution)
    rank: int  # 1-based rank, 1 = most important
@dataclass
class LocalExplanation:
    """Explanation of one prediction for one input instance."""
    instance_id: str  # synthetic id ("instance_"/"shap_" + random hex)
    prediction: int  # predicted class label
    probability: float  # producers here store the max class probability
    feature_importances: List[FeatureImportance]  # ranked attributions (top 10)
    explanation_type: ExplanationType
    confidence: float  # max class probability (same value as `probability`)
    generated_at: datetime = field(default_factory=datetime.now)
@dataclass
class DecisionTrace:
    """Full provenance record of a single model decision."""
    trace_id: str  # unique id ("trace_" + random hex)
    input_data: Dict[str, Any]  # feature name -> input value
    prediction: int
    decision_path: List[Dict[str, Any]]  # ranked feature contributions
    key_factors: List[str]  # names of the top contributing features
    counterfactuals: List[Dict[str, Any]]  # perturbations that flipped the prediction
    uncertainty: float  # 1 - max class probability
    timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class CausalGraph:
    """Simplified causal graph over model features."""
    nodes: List[str]  # feature names plus the "prediction" node
    edges: List[Tuple[str, str, float]]  # (from, to, strength)
    confounders: List[str]  # confounders; left empty by build_causal_graph below
class InterpretableModel:
    """A model that is interpretable by construction.

    Supports:
      1. a linear model (inherently interpretable),
      2. global feature importance (|weight| ranking),
      3. per-instance feature-attribution explanations.

    Only ``ModelType.LINEAR`` is actually trained (simplified least-squares
    fit); other model types are accepted but ``fit`` leaves ``weights`` as
    ``None`` and ``predict``/``predict_proba`` fall back to all-zero outputs,
    matching the original behaviour.
    """

    def __init__(self, model_type: ModelType = ModelType.LINEAR):
        self.model_type = model_type
        self.weights: Optional[np.ndarray] = None
        self.intercept: float = 0.0
        self.feature_names: List[str] = []
        self.is_fitted = False

    def fit(self, X: np.ndarray, y: np.ndarray,
            feature_names: Optional[List[str]] = None) -> None:
        """Fit the model.

        Args:
            X: (n_samples, n_features) design matrix.
            y: (n_samples,) target vector.
            feature_names: optional names; auto-generated when omitted.
                (Fix: annotated Optional — the default is None.)
        """
        n_samples, n_features = X.shape
        if feature_names:
            self.feature_names = feature_names
        else:
            self.feature_names = [f"feature_{i}" for i in range(n_features)]
        if self.model_type == ModelType.LINEAR:
            # Simplified linear regression on [1 | X]; lstsq is more robust
            # than explicitly solving the normal equations.
            X_bias = np.column_stack([np.ones(n_samples), X])
            theta = np.linalg.lstsq(X_bias, y, rcond=None)[0]
            self.intercept = theta[0]
            self.weights = theta[1:]
        self.is_fitted = True

    @staticmethod
    def _sigmoid(logits: np.ndarray) -> np.ndarray:
        """Numerically stable sigmoid.

        The naive 1/(1+exp(-z)) overflows in exp() for large negative z and
        emits RuntimeWarnings; this branches on the sign instead.
        """
        out = np.empty_like(logits, dtype=float)
        pos = logits >= 0
        out[pos] = 1.0 / (1.0 + np.exp(-logits[pos]))
        exp_z = np.exp(logits[~pos])
        out[~pos] = exp_z / (1.0 + exp_z)
        return out

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return hard 0/1 predictions.

        Raises:
            ValueError: if called before ``fit``.
        """
        if not self.is_fitted:
            raise ValueError("Model not fitted")
        if self.model_type == ModelType.LINEAR:
            logits = np.dot(X, self.weights) + self.intercept
            # logit > 0  <=>  sigmoid(logit) > 0.5, so this agrees with
            # predict_proba's class-1 probability crossing 0.5.
            return (logits > 0).astype(int)
        return np.zeros(X.shape[0])

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return an (n_samples, 2) array of [P(class 0), P(class 1)].

        Raises:
            ValueError: if called before ``fit``.
        """
        if not self.is_fitted:
            raise ValueError("Model not fitted")
        if self.model_type == ModelType.LINEAR:
            logits = np.dot(X, self.weights) + self.intercept
            probs = self._sigmoid(logits)
            return np.column_stack([1 - probs, probs])
        return np.zeros((X.shape[0], 2))

    def get_feature_importance(self) -> List[FeatureImportance]:
        """Global importance: |weight| per feature, ranked descending.

        Returns an empty list when the model has no fitted weights.
        """
        if not self.is_fitted or self.weights is None:
            return []
        abs_weights = np.abs(self.weights)
        sorted_indices = np.argsort(abs_weights)[::-1]
        importances = []
        for rank, idx in enumerate(sorted_indices):
            direction = "positive" if self.weights[idx] > 0 else "negative"
            importances.append(FeatureImportance(
                feature_name=self.feature_names[idx],
                importance=abs_weights[idx],
                direction=direction,
                rank=rank + 1,
            ))
        return importances

    def explain_prediction(self, x: np.ndarray,
                           prediction: int) -> LocalExplanation:
        """Explain one prediction via per-feature contributions x_i * w_i.

        Only the top-10 |contribution| features are reported.

        Raises:
            ValueError: if called before ``fit``.
        """
        if not self.is_fitted or self.weights is None:
            raise ValueError("Model not fitted")
        contributions = x * self.weights
        abs_contributions = np.abs(contributions)
        sorted_indices = np.argsort(abs_contributions)[::-1]
        feature_importances = []
        for rank, idx in enumerate(sorted_indices[:10]):  # top 10 only
            direction = "positive" if contributions[idx] > 0 else "negative"
            feature_importances.append(FeatureImportance(
                feature_name=self.feature_names[idx],
                importance=abs_contributions[idx],
                direction=direction,
                rank=rank + 1,
            ))
        # Confidence = probability of the more likely class.
        proba = self.predict_proba(x.reshape(1, -1))[0]
        confidence = np.max(proba)
        return LocalExplanation(
            instance_id=f"instance_{secrets.token_hex(8)}",
            prediction=prediction,
            probability=confidence,
            feature_importances=feature_importances,
            explanation_type=ExplanationType.FEATURE_ATTRIBUTION,
            confidence=confidence,
        )
class SHAPExplainer:
"""
SHAP (SHapley Additive exPlanations) 解释器
支持:
1. Shapley 值计算
2. 特征归因
3. 全局和局部解释
"""
def __init__(self, model, background_data: np.ndarray):
self.model = model
self.background_data = background_data
self.n_background = len(background_data)
def _compute_shapley_value(self, x: np.ndarray,
feature_idx: int,
n_samples: int = 100) -> float:
"""计算单个特征的 Shapley 值(简化版)"""
n_features = len(x)
shapley_value = 0.0
for _ in range(n_samples):
# 随机选择特征子集
subset_size = random.randint(0, n_features - 1)
subset = random.sample([i for i in range(n_features) if i != feature_idx],
subset_size)
# 创建两个样本:有特征 vs 无特征
x_with = x.copy()
x_without = x.copy()
# 用背景数据填充未选中的特征
background_idx = random.randint(0, self.n_background - 1)
for i in range(n_features):
if i not in subset and i != feature_idx:
x_with[i] = self.background_data[background_idx, i]
x_without[i] = self.background_data[background_idx, i]
x_without[feature_idx] = self.background_data[background_idx, feature_idx]
# 计算边际贡献
pred_with = self.model.predict_proba(x_with.reshape(1, -1))[0, 1]
pred_without = self.model.predict_proba(x_without.reshape(1, -1))[0, 1]
marginal_contribution = pred_with - pred_without
shapley_value += marginal_contribution
return shapley_value / n_samples
def explain(self, x: np.ndarray) -> Dict[str, float]:
"""解释单个样本"""
n_features = len(x)
shapley_values = {}
for i in range(n_features):
feature_name = f"feature_{i}"
shapley_value = self._compute_shapley_value(x, i)
shapley_values[feature_name] = shapley_value
return shapley_values
def explain_instance(self, x: np.ndarray,
prediction: int) -> LocalExplanation:
"""生成局部解释"""
shapley_values = self.explain(x)
# 转换为 FeatureImportance 列表
feature_importances = []
sorted_features = sorted(shapley_values.items(),
key=lambda item: abs(item[1]),
reverse=True)
for rank, (feature_name, importance) in enumerate(sorted_features[:10]):
direction = "positive" if importance > 0 else "negative"
feature_importances.append(FeatureImportance(
feature_name=feature_name,
importance=abs(importance),
direction=direction,
rank=rank + 1
))
# 计算置信度
proba = self.model.predict_proba(x.reshape(1, -1))[0]
confidence = np.max(proba)
return LocalExplanation(
instance_id=f"shap_{secrets.token_hex(8)}",
prediction=prediction,
probability=confidence,
feature_importances=feature_importances,
explanation_type=ExplanationType.FEATURE_ATTRIBUTION,
confidence=confidence
)
class DecisionTracer:
"""
决策链路追踪器
支持:
1. 决策路径记录
2. 因果图构建
3. 反事实生成
"""
def __init__(self, model):
self.model = model
self.traces: List[DecisionTrace] = []
self.causal_graph: Optional[CausalGraph] = None
def trace_decision(self, x: np.ndarray,
feature_names: List[str] = None) -> DecisionTrace:
"""追踪决策链路"""
if feature_names is None:
feature_names = [f"feature_{i}" for i in range(len(x))]
# 记录决策路径
decision_path = []
# 获取预测
prediction = self.model.predict(x.reshape(1, -1))[0]
proba = self.model.predict_proba(x.reshape(1, -1))[0]
# 如果是可解释模型,获取解释
if isinstance(self.model, InterpretableModel):
explanation = self.model.explain_prediction(x, prediction)
for fi in explanation.feature_importances:
decision_path.append({
"feature": fi.feature_name,
"importance": fi.importance,
"direction": fi.direction,
"rank": fi.rank
})
# 识别关键因素
key_factors = [fp["feature"] for fp in decision_path[:5]]
# 生成反事实
counterfactuals = self._generate_counterfactuals(x, prediction, feature_names)
# 计算不确定性
uncertainty = 1.0 - np.max(proba)
trace = DecisionTrace(
trace_id=f"trace_{secrets.token_hex(16)}",
input_data={name: float(x[i]) for i, name in enumerate(feature_names)},
prediction=int(prediction),
decision_path=decision_path,
key_factors=key_factors,
counterfactuals=counterfactuals,
uncertainty=float(uncertainty)
)
self.traces.append(trace)
return trace
def _generate_counterfactuals(self, x: np.ndarray,
prediction: int,
feature_names: List[str],
n_counterfactuals: int = 3) -> List[Dict[str, Any]]:
"""生成反事实解释"""
counterfactuals = []
for _ in range(n_counterfactuals):
# 随机扰动特征
x_cf = x.copy()
feature_idx = random.randint(0, len(x) - 1)
# 尝试改变特征值
change_direction = random.choice([-1, 1])
change_magnitude = random.uniform(0.1, 0.5)
x_cf[feature_idx] += change_direction * change_magnitude
x_cf = np.clip(x_cf, 0, 1) # 假设特征在 [0, 1] 范围
# 检查预测是否改变
cf_prediction = self.model.predict(x_cf.reshape(1, -1))[0]
if cf_prediction != prediction:
counterfactuals.append({
"original_value": float(x[feature_idx]),
"counterfactual_value": float(x_cf[feature_idx]),
"feature": feature_names[feature_idx],
"change": change_direction * change_magnitude,
"new_prediction": int(cf_prediction)
})
return counterfactuals
def build_causal_graph(self, feature_names: List[str]) -> CausalGraph:
"""构建简化因果图"""
nodes = feature_names + ["prediction"]
edges = []
# 简化:假设所有特征都直接影响预测
for i, feature in enumerate(feature_names):
if isinstance(self.model, InterpretableModel) and self.model.weights is not None:
strength = abs(self.model.weights[i])
edges.append((feature, "prediction", float(strength)))
self.causal_graph = CausalGraph(
nodes=nodes,
edges=edges,
confounders=[]
)
return self.causal_graph
def get_trace_summary(self) -> Dict[str, Any]:
"""获取追踪摘要"""
if not self.traces:
return {"message": "No traces available"}
avg_uncertainty = np.mean([t.uncertainty for t in self.traces])
most_common_factors = defaultdict(int)
for trace in self.traces:
for factor in trace.key_factors:
most_common_factors[factor] += 1
top_factors = sorted(most_common_factors.items(),
key=lambda x: x[1],
reverse=True)[:5]
return {
"total_traces": len(self.traces),
"average_uncertainty": float(avg_uncertainty),
"top_key_factors": top_factors,
"latest_trace_id": self.traces[-1].trace_id,
"summary_timestamp": datetime.now().isoformat()
}
# Usage example: exercises every class above end to end.
if __name__ == "__main__":
    print("=== Agent 可解释性与决策链路溯源 ===\n")
    print("=== 创建可解释模型 ===")
    # Create the interpretable (linear) model
    model = InterpretableModel(model_type=ModelType.LINEAR)
    print(f"模型类型:{model.model_type.value}")
    # Generate synthetic data
    np.random.seed(42)
    n_samples = 1000
    n_features = 10
    X = np.random.rand(n_samples, n_features)
    # Ground-truth weights used to label the data
    true_weights = np.array([0.5, -0.3, 0.8, 0.0, -0.6, 0.2, 0.0, 0.4, -0.1, 0.7])
    y = (np.dot(X, true_weights) + np.random.normal(0, 0.1, n_samples) > 0).astype(int)
    feature_names = [f"feature_{i}" for i in range(n_features)]
    # Train the model
    model.fit(X, y, feature_names)
    print(f"训练完成,样本数:{n_samples}")
    # Global feature importance
    print(f"\n=== 特征重要性 ===")
    importances = model.get_feature_importance()
    print("Top 5 重要特征:")
    for imp in importances[:5]:
        print(f" {imp.rank}. {imp.feature_name}: {imp.importance:.3f} ({imp.direction})")
    print(f"\n=== SHAP 解释 ===")
    # Create the SHAP explainer
    background_data = X[:100]  # use part of the data as the background set
    shap_explainer = SHAPExplainer(model, background_data)
    # Explain a single sample
    test_idx = 0
    x_test = X[test_idx]
    y_test = y[test_idx]
    shap_explanation = shap_explainer.explain_instance(x_test, y_test)
    print(f"SHAP 解释 (样本 {test_idx}):")
    print(f" 预测:{shap_explanation.prediction}")
    print(f" 置信度:{shap_explanation.confidence:.2%}")
    print(f" Top 3 特征:")
    for fi in shap_explanation.feature_importances[:3]:
        print(f" {fi.feature_name}: {fi.importance:.3f} ({fi.direction})")
    print(f"\n=== 决策链路追踪 ===")
    # Create the decision tracer
    tracer = DecisionTracer(model)
    # Trace one decision
    trace = tracer.trace_decision(x_test, feature_names)
    print(f"决策链路追踪:")
    print(f" 追踪 ID: {trace.trace_id}")
    print(f" 预测:{trace.prediction}")
    print(f" 不确定性:{trace.uncertainty:.2%}")
    print(f" 关键因素:{', '.join(trace.key_factors)}")
    if trace.counterfactuals:
        print(f" 反事实解释:")
        for cf in trace.counterfactuals[:2]:
            print(f" {cf['feature']}: {cf['original_value']:.2f} → {cf['counterfactual_value']:.2f} (预测变为 {cf['new_prediction']})")
    print(f"\n=== 因果图 ===")
    # Build the causal graph
    causal_graph = tracer.build_causal_graph(feature_names)
    print(f"因果图:")
    print(f" 节点数:{len(causal_graph.nodes)}")
    print(f" 边数:{len(causal_graph.edges)}")
    print(f" Top 3 因果边:")
    sorted_edges = sorted(causal_graph.edges, key=lambda e: e[2], reverse=True)
    for edge in sorted_edges[:3]:
        print(f" {edge[0]} → {edge[1]} (强度:{edge[2]:.3f})")
    print(f"\n=== 追踪摘要 ===")
    # Trace several more samples to populate the summary
    for i in range(10):
        tracer.trace_decision(X[i], feature_names)
    summary = tracer.get_trace_summary()
    print(f"追踪摘要:")
    print(f" 总追踪数:{summary['total_traces']}")
    print(f" 平均不确定性:{summary['average_uncertainty']:.2%}")
    print(f" Top 关键因素:")
    for factor, count in summary['top_key_factors']:
        print(f" {factor}: {count} 次")
    print(f"\n关键观察:")
    print("1. 可解释性:特征重要性、SHAP 值、局部解释")
    print("2. 决策溯源:决策路径、因果图、反事实解释")
    print("3. 透明度:完整决策链路、数据血缘")
    print("4. 可信度:不确定性量化、置信度校准")
    print("5. 透明 AI:解释 + 溯源 + 透明 + 可信 = 可信赖")
    print("\n透明 AI 的使命:让 AI 决策过程透明、可理解、可信任")