Complete Implementation of Agent AI Ethics and Compliance Auditing
import numpy as np
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import secrets
class EthicalPrinciple(Enum):
    """Ethical principles"""
    FAIRNESS = "fairness"
    TRANSPARENCY = "transparency"
    ACCOUNTABILITY = "accountability"
    PRIVACY = "privacy"
    SAFETY = "safety"
    HUMAN_WELLBEING = "human_wellbeing"
class RiskLevel(Enum):
    """Risk levels, mirroring the EU AI Act's four-tier classification"""
    MINIMAL = "minimal"
    LIMITED = "limited"
    HIGH = "high"
    UNACCEPTABLE = "unacceptable"
class ComplianceStatus(Enum):
    """Compliance status"""
    COMPLIANT = "compliant"
    PARTIALLY_COMPLIANT = "partially_compliant"
    NON_COMPLIANT = "non_compliant"
    NOT_ASSESSED = "not_assessed"
@dataclass
class EthicalAssessment:
    """Ethical assessment result"""
assessment_id: str
principle: EthicalPrinciple
score: float # 0-100
status: str # pass/warning/fail
findings: List[str]
recommendations: List[str]
assessed_at: datetime = field(default_factory=datetime.now)
@dataclass
class ComplianceCheck:
    """Compliance check result"""
check_id: str
regulation: str # EU AI Act, GDPR, etc.
requirement: str
status: ComplianceStatus
evidence: List[str]
gaps: List[str]
remediation_plan: Optional[str]
checked_at: datetime = field(default_factory=datetime.now)
@dataclass
class RiskAssessment:
    """Risk assessment result"""
risk_id: str
risk_category: str
risk_level: RiskLevel
likelihood: float # 0-1
impact: float # 0-1
risk_score: float # likelihood * impact
mitigation_measures: List[str]
residual_risk: RiskLevel
assessed_at: datetime = field(default_factory=datetime.now)
@dataclass
class AuditReport:
    """Audit report"""
report_id: str
audit_type: str
scope: str
findings: List[Dict[str, Any]]
compliance_score: float
risk_summary: Dict[str, int]
recommendations: List[str]
audit_date: datetime
auditor: str
class AIEthicsGovernance:
"""
    AI ethics governance system.
    Supports:
    1. Ethical principle assessment
    2. Compliance checking
    3. Risk assessment
    4. Audit report generation
"""
def __init__(self, organization_name: str):
self.organization_name = organization_name
self.assessments: List[EthicalAssessment] = []
self.compliance_checks: List[ComplianceCheck] = []
self.risk_assessments: List[RiskAssessment] = []
self.audit_reports: List[AuditReport] = []
    def assess_fairness(self, predictions: np.ndarray,
                        true_labels: np.ndarray,
                        sensitive_attributes: np.ndarray) -> EthicalAssessment:
        """Assess fairness via accuracy parity across sensitive groups."""
        assessment_id = f"fairness_{secrets.token_hex(8)}"
        findings = []
        recommendations = []
        # Compute per-group accuracy
        unique_groups = np.unique(sensitive_attributes)
        group_accuracies = {}
        for group in unique_groups:
            mask = sensitive_attributes == group
            group_preds = predictions[mask]
            group_true = true_labels[mask]
            accuracy = np.mean(group_preds == group_true)
            group_accuracies[f"group_{group}"] = accuracy
        # Fairness metric: the largest accuracy gap between any two groups
        accuracies = list(group_accuracies.values())
        max_diff = max(accuracies) - min(accuracies)
        # Fairness score (the smaller the gap, the higher the score)
        fairness_score = max(0, 100 - max_diff * 100)
        if max_diff > 0.1:
            status = "fail"
            findings.append(f"Accuracy gap between groups is too large: {max_diff:.2%}")
            recommendations.append("Rebalance the training data")
            recommendations.append("Apply fairness-constrained learning algorithms")
        elif max_diff > 0.05:
            status = "warning"
            findings.append(f"Accuracy gap detected between groups: {max_diff:.2%}")
            recommendations.append("Monitor per-group performance differences")
        else:
            status = "pass"
            findings.append("Accuracy gap between groups is within the acceptable range")
assessment = EthicalAssessment(
assessment_id=assessment_id,
principle=EthicalPrinciple.FAIRNESS,
score=fairness_score,
status=status,
findings=findings,
recommendations=recommendations
)
self.assessments.append(assessment)
return assessment
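    # (Illustrative sketch, not part of the original design:) accuracy parity above
    # is only one fairness lens; demographic parity, the gap in positive-prediction
    # rates across groups, is a common complementary metric.
    @staticmethod
    def demographic_parity_gap(predictions: np.ndarray,
                               sensitive_attributes: np.ndarray) -> float:
        """Return the largest gap in positive-prediction rates between groups."""
        rates = [float(np.mean(predictions[sensitive_attributes == g] == 1))
                 for g in np.unique(sensitive_attributes)]
        return max(rates) - min(rates)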
    def assess_transparency(self, model_info: Dict[str, Any]) -> EthicalAssessment:
        """Assess transparency via a documentation checklist (25 points per item)."""
        assessment_id = f"transparency_{secrets.token_hex(8)}"
        findings = []
        recommendations = []
        transparency_score = 0
        # Documentation completeness
        if model_info.get('documentation'):
            transparency_score += 25
            findings.append("Model documentation is complete")
        else:
            recommendations.append("Complete the model documentation")
        # Explainability
        if model_info.get('explainability'):
            transparency_score += 25
            findings.append("Model explanations are provided")
        else:
            recommendations.append("Add model explainability")
        # Data provenance
        if model_info.get('data_sources'):
            transparency_score += 25
            findings.append("Data sources are transparent")
        else:
            recommendations.append("Disclose data sources")
        # Documented limitations
        if model_info.get('limitations'):
            transparency_score += 25
            findings.append("Model limitations are documented")
        else:
            recommendations.append("Document the model's limitations")
        # Determine status
if transparency_score >= 80:
status = "pass"
elif transparency_score >= 50:
status = "warning"
else:
status = "fail"
assessment = EthicalAssessment(
assessment_id=assessment_id,
principle=EthicalPrinciple.TRANSPARENCY,
score=transparency_score,
status=status,
findings=findings,
recommendations=recommendations
)
self.assessments.append(assessment)
return assessment
    def check_eu_ai_act_compliance(self,
                                   system_info: Dict[str, Any]) -> List[ComplianceCheck]:
        """Run a set of EU AI Act compliance checks against system metadata."""
        checks = []
        # Risk classification check
        risk_check = ComplianceCheck(
            check_id=f"eu_risk_{secrets.token_hex(8)}",
            regulation="EU AI Act",
            requirement="Risk Classification",
            status=ComplianceStatus.COMPLIANT if system_info.get('risk_classified') else ComplianceStatus.NON_COMPLIANT,
            evidence=["Risk assessment document"] if system_info.get('risk_classified') else [],
            gaps=["Risk classification not performed"] if not system_info.get('risk_classified') else [],
            remediation_plan="Conduct risk assessment and classify AI system" if not system_info.get('risk_classified') else None,
            checked_at=datetime.now()
        )
        checks.append(risk_check)
        # Data governance check
        data_check = ComplianceCheck(
            check_id=f"eu_data_{secrets.token_hex(8)}",
            regulation="EU AI Act",
            requirement="Data Governance",
            status=ComplianceStatus.COMPLIANT if system_info.get('data_governance') else ComplianceStatus.PARTIALLY_COMPLIANT,
            evidence=["Data governance policy"] if system_info.get('data_governance') else [],
            gaps=["Incomplete data governance"] if not system_info.get('data_governance') else [],
            remediation_plan="Implement comprehensive data governance" if not system_info.get('data_governance') else None,
            checked_at=datetime.now()
        )
        checks.append(data_check)
        # Transparency check
        transparency_check = ComplianceCheck(
            check_id=f"eu_transparency_{secrets.token_hex(8)}",
            regulation="EU AI Act",
            requirement="Transparency",
            status=ComplianceStatus.COMPLIANT if system_info.get('transparency') else ComplianceStatus.NON_COMPLIANT,
            evidence=["User information provided"] if system_info.get('transparency') else [],
            gaps=["Transparency requirements not met"] if not system_info.get('transparency') else [],
            remediation_plan="Provide clear information to users" if not system_info.get('transparency') else None,
            checked_at=datetime.now()
)
checks.append(transparency_check)
self.compliance_checks.extend(checks)
return checks
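    # (Illustrative sketch, not in the original:) GDPR is named alongside the EU AI
    # Act in this module; a minimal pair of GDPR checks following the same pattern
    # might look like this. The 'dpia_done' and 'lawful_basis' keys are assumed inputs.
    def check_gdpr_compliance(self, system_info: Dict[str, Any]) -> List[ComplianceCheck]:
        """Minimal GDPR checks mirroring the EU AI Act pattern above."""
        checks = []
        for key, requirement in [('dpia_done', 'Data Protection Impact Assessment'),
                                 ('lawful_basis', 'Lawful Basis for Processing')]:
            ok = bool(system_info.get(key))
            checks.append(ComplianceCheck(
                check_id=f"gdpr_{secrets.token_hex(8)}",
                regulation="GDPR",
                requirement=requirement,
                status=ComplianceStatus.COMPLIANT if ok else ComplianceStatus.NON_COMPLIANT,
                evidence=[f"{requirement} documented"] if ok else [],
                gaps=[] if ok else [f"{requirement} missing"],
                remediation_plan=None if ok else f"Complete and document: {requirement}"
            ))
        self.compliance_checks.extend(checks)
        return checks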
    @staticmethod
    def _risk_level_from_score(score: float) -> RiskLevel:
        """Map a [0, 1] risk score onto the four-tier risk scale."""
        if score >= 0.7:
            return RiskLevel.UNACCEPTABLE
        elif score >= 0.5:
            return RiskLevel.HIGH
        elif score >= 0.3:
            return RiskLevel.LIMITED
        return RiskLevel.MINIMAL
    def assess_risk(self, risk_data: Dict[str, Any]) -> RiskAssessment:
        """Assess a risk and its residual level after mitigation."""
        risk_id = f"risk_{secrets.token_hex(8)}"
        likelihood = risk_data.get('likelihood', 0.5)
        impact = risk_data.get('impact', 0.5)
        risk_score = likelihood * impact
        # Determine the initial risk level
        risk_level = self._risk_level_from_score(risk_score)
        # Residual risk: each mitigation measure is assumed to cut risk by 15%,
        # capped at 90% so the residual score cannot go negative.
        mitigation_measures = risk_data.get('mitigation_measures', [])
        mitigation_effectiveness = min(len(mitigation_measures) * 0.15, 0.9)
        residual_score = risk_score * (1 - mitigation_effectiveness)
        residual_risk = self._risk_level_from_score(residual_score)
        assessment = RiskAssessment(
            risk_id=risk_id,
            risk_category=risk_data.get('category', 'general'),
            risk_level=risk_level,
            likelihood=likelihood,
            impact=impact,
            risk_score=risk_score,
            mitigation_measures=mitigation_measures,
            residual_risk=residual_risk,
            assessed_at=datetime.now()
        )
        self.risk_assessments.append(assessment)
        return assessment
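    # Worked example of the scoring above: likelihood 0.6 and impact 0.8 give a
    # risk score of 0.48 (LIMITED); three mitigation measures yield an assumed
    # effectiveness of 0.45, so the residual score is 0.48 * 0.55 = 0.264 (MINIMAL).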
    def generate_audit_report(self,
                              audit_type: str,
                              scope: str,
                              auditor: str) -> AuditReport:
        """Generate an audit report aggregating all findings to date."""
report_id = f"audit_{secrets.token_hex(16)}"
        # Aggregate findings
findings = []
        # Ethical assessment findings
for assessment in self.assessments:
findings.append({
"type": "ethical_assessment",
"principle": assessment.principle.value,
"score": assessment.score,
"status": assessment.status,
"findings": assessment.findings
})
        # Compliance check findings
for check in self.compliance_checks:
findings.append({
"type": "compliance_check",
"regulation": check.regulation,
"requirement": check.requirement,
"status": check.status.value,
"gaps": check.gaps
})
        # Risk assessment findings
for risk in self.risk_assessments:
findings.append({
"type": "risk_assessment",
"category": risk.risk_category,
"level": risk.risk_level.value,
"score": risk.risk_score
})
        # Compliance score: share of fully compliant checks
total_checks = len(self.compliance_checks)
compliant_checks = sum(1 for c in self.compliance_checks
if c.status == ComplianceStatus.COMPLIANT)
compliance_score = (compliant_checks / total_checks * 100) if total_checks > 0 else 0
        # Risk summary by level
risk_summary = {
"unacceptable": sum(1 for r in self.risk_assessments
if r.risk_level == RiskLevel.UNACCEPTABLE),
"high": sum(1 for r in self.risk_assessments
if r.risk_level == RiskLevel.HIGH),
"limited": sum(1 for r in self.risk_assessments
if r.risk_level == RiskLevel.LIMITED),
"minimal": sum(1 for r in self.risk_assessments
if r.risk_level == RiskLevel.MINIMAL)
}
        # Collect recommendations
recommendations = []
for assessment in self.assessments:
recommendations.extend(assessment.recommendations)
for check in self.compliance_checks:
if check.remediation_plan:
recommendations.append(check.remediation_plan)
report = AuditReport(
report_id=report_id,
audit_type=audit_type,
scope=scope,
findings=findings,
compliance_score=compliance_score,
risk_summary=risk_summary,
            recommendations=list(dict.fromkeys(recommendations)),  # dedupe, preserving order
audit_date=datetime.now(),
auditor=auditor
)
self.audit_reports.append(report)
return report
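    # (Illustrative sketch, not in the original:) audit reports are often handed to
    # external tooling; a minimal JSON export, stringifying Enum and datetime fields
    # via default=str, might look like this.
    def export_report_json(self, report: AuditReport) -> str:
        """Serialize an AuditReport to a JSON string."""
        import json
        from dataclasses import asdict
        return json.dumps(asdict(report), default=str, ensure_ascii=False, indent=2)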
    def get_governance_summary(self) -> Dict[str, Any]:
        """Return a summary of governance activity to date."""
return {
"organization": self.organization_name,
"total_assessments": len(self.assessments),
"total_compliance_checks": len(self.compliance_checks),
"total_risk_assessments": len(self.risk_assessments),
"total_audit_reports": len(self.audit_reports),
"average_compliance_score": np.mean([r.compliance_score for r in self.audit_reports]) if self.audit_reports else 0,
"summary_timestamp": datetime.now().isoformat()
}
# Usage example
if __name__ == "__main__":
    print("=== Agent AI Ethical Boundaries and Compliance Audit Framework ===\n")
    print("=== Creating the AI Ethics Governance System ===")
    # Create the governance system
    governance = AIEthicsGovernance("TechCorp AI Division")
    print(f"Organization: {governance.organization_name}")
    print("\n=== Fairness Assessment ===")
    # Simulated data
    np.random.seed(42)
    n_samples = 1000
    # Generate predictions and ground-truth labels
    predictions = np.random.randint(0, 2, n_samples)
    true_labels = np.random.randint(0, 2, n_samples)
    # Sensitive attribute (0 and 1 denote two groups)
    sensitive_attrs = np.random.randint(0, 2, n_samples)
    # Assess fairness
    fairness_assessment = governance.assess_fairness(
        predictions, true_labels, sensitive_attrs
    )
    print("Fairness assessment:")
    print(f"  Assessment ID: {fairness_assessment.assessment_id}")
    print(f"  Principle: {fairness_assessment.principle.value}")
    print(f"  Score: {fairness_assessment.score:.1f}/100")
    print(f"  Status: {fairness_assessment.status}")
    print("  Findings:")
    for finding in fairness_assessment.findings:
        print(f"    - {finding}")
    if fairness_assessment.recommendations:
        print("  Recommendations:")
        for rec in fairness_assessment.recommendations:
            print(f"    - {rec}")
    print("\n=== Transparency Assessment ===")
    # Model information
    model_info = {
        'documentation': True,
        'explainability': True,
        'data_sources': True,
        'limitations': False
    }
    # Assess transparency
    transparency_assessment = governance.assess_transparency(model_info)
    print("Transparency assessment:")
    print(f"  Score: {transparency_assessment.score:.1f}/100")
    print(f"  Status: {transparency_assessment.status}")
    print("  Findings:")
    for finding in transparency_assessment.findings:
        print(f"    - {finding}")
    if transparency_assessment.recommendations:
        print("  Recommendations:")
        for rec in transparency_assessment.recommendations:
            print(f"    - {rec}")
    print("\n=== EU AI Act Compliance Check ===")
    # System information
    system_info = {
        'risk_classified': True,
        'data_governance': True,
        'transparency': False
    }
    # Run the compliance checks
    compliance_checks = governance.check_eu_ai_act_compliance(system_info)
    print("EU AI Act compliance checks:")
    for check in compliance_checks:
        print(f"  {check.requirement}:")
        print(f"    Status: {check.status.value}")
        if check.gaps:
            print(f"    Gaps: {', '.join(check.gaps)}")
        if check.remediation_plan:
            print(f"    Remediation plan: {check.remediation_plan}")
    print("\n=== Risk Assessment ===")
    # Risk data
    risk_data = {
        'category': 'bias_discrimination',
        'likelihood': 0.6,
        'impact': 0.8,
        'mitigation_measures': [
            'fairness constraints',
            'diverse training data',
            'continuous monitoring'
        ]
    }
    # Assess the risk
    risk_assessment = governance.assess_risk(risk_data)
    print("Risk assessment:")
    print(f"  Risk ID: {risk_assessment.risk_id}")
    print(f"  Category: {risk_assessment.risk_category}")
    print(f"  Initial level: {risk_assessment.risk_level.value}")
    print(f"  Risk score: {risk_assessment.risk_score:.2f}")
    print(f"  Mitigation measures: {len(risk_assessment.mitigation_measures)}")
    print(f"  Residual risk: {risk_assessment.residual_risk.value}")
    print("\n=== Generating the Audit Report ===")
    # Generate the audit report
    audit_report = governance.generate_audit_report(
        audit_type="Annual AI Ethics Audit",
        scope="AI System Development and Deployment",
        auditor="Dr. Jane Smith, Chief Ethics Officer"
    )
    print("Audit report:")
    print(f"  Report ID: {audit_report.report_id}")
    print(f"  Audit type: {audit_report.audit_type}")
    print(f"  Scope: {audit_report.scope}")
    print(f"  Compliance score: {audit_report.compliance_score:.1f}/100")
    print("  Risk summary:")
    for level, count in audit_report.risk_summary.items():
        print(f"    {level}: {count}")
    print(f"  Recommendations: {len(audit_report.recommendations)}")
    if audit_report.recommendations:
        print("  Top recommendations:")
        for rec in audit_report.recommendations[:3]:
            print(f"    - {rec}")
    print("\n=== Governance Summary ===")
    summary = governance.get_governance_summary()
    print("Governance summary:")
    for key, value in summary.items():
        print(f"  {key}: {value}")
    print("\nKey observations:")
    print("1. AI ethics: fairness, transparency, accountability, privacy protection")
    print("2. Compliance frameworks: EU AI Act, GDPR, global standards")
    print("3. Governance: structures, policies, risk management, organization")
    print("4. Audit methods: assessment, checking, monitoring, reporting")
    print("5. Responsible AI: ethics + compliance + governance + auditing = trustworthiness")
    print("\nThe mission of responsible AI: AI systems that serve humanity trustworthily, under ethical constraints, with compliance safeguards, within a governance framework, and under audit oversight")