"""Agent security isolation and sandbox: complete implementation (Agent 安全隔离与沙箱完整实现)."""
import os
import sys
import subprocess
import tempfile
import shutil
import signal
import time
import json
import hashlib
import secrets
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import resource
import prctl
import seccomp
class IsolationLevel(Enum):
    """Isolation levels a sandbox can be configured with, weakest first."""

    # No isolation.
    NONE = "none"
    # Basic isolation.
    BASIC = "basic"
    # Standard isolation.
    STANDARD = "standard"
    # Strict isolation.
    STRICT = "strict"
    # Maximum isolation.
    MAXIMUM = "maximum"
class SecurityEvent(Enum):
    """Kinds of security events a sandbox run can record."""

    # A system call was blocked.
    SYSCALL_BLOCKED = "syscall_blocked"
    # A resource limit was exceeded.
    RESOURCE_LIMIT_EXCEEDED = "resource_limit_exceeded"
    # An attempt to escape the sandbox.
    ESCAPE_ATTEMPT = "escape_attempt"
    # Permission was denied.
    PERMISSION_DENIED = "permission_denied"
    # Suspicious behaviour.
    SUSPICIOUS_BEHAVIOR = "suspicious_behavior"
    # The execution timed out.
    TIMEOUT_EXCEEDED = "timeout_exceeded"
@dataclass
class SecurityConfig:
    """Settings that describe how a sandbox should restrict an execution."""

    isolation_level: IsolationLevel
    # CPU time budget, in seconds.
    max_cpu_time: int
    # Memory budget, in MB.
    max_memory: int
    # Disk space budget, in MB.
    max_disk_space: int
    max_processes: int
    # Largest file the code may create, in MB.
    max_file_size: int
    allowed_syscalls: List[str]
    blocked_paths: List[str]
    network_enabled: bool
    # Wall-clock timeout, in seconds.
    timeout: int
@dataclass
class ExecutionResult:
    """Outcome of a single sandboxed execution.

    ``timestamp`` defaults to the moment the instance is created.
    """

    execution_id: str
    success: bool
    stdout: str
    stderr: str
    exit_code: int
    execution_time: float
    memory_used: int
    security_events: List[Dict[str, Any]]
    timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class SecurityReport:
    """Security assessment produced for one execution.

    ``generated_at`` defaults to the moment the instance is created.
    """

    report_id: str
    execution_id: str
    # One of: low / medium / high / critical.
    risk_level: str
    security_events: List[Dict[str, Any]]
    recommendations: List[str]
    generated_at: datetime = field(default_factory=datetime.now)
