"""Agent permission minimization and access control: complete implementation."""
import numpy as np
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import math
import random
from collections import defaultdict
import hashlib
import secrets
import re
class PermissionType(Enum):
    """Kinds of operation that a permission or an access request refers to."""

    READ = "read"          # read access
    WRITE = "write"        # write access
    EXECUTE = "execute"    # execution
    DELETE = "delete"      # deletion
    ADMIN = "admin"        # administration
    NETWORK = "network"    # network access
    STORAGE = "storage"    # storage access
    API_CALL = "api_call"  # API invocation
class AccessDecision(Enum):
    """Outcome of an access evaluation; also used as the effect of a policy."""

    ALLOW = "allow"              # access is granted
    DENY = "deny"                # access is refused
    CONDITIONAL = "conditional"  # granted only if the policy conditions hold
class AuthMethod(Enum):
    """Authentication methods known to the module."""

    PASSWORD = "password"          # password
    TOKEN = "token"                # bearer / access token
    BIOMETRIC = "biometric"        # biometric factor
    HARDWARE_KEY = "hardware_key"  # hardware security key
    OTP = "otp"                    # one-time password
@dataclass
class Permission:
    """A single permission: one operation type on one resource within a scope."""

    # Required fields (positional order is part of the constructor interface).
    id: str
    name: str
    permission_type: PermissionType
    resource: str
    scope: str  # scope the permission applies to, e.g. "table:*"

    # Optional fields.
    description: str = ""
    created_at: datetime = field(default_factory=datetime.now)  # set at construction
@dataclass
class Role:
    """A named bundle of permissions that can be attached to identities."""

    # Required fields (positional order is part of the constructor interface).
    id: str
    name: str
    permissions: List[Permission]

    # Optional fields.
    description: str = ""
    created_at: datetime = field(default_factory=datetime.now)  # set at construction
@dataclass
class Identity:
    """A principal (user, agent or service) together with its roles and credentials."""

    # Required fields (positional order is part of the constructor interface).
    id: str
    name: str
    identity_type: str  # "user", "agent" or "service"
    roles: List[Role]
    credentials: Dict[str, Any]

    # Optional fields.
    mfa_enabled: bool = False
    created_at: datetime = field(default_factory=datetime.now)  # set at construction
    last_login: Optional[datetime] = None  # None until a login is recorded
@dataclass
class AccessRequest:
    """A request by an identity to perform one action on one resource."""

    # Required fields (positional order is part of the constructor interface).
    id: str
    identity_id: str
    resource: str
    action: PermissionType
    context: Dict[str, Any]  # attributes that policy conditions are compared against

    # Optional fields.
    timestamp: datetime = field(default_factory=datetime.now)  # set at construction
@dataclass
class AccessPolicy:
    """An access policy: an effect applied to matching principals, actions and resources."""

    # Required fields (positional order is part of the constructor interface).
    id: str
    name: str
    effect: AccessDecision  # allow, deny or conditional
    principals: List[str]  # identities the policy applies to
    actions: List[PermissionType]
    resources: List[str]
    conditions: Dict[str, Any]  # key/value conditions compared with the request context

    # Optional fields.
    priority: int = 1
    enabled: bool = True
