C.1 使用 Northflank SDK 创建沙箱
import os

from northflank import Client

# Read the API key from the environment so a real credential never has to be
# pasted into the source. The original placeholder stays as the fallback, so
# behaviour is unchanged when NORTHFLANK_API_KEY is not set.
client = Client(api_key=os.environ.get("NORTHFLANK_API_KEY", "nf_xxx"))

# Create the project that will hold the sandbox.
project = client.create_project(
    name="agent-sandbox",
    region="us-east-1",
)

# Deploy the sandbox service into that project.
service = project.create_service(
    name="agent-executor",
    image="python:3.11-slim",
    resources={
        "cpu": "1",
        "memory": "2Gi",
    },
    isolation="kata-containers",
)

# Execute a snippet in the sandbox service and print its output.
result = service.execute(
    code="print('Hello from sandbox!')",
    timeout=30,
)
print(result.output)
C.2 使用 E2B SDK
from e2b import Sandbox

# Create the sandbox.
sandbox = Sandbox(template="python3")
try:
    # Execute code inside the sandbox and print the result text.
    execution = sandbox.run_code(
        "import numpy as np; print(np.random.rand())"
    )
    print(execution.text)

    # Upload a file.
    sandbox.files.write("/data/input.txt", "test data")

    # Download a file.
    # NOTE(review): nothing in this snippet creates /data/output.txt; this
    # presumably expects the sandboxed code to have produced it - confirm.
    content = sandbox.files.read("/data/output.txt")
finally:
    # Clean up in `finally` so the sandbox is closed even when one of the
    # calls above raises.
    sandbox.close()
C.3 使用 Arrakis Python SDK
from py_arrakis import SandboxManager

# Connect to the Arrakis server.
manager = SandboxManager("http://localhost:7000")

# Start a sandbox. Per the original note in this example, the sandbox is
# destroyed automatically when the `with` block exits.
with manager.start_sandbox("agent-sandbox") as sb:
    # Run a command and print its output.
    result = sb.run_cmd("echo hello world")
    print(result["output"])

    # Create a snapshot.
    snapshot_id = sb.snapshot("checkpoint-1")

    # Perform further operations after the snapshot was taken.
    sb.run_cmd("echo 'after snapshot' >> /tmp/test.txt")

# Restore from the snapshot (into another sandbox instance).
# NOTE(review): the restore is placed after the `with` block on the assumption
# that the original sandbox must be gone before its name is reused - confirm
# against the Arrakis server's behaviour.
restored = manager.restore("agent-sandbox", snapshot_id)
C.4 自定义 Scheduler 实现
class AgentScheduler:
    """Queue agent tasks and run each one in a sandbox, recording its state.

    Submitted tasks are buffered in an ``asyncio.Queue``. Every state
    transition (PENDING, RUNNING, SUCCESS, TIMEOUT, FAILED) is written
    through ``state_store.save_task_state``.

    ``run_scheduler`` calls ``self.select_sandbox(task)``, which is not
    defined in this class body; it must be supplied elsewhere (for example
    by a subclass).
    """

    def __init__(self, state_store, sandbox_pool):
        """Store the collaborators and create an empty task queue.

        Args:
            state_store: Object exposing an awaitable ``save_task_state``.
            sandbox_pool: Kept on the instance; not used by the methods
                defined here (presumably consumed by ``select_sandbox`` -
                TODO confirm).
        """
        self.state_store = state_store
        self.sandbox_pool = sandbox_pool
        self.task_queue = asyncio.Queue()
        # The event loop keeps only weak references to tasks, so in-flight
        # executions are held here until they finish.
        self._running_tasks = set()

    async def submit_task(self, task: "AgentTask"):
        """Record the task as PENDING, then place it on the scheduling queue.

        The state is persisted before the task is enqueued: once the task is
        visible to ``run_scheduler`` it may be marked RUNNING at any moment,
        and a PENDING write arriving afterwards would overwrite that state.
        """
        await self.state_store.save_task_state(
            task.id,
            TaskState.PENDING,
        )
        await self.task_queue.put(task)

    async def run_scheduler(self):
        """Scheduler main loop: dequeue tasks and launch them; never returns.

        Any exception raised by ``select_sandbox`` or by the RUNNING state
        write propagates and ends the loop.
        """
        while True:
            task = await self.task_queue.get()

            # Pick the sandbox instance for this task.
            sandbox = await self.select_sandbox(task)

            # Record the transition before execution starts.
            await self.state_store.save_task_state(
                task.id,
                TaskState.RUNNING,
                sandbox_id=sandbox.id,
            )

            # Run in the background so the loop can dequeue the next task.
            # Keep a strong reference until completion; otherwise the task
            # may be garbage-collected while still running.
            runner = asyncio.create_task(self.execute_task(task, sandbox))
            self._running_tasks.add(runner)
            runner.add_done_callback(self._running_tasks.discard)

    async def execute_task(self, task, sandbox):
        """Run ``task.code`` in ``sandbox`` and persist the outcome.

        Records SUCCESS with the result, TIMEOUT when a timeout error is
        raised, or FAILED with ``str(error)`` for any other exception.
        """
        try:
            result = await sandbox.execute(
                task.code,
                timeout=task.timeout,
            )
            await self.state_store.save_task_state(
                task.id,
                TaskState.SUCCESS,
                result=result,
            )
        except (TimeoutError, asyncio.TimeoutError):
            # asyncio.TimeoutError is only an alias of the builtin
            # TimeoutError from Python 3.11 on; catch both so older
            # interpreters do not record timeouts as FAILED.
            await self.state_store.save_task_state(
                task.id,
                TaskState.TIMEOUT,
            )
        except Exception as e:
            await self.state_store.save_task_state(
                task.id,
                TaskState.FAILED,
                error=str(e),
            )