class SecureSandbox:
    """Execute untrusted code in a restricted child process.

    Layers applied to every execution:
      1. Process isolation: the child runs in its own session, with a
         minimal environment, no stdin and a throw-away working directory.
      2. Resource limits: CPU time, address space, process count and file
         size are capped with ``setrlimit`` inside the child.
      3. Syscall filtering: a seccomp allow-list is loaded inside the child
         for the STRICT and MAXIMUM isolation levels.
      4. Monitoring: the supervisor enforces the wall-clock timeout and
         samples the child's resident memory.

    NOTE(review): ``config.allowed_syscalls``, ``config.blocked_paths``,
    ``config.network_enabled`` and ``config.max_disk_space`` are not
    consulted anywhere in this class.
    """

    # Seconds between two timeout / memory checks while the child runs.
    _POLL_INTERVAL = 0.1
    # Seconds granted to collect output and the exit status after a kill.
    _REAP_GRACE = 5.0

    # System calls the seccomp filter lets through.
    _BASELINE_SYSCALLS = (
        'read', 'write', 'close', 'fstat', 'stat', 'lstat',
        'mmap', 'mprotect', 'munmap', 'brk', 'rt_sigaction',
        'rt_sigprocmask', 'access', 'pipe', 'dup', 'dup2',
        'getpid', 'getuid', 'getgid', 'geteuid', 'getegid',
        'getcwd', 'chdir', 'open', 'openat', 'execve',
        'exit', 'exit_group', 'wait4', 'nanosleep',
        'clock_gettime', 'getrandom', 'arch_prctl',
    )

    # Variables that must never reach the sandboxed interpreter.
    _DANGEROUS_ENV_VARS = (
        'LD_PRELOAD', 'LD_LIBRARY_PATH', 'PYTHONPATH',
        'PYTHONINSPECT', 'PYTHONSTARTUP',
    )

    def __init__(self, config: "SecurityConfig"):
        """Create a sandbox that enforces ``config`` on every execution."""
        self.config = config
        # Every event recorded by any execution of this sandbox.
        self.security_events: List[Dict[str, Any]] = []
        self.execution_count = 0

    def _setup_resource_limits(self):
        """Apply the configured rlimits to the *calling* process.

        Meant to run in the forked child just before ``exec``; calling it
        in the supervisor would restrict the supervisor itself. Soft and
        hard limits are set to the same value.
        """
        mebibyte = 1024 * 1024
        limits = (
            # CPU time, in seconds.
            (resource.RLIMIT_CPU, self.config.max_cpu_time),
            # Address space, in bytes.
            (resource.RLIMIT_AS, self.config.max_memory * mebibyte),
            # Number of processes.
            (resource.RLIMIT_NPROC, self.config.max_processes),
            # Largest file that may be created, in bytes.
            (resource.RLIMIT_FSIZE, self.config.max_file_size * mebibyte),
        )
        for limit_kind, value in limits:
            resource.setrlimit(limit_kind, (value, value))

    def _setup_seccomp_filter(self):
        """Load a seccomp allow-list into the *calling* process.

        Does nothing unless the isolation level is STRICT or MAXIMUM.
        Meant to run in the forked child just before ``exec``: the filter
        is applied to whichever process calls ``load()``.

        NOTE(review): the official libseccomp Python binding exposes
        ``seccomp.SyscallFilter`` and accepts syscall names directly in
        ``add_rule``; confirm that the ``seccomp`` module imported by this
        file really provides ``Seccomp`` and ``syscall_name_to_nr``.
        """
        strict_levels = (IsolationLevel.STRICT, IsolationLevel.MAXIMUM)
        if self.config.isolation_level not in strict_levels:
            return

        # Default action: fail the call with errno 1 (EPERM).
        filter_ctx = seccomp.Seccomp(seccomp.ERRNO(1))
        for syscall in self._BASELINE_SYSCALLS:
            try:
                syscall_num = seccomp.syscall_name_to_nr(syscall)
                filter_ctx.add_rule(seccomp.ALLOW, syscall_num)
            except Exception:
                # Best effort, as before: a name that cannot be resolved
                # simply stays blocked by the default action.
                continue
        filter_ctx.load()

    def _apply_child_restrictions(self) -> None:
        """Restrict the forked child; used as ``preexec_fn``.

        The rlimits are set first because the seccomp allow-list does not
        include the syscalls needed to change them.
        """
        self._setup_resource_limits()
        self._setup_seccomp_filter()

    def _setup_environment(self, work_dir: str) -> Dict[str, str]:
        """Return the minimal environment handed to the sandboxed process.

        Args:
            work_dir: Directory used as both HOME and TMPDIR.

        Returns:
            A fresh dict; nothing is inherited from ``os.environ``.
        """
        safe_env = {
            'PATH': '/usr/bin:/bin',
            'HOME': work_dir,
            'TMPDIR': work_dir,
            'PYTHONUNBUFFERED': '1',
            'PYTHONDONTWRITEBYTECODE': '1',
        }
        # The dangerous variables must be absent, not merely empty. This
        # guard keeps that true even if the base mapping above grows.
        for var in self._DANGEROUS_ENV_VARS:
            safe_env.pop(var, None)
        return safe_env

    @staticmethod
    def _read_rss_bytes(pid: int) -> Optional[int]:
        """Return the resident set size of ``pid`` in bytes, if readable.

        Reads the second field of ``/proc/<pid>/statm`` (resident pages).
        Returns None when the file cannot be read or parsed, for example
        after the process is gone or where ``/proc`` is not available.
        """
        try:
            with open(f"/proc/{pid}/statm", "rb") as statm:
                resident_pages = int(statm.read().split()[1])
            return resident_pages * os.sysconf("SC_PAGE_SIZE")
        except (OSError, ValueError, IndexError):
            return None

    @staticmethod
    def _kill_process_group(process: subprocess.Popen) -> None:
        """SIGKILL the child and, when it leads its own group, the group."""
        try:
            # Only signal the group when the child is its leader; otherwise
            # the group could be the supervisor's own.
            if os.getpgid(process.pid) == process.pid:
                os.killpg(process.pid, signal.SIGKILL)
                return
        except (ProcessLookupError, PermissionError):
            pass
        process.kill()

    def _supervise(
        self, process: subprocess.Popen
    ) -> Tuple[List[Dict[str, Any]], int, bytes, bytes]:
        """Wait for ``process`` while draining its pipes and watching it.

        The pipes are read through ``communicate`` in short slices so a
        child that writes a lot of output cannot block on a full pipe.

        Returns:
            ``(events, peak_rss_bytes, stdout, stderr)``. ``peak_rss_bytes``
            is the largest sampled resident size of the direct child (0 if
            it could never be sampled).
        """
        events: List[Dict[str, Any]] = []
        peak_rss = 0
        memory_limit = self.config.max_memory * 1024 * 1024
        memory_event_recorded = False
        deadline = time.monotonic() + self.config.timeout

        while True:
            rss = self._read_rss_bytes(process.pid)
            if rss is not None:
                peak_rss = max(peak_rss, rss)
                # Record the overrun once instead of on every sample.
                if rss > memory_limit and not memory_event_recorded:
                    memory_event_recorded = True
                    events.append({
                        'type': SecurityEvent.RESOURCE_LIMIT_EXCEEDED.value,
                        'timestamp': datetime.now().isoformat(),
                        'details': f'Memory usage exceeded limit: {rss / 1024 / 1024:.1f}MB'
                    })

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                events.append({
                    'type': SecurityEvent.TIMEOUT_EXCEEDED.value,
                    'timestamp': datetime.now().isoformat(),
                    'details': f'Execution exceeded timeout of {self.config.timeout}s'
                })
                self._kill_process_group(process)
                try:
                    stdout, stderr = process.communicate(timeout=self._REAP_GRACE)
                except subprocess.TimeoutExpired:
                    # Something still holds the pipes open; give up on the
                    # output rather than hang the supervisor.
                    stdout, stderr = b"", b"Execution timed out"
                break

            try:
                stdout, stderr = process.communicate(
                    timeout=min(self._POLL_INTERVAL, remaining)
                )
                break
            except subprocess.TimeoutExpired:
                continue

        return events, peak_rss, stdout or b"", stderr or b""

    def _monitor_execution(self, process: subprocess.Popen) -> List[Dict[str, Any]]:
        """Watch ``process`` until it exits or times out; return the events.

        The output collected while waiting is not returned here; use
        ``_supervise`` when it is needed.
        """
        events, _peak_rss, _stdout, _stderr = self._supervise(process)
        return events

    def execute_code(self, code: str, language: str = 'python') -> "ExecutionResult":
        """Run ``code`` in a restricted child process.

        Args:
            code: Source text to execute.
            language: Only ``'python'`` is supported.

        Returns:
            An ExecutionResult. Failures to launch (including an
            unsupported language) are reported as a result with
            ``success=False`` and ``exit_code=-1`` rather than raised.
        """
        execution_id = f"exec_{secrets.token_hex(16)}"
        self.execution_count += 1
        # Throw-away working directory for this run.
        work_dir = tempfile.mkdtemp(prefix=f"sandbox_{execution_id}_")
        try:
            if language != 'python':
                raise ValueError(f"Unsupported language: {language}")

            code_file = os.path.join(work_dir, 'script.py')
            with open(code_file, 'w', encoding='utf-8') as script:
                script.write(code)
            command = ['python3', '-u', code_file]

            started = time.monotonic()
            process = subprocess.Popen(
                command,
                stdin=subprocess.DEVNULL,  # never expose the supervisor's stdin
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                cwd=work_dir,
                env=self._setup_environment(work_dir),
                start_new_session=True,  # own session and process group
                # Limits and the seccomp filter are applied in the child,
                # so the supervisor itself stays unrestricted.
                preexec_fn=self._apply_child_restrictions,
            )
            security_events, peak_rss, stdout, stderr = self._supervise(process)
            execution_time = time.monotonic() - started

            returncode = process.returncode
            result = ExecutionResult(
                execution_id=execution_id,
                success=returncode == 0,
                stdout=stdout.decode('utf-8', errors='replace'),
                stderr=stderr.decode('utf-8', errors='replace'),
                # None means the child could not be reaped after a kill.
                exit_code=returncode if returncode is not None else -1,
                execution_time=execution_time,
                memory_used=peak_rss // (1024 * 1024),
                security_events=security_events
            )
            self.security_events.extend(security_events)
            return result
        except Exception as e:
            error_events = [{
                'type': 'execution_error',
                'timestamp': datetime.now().isoformat(),
                'details': str(e)
            }]
            # Keep the sandbox-wide statistics complete for failed runs too.
            self.security_events.extend(error_events)
            return ExecutionResult(
                execution_id=execution_id,
                success=False,
                stdout="",
                stderr=f"Execution failed: {str(e)}",
                exit_code=-1,
                execution_time=0,
                memory_used=0,
                security_events=error_events
            )
        finally:
            # Best-effort cleanup of the working directory.
            shutil.rmtree(work_dir, ignore_errors=True)

    def generate_security_report(self, execution_result: "ExecutionResult") -> "SecurityReport":
        """Build a SecurityReport from the events of one execution.

        The risk level depends only on the number of events: none is
        "low", up to 2 "medium", up to 5 "high", more "critical".
        """
        report_id = f"report_{secrets.token_hex(16)}"
        events = execution_result.security_events or []

        event_count = len(events)
        if event_count == 0:
            risk_level = "low"
        elif event_count <= 2:
            risk_level = "medium"
        elif event_count <= 5:
            risk_level = "high"
        else:
            risk_level = "critical"

        # ``.get`` tolerates events without a 'type', like the statistics.
        seen_types = {event.get('type') for event in events}
        advice_by_event = (
            (SecurityEvent.SYSCALL_BLOCKED, "代码尝试执行被禁止的系统调用,请检查代码逻辑"),
            (SecurityEvent.RESOURCE_LIMIT_EXCEEDED, "代码资源使用超限,请优化资源使用"),
            (SecurityEvent.ESCAPE_ATTEMPT, "检测到逃逸尝试,代码可能存在恶意行为"),
            (SecurityEvent.TIMEOUT_EXCEEDED, "代码执行超时,请优化执行效率"),
        )
        recommendations = [
            advice for kind, advice in advice_by_event if kind.value in seen_types
        ]
        if not recommendations:
            if events:
                # Events of other kinds (e.g. execution errors) were
                # recorded, so the run must not be reported as clean.
                recommendations.append("执行过程中记录到安全事件,请检查事件详情")
            else:
                recommendations.append("代码执行安全,未发现异常行为")

        return SecurityReport(
            report_id=report_id,
            execution_id=execution_result.execution_id,
            risk_level=risk_level,
            security_events=events,
            recommendations=recommendations
        )

    def get_statistics(self) -> Dict[str, Any]:
        """Return execution and event counters for this sandbox."""
        return {
            "total_executions": self.execution_count,
            "total_security_events": len(self.security_events),
            "event_breakdown": self._count_events_by_type(),
            "timestamp": datetime.now().isoformat()
        }

    def _count_events_by_type(self) -> Dict[str, int]:
        """Count recorded events per 'type' ('unknown' when missing)."""
        counts: Dict[str, int] = {}
        for event in self.security_events:
            event_type = event.get('type', 'unknown')
            counts[event_type] = counts.get(event_type, 0) + 1
        return counts