class IdentityProvider:
    """In-memory identity provider.

    Responsibilities:
    1. Identity creation and management
    2. Multi-factor authentication
    3. Credential verification
    4. Session management

    ``mfa_providers`` is stored on the instance but is not consulted by any
    method of this class.
    """

    def __init__(self):
        self.identities: Dict[str, Identity] = {}
        self.sessions: Dict[str, Dict[str, Any]] = {}
        self.mfa_providers = ["totp", "sms", "email", "hardware_key"]

    def create_identity(self, name: str, identity_type: str,
                        mfa_enabled: bool = False) -> Identity:
        """Create and register an identity with no roles and no credentials.

        Args:
            name: Display name of the identity.
            identity_type: Kind of principal ("user", "agent", "service").
            mfa_enabled: Whether authentication must include an MFA code.

        Returns:
            The new identity, also stored in ``self.identities`` under its id.
        """
        identity_id = f"identity_{secrets.token_hex(8)}"
        identity = Identity(
            id=identity_id,
            name=name,
            identity_type=identity_type,
            roles=[],
            credentials={},
            mfa_enabled=mfa_enabled,
        )
        self.identities[identity_id] = identity
        return identity

    def authenticate(self, identity_id: str, credentials: Dict[str, Any],
                     mfa_code: Optional[str] = None) -> Tuple[bool, str]:
        """Authenticate an identity and open a session on success.

        Args:
            identity_id: Id of a previously created identity.
            credentials: Supplied credentials; the ``"password"`` entry is checked.
            mfa_code: MFA code; required when the identity has MFA enabled.

        Returns:
            ``(True, session_id)`` on success, otherwise ``(False, message)``
            where message is one of "Identity not found", "Invalid credentials",
            "MFA required" or "Invalid MFA code".
        """
        identity = self.identities.get(identity_id)
        if identity is None:
            return False, "Identity not found"

        if not self._verify_credentials(identity, credentials):
            return False, "Invalid credentials"

        if identity.mfa_enabled:
            if not mfa_code:
                return False, "MFA required"
            if not self._verify_mfa(identity_id, mfa_code):
                return False, "Invalid MFA code"

        session_id = self._create_session(identity_id)
        identity.last_login = datetime.now()
        return True, session_id

    def _verify_credentials(self, identity: Identity,
                            credentials: Dict[str, Any]) -> bool:
        """Check the supplied password against the identity's stored secret.

        Simplified: the supplied ``"password"`` is compared directly with the
        stored ``"password_hash"`` entry; a real implementation should hash
        with a salt before comparing.

        A missing value on either side is always a failure. Without that rule
        an identity that has no stored secret would authenticate with empty
        credentials, because ``None == None``.
        """
        supplied = (credentials or {}).get("password")
        stored = (identity.credentials or {}).get("password_hash")
        if supplied is None or stored is None:
            return False

        # Constant-time comparison for text/bytes so timing does not leak how
        # much of the secret matched. Text is encoded first because
        # compare_digest rejects non-ASCII str arguments.
        if isinstance(supplied, str) and isinstance(stored, str):
            return secrets.compare_digest(
                supplied.encode("utf-8"), stored.encode("utf-8")
            )
        if isinstance(supplied, (bytes, bytearray)) and isinstance(stored, (bytes, bytearray)):
            return secrets.compare_digest(bytes(supplied), bytes(stored))
        return supplied == stored

    def _verify_mfa(self, identity_id: str, mfa_code: str) -> bool:
        """Validate an MFA code.

        Simplified: only the format (exactly six digits) is checked; a real
        implementation should verify the code with a TOTP algorithm.
        """
        return len(mfa_code) == 6 and mfa_code.isdigit()

    def _create_session(self, identity_id: str) -> str:
        """Open an 8-hour session for the identity and return its id.

        The ``ip_address`` and ``user_agent`` entries are hard-coded
        placeholder values.
        """
        session_id = f"session_{secrets.token_hex(16)}"
        # One clock read so that expires_at is exactly created_at + 8 hours.
        now = datetime.now()
        self.sessions[session_id] = {
            "identity_id": identity_id,
            "created_at": now,
            "expires_at": now + timedelta(hours=8),
            "ip_address": "127.0.0.1",
            "user_agent": "Agent/1.0",
        }
        return session_id

    def validate_session(self, session_id: str) -> Tuple[bool, Optional[str]]:
        """Check whether a session exists and has not expired.

        Expired sessions are removed from ``self.sessions`` as a side effect.

        Returns:
            ``(True, identity_id)`` for a live session, else ``(False, None)``.
        """
        session = self.sessions.get(session_id)
        if session is None:
            return False, None
        if datetime.now() > session["expires_at"]:
            del self.sessions[session_id]
            return False, None
        return True, session["identity_id"]