# Usage example
if __name__ == "__main__":
    print("=== Agent 安全隔离、沙箱与逃逸防护 ===\n")
    print("=== 创建安全沙箱 ===")

    # Security settings shared by every demo run below.
    config = SecurityConfig(
        isolation_level=IsolationLevel.STRICT,
        max_cpu_time=10,
        max_memory=256,
        max_disk_space=100,
        max_processes=5,
        max_file_size=10,
        allowed_syscalls=['read', 'write', 'open', 'close'],
        blocked_paths=['/etc', '/root', '/proc'],
        network_enabled=False,
        timeout=30
    )
    sandbox = SecureSandbox(config)

    for summary_line in (
        f"隔离级别:{config.isolation_level.value}",
        f"CPU 限制:{config.max_cpu_time}s",
        f"内存限制:{config.max_memory}MB",
        f"超时限制:{config.timeout}s\n",
    ):
        print(summary_line)

    # Each case: (code, description, whether it is expected to succeed).
    test_cases = [
        ("print('Hello, World!')", "正常代码", True),
        ("import os; os.system('ls')", "系统调用", False),
        ("x = 'a' * 1000000000", "内存溢出", False),
        ("while True: pass", "无限循环", False),
        ("import socket; s = socket.socket()", "网络访问", False),
    ]

    print("=== 测试安全沙箱 ===\n")
    for snippet, label, _expected in test_cases:
        print(f"测试:{label}")
        print(f"代码:{snippet[:60]}...")

        # Run the snippet inside the sandbox.
        result = sandbox.execute_code(snippet)
        for detail_line in (
            f" 执行 ID: {result.execution_id}",
            f" 成功:{result.success}",
            f" 退出码:{result.exit_code}",
            f" 执行时间:{result.execution_time:.2f}s",
            f" 内存使用:{result.memory_used}MB",
        ):
            print(detail_line)

        if result.stdout:
            print(f" 输出:{result.stdout[:100]}")
        if result.stderr:
            print(f" 错误:{result.stderr[:100]}")

        recorded_events = result.security_events
        if recorded_events:
            print(f" 安全事件:{len(recorded_events)}")
            for event in recorded_events:
                print(f" - {event['type']}: {event.get('details', '')}")

        # Summarise the run as a security report.
        report = sandbox.generate_security_report(result)
        print(f" 风险等级:{report.risk_level}")
        print(f" 建议:{report.recommendations[0]}")
        print()

    print("=== 安全统计 ===")
    stats = sandbox.get_statistics()
    print(f"总执行次数:{stats['total_executions']}")
    print(f"总安全事件:{stats['total_security_events']}")
    print(f"事件分布:{stats['event_breakdown']}")

    print("\n关键观察:")
    for observation in (
        "1. 安全隔离:进程隔离、资源限制、权限控制",
        "2. 沙箱技术:容器、微 VM、Wasm、代码执行",
        "3. 逃逸防护:系统调用过滤、行为监控",
        "4. 运行时安全:实时阻断、异常检测",
        "5. 隔离安全:隔离 + 沙箱 + 防护 + 监控 = 可信赖",
    ):
        print(observation)
    print("\n隔离安全的使命:让 AI 代码在隔离环境中执行、在沙箱中运行、在防护下安全、在监控下可靠")