class PermissionManager:
    """In-memory registry of permissions and roles.

    Responsibilities:
    1. Permission definition
    2. Role management
    3. Role assignment to identities
    4. Permission auditing
    """

    def __init__(self):
        self.permissions: Dict[str, Permission] = {}
        self.roles: Dict[str, Role] = {}
        self.permission_history: List[Dict[str, Any]] = []

    def create_permission(self, name: str, permission_type: PermissionType,
                          resource: str, scope: str) -> Permission:
        """Create a permission, register it under a fresh id and return it."""
        new_id = f"perm_{secrets.token_hex(8)}"
        created = Permission(
            id=new_id,
            name=name,
            permission_type=permission_type,
            resource=resource,
            scope=scope,
        )
        self.permissions[new_id] = created
        return created

    def create_role(self, name: str,
                    permission_ids: List[str]) -> Role:
        """Create a role from registered permission ids and return it.

        Ids that are not registered in ``self.permissions`` are skipped.
        """
        new_id = f"role_{secrets.token_hex(8)}"
        granted = []
        for pid in permission_ids:
            if pid in self.permissions:
                granted.append(self.permissions[pid])
        created = Role(id=new_id, name=name, permissions=granted)
        self.roles[new_id] = created
        return created

    def assign_role_to_identity(self, identity: Identity,
                                role: Role):
        """Attach a role to an identity and record the assignment in the history."""
        identity.roles.append(role)
        entry = {
            "timestamp": datetime.now().isoformat(),
            "action": "role_assigned",
            "identity_id": identity.id,
            "role_id": role.id,
        }
        self.permission_history.append(entry)

    def get_effective_permissions(self, identity: Identity) -> Set[str]:
        """Return the identity's permissions as a set of ``"<type>:<resource>"`` strings."""
        return {
            f"{perm.permission_type.value}:{perm.resource}"
            for role in identity.roles
            for perm in role.permissions
        }

    def audit_permissions(self, identity_id: str) -> Dict[str, Any]:
        """Summarise the identity's permission history over the last 30 days.

        Returns:
            A dict holding the identity id, the number of matching history
            entries, the last ten of them, and the time of the audit.
        """
        window_start = datetime.now() - timedelta(days=30)
        matching = []
        for entry in self.permission_history:
            # The timestamp is parsed first, then the identity is compared.
            if not datetime.fromisoformat(entry["timestamp"]) > window_start:
                continue
            if entry.get("identity_id") != identity_id:
                continue
            matching.append(entry)
        return {
            "identity_id": identity_id,
            "total_changes": len(matching),
            "recent_changes": matching[-10:],
            "audit_timestamp": datetime.now().isoformat(),
        }
class AccessControlEngine:
    """Policy-based access control engine.

    Responsibilities:
    1. Policy creation and storage
    2. Attribute-based conditions (request context compared with policy conditions)
    3. Policy evaluation with default deny
    4. Decision logging

    Evaluation is driven by policies only; role-derived permissions are not
    consulted when deciding.
    """

    def __init__(self):
        self.policies: Dict[str, AccessPolicy] = {}
        self.decision_log: List[Dict[str, Any]] = []

    def create_policy(self, name: str, effect: AccessDecision,
                      principals: List[str], actions: List[PermissionType],
                      resources: List[str],
                      conditions: Optional[Dict[str, Any]] = None,
                      priority: int = 1) -> AccessPolicy:
        """Create and register an access policy.

        Args:
            name: Human-readable policy name (shown in decision reasons).
            effect: Effect of the policy when it applies.
            principals: Identity ids the policy applies to; ``"*"`` means any.
            actions: Actions the policy covers.
            resources: Regular-expression patterns for resources; ``"*"``
                alone means any resource.
            conditions: Context key/value pairs that must all match.
            priority: Higher values are evaluated first. Defaults to 1.

        Returns:
            The new policy, also stored in ``self.policies`` under its id.
        """
        policy_id = f"policy_{secrets.token_hex(8)}"
        policy = AccessPolicy(
            id=policy_id,
            name=name,
            effect=effect,
            principals=principals,
            actions=actions,
            resources=resources,
            conditions=conditions or {},
            priority=priority,
        )
        self.policies[policy_id] = policy
        return policy

    def evaluate_access(self, request: AccessRequest,
                        identity: Identity) -> Tuple[AccessDecision, str]:
        """Decide whether the identity may perform the request.

        Matching policies are tried from highest to lowest priority; at equal
        priority a DENY policy is tried before the others, so an explicit deny
        cannot be shadowed by creation order. The first policy that applies
        decides:

        * CONDITIONAL policies yield ALLOW when their conditions hold and DENY
          otherwise.
        * ALLOW/DENY policies apply only when their conditions (if any) hold;
          otherwise they are skipped and the next policy is tried.

        If no policy applies the request is denied. Every decision is appended
        to ``self.decision_log``.

        Returns:
            ``(decision, reason)``.
        """
        candidates = self._find_applicable_policies(request, identity)
        # reverse=True keeps the sort stable, so creation order breaks any
        # remaining ties.
        candidates.sort(
            key=lambda p: (p.priority, p.effect == AccessDecision.DENY),
            reverse=True,
        )

        # Default deny.
        decision = AccessDecision.DENY
        reason = "No applicable policies"

        for policy in candidates:
            satisfied = self._evaluate_conditions(policy.conditions, request.context)
            if policy.effect == AccessDecision.CONDITIONAL:
                if satisfied:
                    decision = AccessDecision.ALLOW
                    reason = "Conditions satisfied"
                else:
                    decision = AccessDecision.DENY
                    reason = "Conditions not satisfied"
                break
            if not satisfied:
                # Conditions restrict when an ALLOW/DENY policy applies.
                continue
            decision = policy.effect
            reason = f"Policy: {policy.name}"
            break

        self.decision_log.append({
            "timestamp": datetime.now().isoformat(),
            "request_id": request.id,
            "identity_id": identity.id,
            "decision": decision.value,
            "reason": reason,
        })
        return decision, reason

    def _find_applicable_policies(self, request: AccessRequest,
                                  identity: Identity) -> List[AccessPolicy]:
        """Return the enabled policies matching principal, action and resource.

        Conditions are not checked here. Policies are returned in the
        iteration order of ``self.policies``.
        """
        applicable = []
        for policy in self.policies.values():
            if not policy.enabled:
                continue
            if "*" not in policy.principals and identity.id not in policy.principals:
                continue
            if request.action not in policy.actions:
                continue
            if not any(self._resource_matches(pattern, request.resource)
                       for pattern in policy.resources):
                continue
            applicable.append(policy)
        return applicable

    @staticmethod
    def _resource_matches(pattern: str, resource: str) -> bool:
        """Tell whether a policy resource pattern covers a resource.

        ``"*"`` alone matches everything (it is not a valid regular
        expression). Other patterns are regular expressions matched from the
        start of the resource with ``re.match``, so they are not anchored at
        the end. A pattern that fails to compile is compared literally instead
        of raising.
        """
        if pattern == "*":
            return True
        try:
            return re.match(pattern, resource) is not None
        except re.error:
            return pattern == resource

    def _evaluate_conditions(self, conditions: Dict[str, Any],
                             context: Dict[str, Any]) -> bool:
        """Return True when every condition equals the matching context value.

        Empty or missing conditions are always satisfied.
        """
        context = context or {}
        return all(
            context.get(key) == expected
            for key, expected in (conditions or {}).items()
        )

    def _get_permission_manager(self):
        """Return a permission manager (simplified placeholder).

        Each call builds a new, empty PermissionManager; a real implementation
        should hold a reference to a shared instance. Not used by
        ``evaluate_access``.
        """
        return PermissionManager()
class SandboxIsolator:
    """In-memory bookkeeping for sandboxes.

    Responsibilities:
    1. Sandbox creation
    2. Resource limit checks
    3. System call filtering
    4. Network isolation settings (recorded per sandbox)
    """

    def __init__(self):
        self.sandboxes: Dict[str, Dict[str, Any]] = {}
        # Default limits copied into every new sandbox.
        self.resource_limits = {
            "cpu_percent": 50,
            "memory_mb": 512,
            "disk_mb": 1024,
            "network_enabled": False,
            "max_processes": 10,
        }

    def create_sandbox(self, identity_id: str,
                       sandbox_type: str = "default") -> str:
        """Register a new active sandbox for the identity and return its id."""
        new_id = f"sandbox_{secrets.token_hex(16)}"
        record = {
            "identity_id": identity_id,
            "type": sandbox_type,
            "created_at": datetime.now(),
            "status": "active",
            "resource_usage": {
                "cpu_percent": 0,
                "memory_mb": 0,
                "disk_mb": 0,
                "processes": 0,
            },
            "limits": self.resource_limits.copy(),
            "network_access": [],
            "allowed_syscalls": ["read", "write", "open", "close"],
        }
        self.sandboxes[new_id] = record
        return new_id

    def enforce_limits(self, sandbox_id: str) -> Tuple[bool, List[str]]:
        """Compare the sandbox's recorded usage with its limits.

        On any violation the sandbox status is set to "throttled".

        Returns:
            ``(True, [])`` when within limits, otherwise ``(False, messages)``;
            an unknown sandbox yields ``(False, ["Sandbox not found"])``.
        """
        try:
            box = self.sandboxes[sandbox_id]
        except KeyError:
            return False, ["Sandbox not found"]

        usage = box["resource_usage"]
        caps = box["limits"]
        problems = []

        if usage["cpu_percent"] > caps["cpu_percent"]:
            problems.append(f"CPU limit exceeded: {usage['cpu_percent']}%")
        if usage["memory_mb"] > caps["memory_mb"]:
            problems.append(f"Memory limit exceeded: {usage['memory_mb']}MB")
        if usage["disk_mb"] > caps["disk_mb"]:
            problems.append(f"Disk limit exceeded: {usage['disk_mb']}MB")
        if usage["processes"] > caps["max_processes"]:
            problems.append(f"Process limit exceeded: {usage['processes']}")

        if not problems:
            return True, []

        # Corrective action (simplified): only the status flag changes.
        box["status"] = "throttled"
        return False, problems

    def check_syscall(self, sandbox_id: str,
                      syscall: str) -> Tuple[bool, str]:
        """Tell whether the system call is on the sandbox's allow-list."""
        try:
            box = self.sandboxes[sandbox_id]
        except KeyError:
            return False, "Sandbox not found"

        if syscall in box["allowed_syscalls"]:
            return True, "Allowed"
        return False, f"System call '{syscall}' not allowed"
# Usage example
if __name__ == "__main__":
    print("=== Agent 权限最小化与访问控制 ===\n")
    print("=== 创建身份提供商 ===")

    # Identity provider with one MFA-protected user and one agent.
    idp = IdentityProvider()
    user1 = idp.create_identity("Alice", "user", mfa_enabled=True)
    agent1 = idp.create_identity("DataAgent", "agent", mfa_enabled=False)
    print(f"创建{len(idp.identities)}个身份")
    for who in (user1, agent1):
        print(f" - {who.name} ({who.identity_type}) - MFA: {who.mfa_enabled}")

    print("\n=== 创建权限管理器 ===")

    # Permission manager: three permissions grouped into three roles.
    perm_mgr = PermissionManager()
    read_data = perm_mgr.create_permission(
        "Read Data", PermissionType.READ, "database", "table:*")
    write_data = perm_mgr.create_permission(
        "Write Data", PermissionType.WRITE, "database", "table:users")
    execute_code = perm_mgr.create_permission(
        "Execute Code", PermissionType.EXECUTE, "sandbox", "env:isolated")
    print(f"创建{len(perm_mgr.permissions)}个权限")

    reader_role = perm_mgr.create_role("Data Reader", [read_data.id])
    writer_role = perm_mgr.create_role("Data Writer", [read_data.id, write_data.id])
    executor_role = perm_mgr.create_role("Code Executor", [execute_code.id])
    print(f"创建{len(perm_mgr.roles)}个角色")

    # Role assignment.
    perm_mgr.assign_role_to_identity(user1, reader_role)
    perm_mgr.assign_role_to_identity(agent1, executor_role)
    print("\n身份权限:")
    for who in (user1, agent1):
        print(f" {who.name}: {perm_mgr.get_effective_permissions(who)}")

    print("\n=== 创建访问控制引擎 ===")

    # Access control engine with three policies.
    access_engine = AccessControlEngine()
    policy1 = access_engine.create_policy(
        name="Allow Data Read",
        effect=AccessDecision.ALLOW,
        principals=[user1.id],
        actions=[PermissionType.READ],
        resources=["database.*"],
    )
    policy2 = access_engine.create_policy(
        name="Allow Code Execution",
        effect=AccessDecision.ALLOW,
        principals=[agent1.id],
        actions=[PermissionType.EXECUTE],
        resources=["sandbox.*"],
        conditions={"env": "isolated"},
    )
    policy3 = access_engine.create_policy(
        name="Deny Admin Access",
        effect=AccessDecision.DENY,
        principals=["*"],
        actions=[PermissionType.ADMIN],
        resources=["*"],
    )
    print(f"创建{len(access_engine.policies)}个访问策略")

    print("\n=== 测试访问控制 ===")

    # Each entry: request id, identity, resource, action, context, headline.
    scenarios = [
        ("req_1", user1, "database.users", PermissionType.READ, {},
         f"请求 1: {user1.name} 读取 database.users"),
        ("req_2", agent1, "sandbox.code", PermissionType.EXECUTE, {"env": "isolated"},
         f"\n请求 2: {agent1.name} 执行 sandbox.code"),
        ("req_3", user1, "admin.panel", PermissionType.ADMIN, {},
         f"\n请求 3: {user1.name} 访问 admin.panel"),
    ]
    for req_id, who, target, action, ctx, headline in scenarios:
        access_request = AccessRequest(
            id=req_id,
            identity_id=who.id,
            resource=target,
            action=action,
            context=ctx,
        )
        decision, why = access_engine.evaluate_access(access_request, who)
        print(headline)
        print(f" 决策:{decision.value}")
        print(f" 原因:{why}")

    print("\n=== 沙箱隔离 ===")

    # Sandbox isolator with one sandbox for the agent.
    sandbox = SandboxIsolator()
    sandbox_id = sandbox.create_sandbox(agent1.id, "code_execution")
    print(f"创建沙箱:{sandbox_id}")

    # System call checks.
    print("\n系统调用检查:")
    for syscall in ["read", "write", "exec", "network"]:
        allowed, reason = sandbox.check_syscall(sandbox_id, syscall)
        status = "✓" if allowed else "✗"
        print(f" {status} {syscall}: {reason}")

    # Resource limit enforcement on simulated usage figures.
    usage = sandbox.sandboxes[sandbox_id]["resource_usage"]
    usage["cpu_percent"] = 45
    usage["memory_mb"] = 480
    enforced, violations = sandbox.enforce_limits(sandbox_id)
    print("\n资源限制执行:")
    print(f" 状态:{'通过' if enforced else '违规'}")
    for v in violations:
        print(f" - {v}")

    print("\n=== 权限审计 ===")
    audit_result = perm_mgr.audit_permissions(user1.id)
    print(f"权限审计 ({user1.name}):")
    print(f" 总变更数:{audit_result['total_changes']}")
    print(f" 最近变更:{len(audit_result['recent_changes'])} 条")

    print("\n关键观察:")
    for observation in (
        "1. 权限最小化:最小特权原则,按需分配权限",
        "2. 身份认证:多因素认证,凭证安全管理",
        "3. 访问控制:RBAC/ABAC,策略引擎评估",
        "4. 沙箱隔离:资源限制,系统调用过滤",
        "5. 最小权限 AI:权限 + 身份 + 控制 + 隔离 = 可信赖",
    ):
        print(observation)
    print("\n权限最小化的使命:让 AI 在最小权限边界内安全运行")