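Document version: v1.0
Created: 2026-03-19
Status: Technical review draft
Confidentiality: Internal, confidential
This project builds a distributed code-execution cluster that centrally manages and schedules 10 code servers (each running Claude Code). It supports collaborative development by roughly a thousand external developers while keeping code assets from leaking.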
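| Goal | Target metric |
|---|---|
| Scheduling | Centrally manage 10 code servers; scale dynamically to 100+ nodes |
| Concurrency | 1,000+ external developers collaborating online at the same time |
| Security | Zero code leakage; meets ISO 27001 certification requirements |
| Latency | Task dispatch latency < 100 ms; result collection latency < 500 ms |
| Availability | System availability ≥ 99.9% with automatic failover |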
┌─────────────────────────────────────────────────────────────┐
│                  Technology stack overview                  │
├─────────────┬─────────────┬─────────────┬───────────────────┤
│ Backend     │ Task queue  │ Database    │ Container platform│
├─────────────┼─────────────┼─────────────┼───────────────────┤
│ FastAPI     │ Redis       │ PostgreSQL  │ Docker + K8s      │
│ Python 3.9+ │ Cluster     │ 14+         │                   │
├─────────────┴─────────────┴─────────────┴───────────────────┤
│ Cloud: Alibaba Cloud / Tencent Cloud / AWS                  │
│ AI: Anthropic Claude API                                    │
└─────────────────────────────────────────────────────────────┘
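| Layer | Responsibilities | Core components | SLA |
|---|---|---|---|
| External collaboration plane | User interaction, API access, identity authentication | Web Portal, API Gateway, Auth Center | 99.95% availability |
| Control plane | Task scheduling, status monitoring, configuration management | Controller, Redis, Monitor | 99.99% availability |
| Elastic execution plane | Code execution, AI invocation, sandbox isolation | Worker, Claude Code, Sandbox | 99.9% availability |
| Data persistence layer | Data storage, queue caching, log archiving | PostgreSQL, Redis, OSS, ES | 99.999% data durability |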
Responsibility: receives, parses, prioritizes, and dispatches tasks.
Interface definition:
class SchedulerController:
    async def submit_task(self, task: TaskCreateRequest) -> TaskResponse: ...
    async def cancel_task(self, task_id: str) -> TaskResponse: ...
    async def get_task_status(self, task_id: str) -> TaskStatus: ...
    async def dispatch_task(self, task: Task) -> Optional[WorkerNode]: ...
    async def rebalance_tasks(self) -> None: ...
Dependencies:
Configuration parameters:
scheduler:
  max_concurrent_tasks: 1000
  task_timeout_seconds: 300
  retry_max_attempts: 3
  priority_levels: [critical, high, normal, low]
  dispatch_strategy: round_robin  # or least_loaded, priority_based
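To illustrate how `dispatch_strategy` might choose a node, here is a minimal sketch of `round_robin` and `least_loaded` over a plain list of workers. `WorkerSlot`, `RoundRobinPicker`, and `pick_least_loaded` are hypothetical names that do not appear in the interfaces above, and "least loaded" is assumed to mean the lowest load-to-capacity ratio.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class WorkerSlot:
    """Hypothetical view of a worker: its ID, capacity, and current load."""
    worker_id: str
    capacity: int
    current_load: int

    def has_room(self) -> bool:
        return self.current_load < self.capacity


class RoundRobinPicker:
    """Rotates through the worker list, skipping workers that are full."""

    def __init__(self) -> None:
        self._cursor = 0

    def pick(self, workers: List[WorkerSlot]) -> Optional[WorkerSlot]:
        n = len(workers)
        for offset in range(n):
            idx = (self._cursor + offset) % n
            if workers[idx].has_room():
                self._cursor = idx + 1  # next call starts after this worker
                return workers[idx]
        return None  # no workers, or every worker is at capacity


def pick_least_loaded(workers: List[WorkerSlot]) -> Optional[WorkerSlot]:
    """Returns the worker with the lowest load/capacity ratio that still has room."""
    candidates = [w for w in workers if w.has_room()]
    if not candidates:
        return None
    return min(candidates, key=lambda w: w.current_load / w.capacity)
```

Round robin is cheap and fair when tasks are similar in cost; the least-loaded variant costs one scan per dispatch but copes better with uneven task durations.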
Responsibility: manages the Worker node lifecycle, health checks, and load balancing.
Interface definition:
class WorkerManager:
    async def register_worker(self, worker_info: WorkerInfo) -> str: ...
    async def unregister_worker(self, worker_id: str) -> bool: ...
    async def get_available_workers(self) -> List[WorkerNode]: ...
    async def get_worker_status(self, worker_id: str) -> WorkerStatus: ...
    async def heartbeat(self, worker_id: str) -> bool: ...
    async def scale_workers(self, target_count: int) -> ScalingResult: ...
Dependencies:
Responsibility: executes individual tasks, invokes Claude Code, and manages the sandbox environment.
Interface definition:
class WorkerExecutor:
    async def execute_task(self, task: Task) -> TaskResult: ...
    async def setup_sandbox(self, task_id: str) -> SandboxEnv: ...
    async def cleanup_sandbox(self, sandbox_id: str) -> bool: ...
    async def invoke_claude(self, prompt: str, context: CodeContext) -> ClaudeResponse: ...
    async def report_result(self, task_id: str, result: TaskResult) -> bool: ...
Dependencies:
Responsibility: creates and manages single-use code-execution sandboxes and enforces isolation.
Interface definition:
class SandboxManager:
    async def create_sandbox(self, config: SandboxConfig) -> Sandbox: ...
    async def destroy_sandbox(self, sandbox_id: str) -> bool: ...
    async def get_sandbox_status(self, sandbox_id: str) -> SandboxStatus: ...
    async def inject_code(self, sandbox_id: str, code: str) -> bool: ...
    async def execute_command(self, sandbox_id: str, cmd: str) -> CommandResult: ...
Security features:
Responsibility: user authentication, token management, and session control.
Interface definition:
class AuthModule:
    async def login(self, credentials: LoginRequest) -> TokenResponse: ...
    async def logout(self, token: str) -> bool: ...
    async def refresh_token(self, refresh_token: str) -> TokenResponse: ...
    async def verify_token(self, token: str) -> TokenPayload: ...
    async def register_user(self, user_info: UserInfo) -> UserResponse: ...
Security mechanisms:
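# Imports used by the domain model below
from datetime import datetime
from uuid import uuid4

# Task priority value object
class TaskPriority:
    CRITICAL = "critical"  # urgent, run immediately
    HIGH = "high"          # high priority, run ahead of normal tasks
    NORMAL = "normal"      # normal priority, regular queue
    LOW = "low"            # low priority, run when the cluster is idle

# Task status value object
class TaskStatus:
    PENDING = "pending"        # waiting to be scheduled
    QUEUED = "queued"          # enqueued
    DISPATCHED = "dispatched"  # dispatched to a worker
    RUNNING = "running"        # executing
    COMPLETED = "completed"    # finished successfully
    FAILED = "failed"          # execution failed
    CANCELLED = "cancelled"    # cancelled
    TIMEOUT = "timeout"        # timed out

# Worker status value object
class WorkerStatus:
    IDLE = "idle"            # idle
    BUSY = "busy"            # busy
    UNHEALTHY = "unhealthy"  # unhealthy
    DRAINING = "draining"    # draining
    OFFLINE = "offline"      # offline

# Sandbox status value object
class SandboxStatus:
    CREATING = "creating"      # being created
    READY = "ready"            # ready
    RUNNING = "running"        # running
    DESTROYING = "destroying"  # being destroyed
    DESTROYED = "destroyed"    # destroyed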
# Task aggregate root
class TaskAggregate:
    def __init__(self, task_id: str, project_id: str, user_id: str):
        self.task_id = task_id
        self.project_id = project_id
        self.user_id = user_id
        self.status = TaskStatus.PENDING
        self.priority = TaskPriority.NORMAL
        self.input_data = None
        self.result = None
        self.worker_id = None     # set by dispatch()
        self.started_at = None    # set by dispatch()
        self.completed_at = None  # set by complete()
        self.audit_logs = []
        self.created_at = datetime.utcnow()

    def submit(self, input_data: dict) -> None:
        """Submit the task."""
        self.input_data = input_data
        self.status = TaskStatus.QUEUED

    def dispatch(self, worker_id: str) -> None:
        """Dispatch the task to a worker."""
        self.worker_id = worker_id
        self.status = TaskStatus.DISPATCHED
        self.started_at = datetime.utcnow()

    def complete(self, result: TaskResult) -> None:
        """Mark the task as completed."""
        self.result = result
        self.status = TaskStatus.COMPLETED
        self.completed_at = datetime.utcnow()

    def fail(self, error: str) -> None:
        """Mark the task as failed."""
        self.result = TaskResult(error_message=error, exit_code=-1)
        self.status = TaskStatus.FAILED

    def cancel(self) -> None:
        """Cancel the task."""
        self.status = TaskStatus.CANCELLED

    def add_audit_log(self, action: str, details: dict) -> None:
        """Append an audit log entry."""
        self.audit_logs.append({
            "action": action,
            "details": details,
            "timestamp": datetime.utcnow()
        })
# Worker aggregate root
class WorkerAggregate:
    def __init__(self, worker_id: str, node_name: str):
        self.worker_id = worker_id
        self.node_name = node_name
        self.status = WorkerStatus.IDLE
        self.current_tasks = []
        self.capacity = 10
        self.last_heartbeat = datetime.utcnow()
        self.metrics = WorkerMetrics()

    def assign_task(self, task_id: str) -> bool:
        """Assign a task; returns False when the worker is at capacity."""
        if len(self.current_tasks) >= self.capacity:
            return False
        self.current_tasks.append(task_id)
        self.status = WorkerStatus.BUSY
        return True

    def complete_task(self, task_id: str) -> None:
        """Release a finished task."""
        if task_id in self.current_tasks:
            self.current_tasks.remove(task_id)
        if len(self.current_tasks) == 0:
            self.status = WorkerStatus.IDLE

    def heartbeat(self) -> None:
        """Record a heartbeat."""
        self.last_heartbeat = datetime.utcnow()

    def is_healthy(self) -> bool:
        """Health check: not offline and a heartbeat within the last 30 seconds."""
        # total_seconds() is required here: timedelta.seconds ignores whole days,
        # so a worker silent for just over a day would otherwise look healthy.
        return (
            self.status != WorkerStatus.OFFLINE and
            (datetime.utcnow() - self.last_heartbeat).total_seconds() < 30
        )
# Domain event base class
class DomainEvent:
    def __init__(self, event_id: str, aggregate_id: str, timestamp: datetime):
        self.event_id = event_id
        self.aggregate_id = aggregate_id
        self.timestamp = timestamp

# Task events (uuid4() is wrapped in str() to match the event_id: str annotation)
class TaskSubmittedEvent(DomainEvent):
    def __init__(self, task_id: str, user_id: str, project_id: str):
        super().__init__(str(uuid4()), task_id, datetime.utcnow())
        self.user_id = user_id
        self.project_id = project_id

class TaskDispatchedEvent(DomainEvent):
    def __init__(self, task_id: str, worker_id: str):
        super().__init__(str(uuid4()), task_id, datetime.utcnow())
        self.worker_id = worker_id

class TaskCompletedEvent(DomainEvent):
    def __init__(self, task_id: str, result: TaskResult):
        super().__init__(str(uuid4()), task_id, datetime.utcnow())
        self.result = result

class TaskFailedEvent(DomainEvent):
    def __init__(self, task_id: str, error: str):
        super().__init__(str(uuid4()), task_id, datetime.utcnow())
        self.error = error

# Worker events
class WorkerRegisteredEvent(DomainEvent):
    def __init__(self, worker_id: str, node_info: dict):
        super().__init__(str(uuid4()), worker_id, datetime.utcnow())
        self.node_info = node_info

class WorkerHeartbeatEvent(DomainEvent):
    def __init__(self, worker_id: str, metrics: dict):
        super().__init__(str(uuid4()), worker_id, datetime.utcnow())
        self.metrics = metrics

class WorkerOfflineEvent(DomainEvent):
    def __init__(self, worker_id: str, reason: str):
        super().__init__(str(uuid4()), worker_id, datetime.utcnow())
        self.reason = reason

# Security events
class CodeAccessEvent(DomainEvent):
    def __init__(self, user_id: str, code_id: str, action: str):
        super().__init__(str(uuid4()), code_id, datetime.utcnow())
        self.user_id = user_id
        self.action = action

class SecurityAlertEvent(DomainEvent):
    def __init__(self, alert_type: str, severity: str, details: dict):
        super().__init__(str(uuid4()), "security", datetime.utcnow())
        self.alert_type = alert_type
        self.severity = severity
        self.details = details
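The `TaskAggregate` methods set `status` without checking the current state, so a completed task could, for example, be cancelled afterwards. One way to guard against that is an explicit transition table, sketched below. The table itself is an assumption: the design lists the states but does not say which moves are legal, and `ALLOWED_TRANSITIONS`, `InvalidTransition`, and `transition` are hypothetical names.

```python
from typing import Dict, Set

# Assumed lifecycle; retries are modelled as failed/timeout -> queued.
ALLOWED_TRANSITIONS: Dict[str, Set[str]] = {
    "pending": {"queued", "cancelled"},
    "queued": {"dispatched", "cancelled"},
    "dispatched": {"running", "failed", "cancelled", "timeout"},
    "running": {"completed", "failed", "cancelled", "timeout"},
    "failed": {"queued"},
    "timeout": {"queued"},
    "completed": set(),   # terminal
    "cancelled": set(),   # terminal
}


class InvalidTransition(Exception):
    """Raised when a status change is not in the transition table."""


def transition(current: str, target: str) -> str:
    """Returns the new status, or raises if the move is not allowed."""
    if target not in ALLOWED_TRANSITIONS.get(current, set()):
        raise InvalidTransition(f"{current} -> {target} is not allowed")
    return target
```

Each aggregate method could then assign `self.status = transition(self.status, TaskStatus.X)` rather than writing the field directly.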
-- Users table
-- PostgreSQL has no inline INDEX clause in CREATE TABLE, so indexes are created separately.
CREATE TABLE users (
    user_id VARCHAR(36) PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    status VARCHAR(20) DEFAULT 'active',
    api_key VARCHAR(64) UNIQUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    last_login TIMESTAMP
);
-- username and email are already indexed by their UNIQUE constraints.
CREATE INDEX idx_users_status ON users (status);
-- Projects table
CREATE TABLE projects (
    project_id VARCHAR(36) PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    owner_id VARCHAR(36) NOT NULL,
    visibility VARCHAR(20) DEFAULT 'private',
    status VARCHAR(20) DEFAULT 'active',
    settings JSONB DEFAULT '{}',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (owner_id) REFERENCES users(user_id)
);
CREATE INDEX idx_projects_owner ON projects (owner_id);
CREATE INDEX idx_projects_status ON projects (status);
CREATE INDEX idx_projects_visibility ON projects (visibility);
-- Workers table (created before tasks, because tasks.worker_id references it)
CREATE TABLE workers (
    worker_id VARCHAR(36) PRIMARY KEY,
    node_name VARCHAR(255) NOT NULL,
    pod_name VARCHAR(255),
    ip_address VARCHAR(45) NOT NULL,
    port INT NOT NULL,
    status VARCHAR(20) DEFAULT 'idle',
    capacity INT DEFAULT 10,
    current_load INT DEFAULT 0,
    metadata JSONB DEFAULT '{}',
    last_heartbeat TIMESTAMP,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_workers_status ON workers (status);
CREATE INDEX idx_workers_heartbeat ON workers (last_heartbeat);
CREATE INDEX idx_workers_node ON workers (node_name);
-- Tasks table
CREATE TABLE tasks (
    task_id VARCHAR(36) PRIMARY KEY,
    project_id VARCHAR(36) NOT NULL,
    user_id VARCHAR(36) NOT NULL,
    worker_id VARCHAR(36),
    task_type VARCHAR(50) NOT NULL,
    priority VARCHAR(20) DEFAULT 'normal',
    status VARCHAR(20) DEFAULT 'pending',
    input_data TEXT,
    result_data TEXT,
    retry_count INT DEFAULT 0,
    timeout_seconds INT DEFAULT 300,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    expires_at TIMESTAMP,
    FOREIGN KEY (project_id) REFERENCES projects(project_id),
    FOREIGN KEY (user_id) REFERENCES users(user_id),
    FOREIGN KEY (worker_id) REFERENCES workers(worker_id)
);
CREATE INDEX idx_tasks_status ON tasks (status);
CREATE INDEX idx_tasks_priority ON tasks (priority);
CREATE INDEX idx_tasks_created ON tasks (created_at);
CREATE INDEX idx_tasks_project ON tasks (project_id);
CREATE INDEX idx_tasks_user ON tasks (user_id);
CREATE INDEX idx_tasks_worker ON tasks (worker_id);
-- Audit log table
CREATE TABLE audit_logs (
    log_id VARCHAR(36) PRIMARY KEY,
    user_id VARCHAR(36),
    task_id VARCHAR(36),
    action VARCHAR(50) NOT NULL,
    resource_type VARCHAR(50),
    resource_id VARCHAR(36),
    ip_address VARCHAR(45),
    user_agent TEXT,
    request_data JSONB,
    response_data JSONB,
    response_code INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(user_id),
    FOREIGN KEY (task_id) REFERENCES tasks(task_id)
);
CREATE INDEX idx_audit_logs_user ON audit_logs (user_id);
CREATE INDEX idx_audit_logs_task ON audit_logs (task_id);
CREATE INDEX idx_audit_logs_action ON audit_logs (action);
CREATE INDEX idx_audit_logs_created ON audit_logs (created_at);
CREATE INDEX idx_audit_logs_resource ON audit_logs (resource_type, resource_id);
-- Sandboxes table
CREATE TABLE sandboxes (
    sandbox_id VARCHAR(36) PRIMARY KEY,
    task_id VARCHAR(36) NOT NULL,
    worker_id VARCHAR(36) NOT NULL,
    container_id VARCHAR(255),
    status VARCHAR(20) DEFAULT 'creating',
    image VARCHAR(255) NOT NULL,
    resources JSONB DEFAULT '{}',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    started_at TIMESTAMP,
    destroyed_at TIMESTAMP,
    FOREIGN KEY (task_id) REFERENCES tasks(task_id),
    FOREIGN KEY (worker_id) REFERENCES workers(worker_id)
);
CREATE INDEX idx_sandboxes_task ON sandboxes (task_id);
CREATE INDEX idx_sandboxes_status ON sandboxes (status);
CREATE INDEX idx_sandboxes_worker ON sandboxes (worker_id);
# Task queues
task_queue:priority:critical -> List [task_json, ...]
task_queue:priority:high -> List [task_json, ...]
task_queue:priority:normal -> List [task_json, ...]
task_queue:priority:low -> List [task_json, ...]
# Per-worker queue
worker:{worker_id}:queue -> List [task_json, ...]
# Worker state
worker:{worker_id}:info -> Hash {node_name, ip, port, capacity, ...}
worker:{worker_id}:status -> String {idle|busy|unhealthy|draining|offline}
worker:{worker_id}:heartbeat -> String {timestamp}
worker:{worker_id}:metrics -> Hash {cpu, memory, disk, active_tasks}
# Task state
task:{task_id}:status -> String {pending|queued|dispatched|running|completed|failed|cancelled|timeout}
task:{task_id}:data -> Hash {input_data, result_data, ...}
# Result queue
result_queue -> List [result_json, ...]
# Session management
session:{user_id} -> String {session_data} EX 7200
token:{jti} -> String {token_payload} EX 7200
refresh:{jti} -> String {user_id} EX 604800
# Distributed locks
lock:task:{task_id} -> String {worker_id} EX 300
lock:worker:{worker_id} -> String {controller_id} EX 30
# Statistics
stats:tasks:total -> Counter
stats:tasks:completed:today -> Counter
stats:workers:active -> Gauge
stats:queue:size -> Gauge
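The four `task_queue:priority:*` lists imply that a consumer drains higher priorities first. The sketch below shows that order using an in-memory stand-in for the Redis lists, so it runs without a server; `InMemoryQueues` and `pop_next_task` are hypothetical names, and only the `rpush`/`lpop` semantics are borrowed from Redis.

```python
from collections import deque
from typing import Deque, Dict, Optional

PRIORITY_ORDER = ["critical", "high", "normal", "low"]


class InMemoryQueues:
    """Stand-in for the Redis lists named task_queue:priority:<level>."""

    def __init__(self) -> None:
        self._lists: Dict[str, Deque[str]] = {
            f"task_queue:priority:{level}": deque() for level in PRIORITY_ORDER
        }

    def rpush(self, key: str, value: str) -> None:
        self._lists[key].append(value)

    def lpop(self, key: str) -> Optional[str]:
        queue = self._lists[key]
        return queue.popleft() if queue else None


def pop_next_task(queues: InMemoryQueues) -> Optional[str]:
    """Returns the oldest task from the highest non-empty priority level."""
    for level in PRIORITY_ORDER:
        task = queues.lpop(f"task_queue:priority:{level}")
        if task is not None:
            return task
    return None
```

Against a real Redis instance, a single `BLPOP` over the four keys in this order gives the same precedence, because `BLPOP` serves the first non-empty key in argument order. Under Redis Cluster the four keys would need a shared hash tag to land in one slot before a multi-key call is accepted. Strict priority can also starve the `low` queue under sustained load.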
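Base URL: https://api.codecluster.com (the endpoint paths below already include the /api/v1 prefix)
Authentication: Bearer Token (JWT)
Request format: application/json
Response format: application/json
Character encoding: UTF-8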
{
"code": 200,
"message": "success",
"data": {},
"timestamp": 1710806400,
"request_id": "req_abc123xyz"
}
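| Code | Description |
|---|---|
| 200 | Success |
| 400 | Invalid request parameters |
| 401 | Not authenticated / invalid token |
| 403 | Insufficient permissions |
| 404 | Resource not found |
| 409 | Resource conflict |
| 429 | Rate limit exceeded |
| 500 | Internal server error |
| 503 | Service unavailable |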
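A small helper can keep every endpoint on the same response envelope. The sketch below builds the `code` / `message` / `data` / `timestamp` / `request_id` structure; `make_response` and the English default messages are assumptions for illustration, and the `req_` prefix simply mirrors the sample value.

```python
import time
import uuid
from typing import Any, Dict, Optional

# Assumed default messages, one per documented code.
DEFAULT_MESSAGES: Dict[int, str] = {
    200: "success",
    400: "invalid request parameters",
    401: "not authenticated",
    403: "insufficient permissions",
    404: "resource not found",
    409: "resource conflict",
    429: "rate limit exceeded",
    500: "internal server error",
    503: "service unavailable",
}


def make_response(
    code: int = 200,
    data: Optional[Dict[str, Any]] = None,
    message: Optional[str] = None,
    request_id: Optional[str] = None,
    now: Optional[float] = None,
) -> Dict[str, Any]:
    """Builds the unified envelope; `now` and `request_id` can be injected for tests."""
    return {
        "code": code,
        "message": message or DEFAULT_MESSAGES.get(code, "unknown error"),
        "data": data if data is not None else {},
        "timestamp": int(now if now is not None else time.time()),
        "request_id": request_id or f"req_{uuid.uuid4().hex[:12]}",
    }
```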
# User login
POST /api/v1/auth/login
Request:
{
"username": "string",
"password": "string",
"mfa_code": "string (optional)"
}
Response:
{
"access_token": "string",
"refresh_token": "string",
"expires_in": 7200,
"token_type": "Bearer"
}
# Refresh token
POST /api/v1/auth/refresh
Request:
{
"refresh_token": "string"
}
Response:
{
"access_token": "string",
"expires_in": 7200
}
# User logout
POST /api/v1/auth/logout
Headers:
Authorization: Bearer {token}
# User registration
POST /api/v1/auth/register
Request:
{
"username": "string",
"email": "string",
"password": "string"
}
# Create task
POST /api/v1/tasks
Headers:
Authorization: Bearer {token}
Request:
{
"project_id": "string",
"task_type": "code_gen|code_review|refactor|test",
"priority": "critical|high|normal|low",
"input_data": {
"code": "string",
"prompt": "string",
"context": "object"
},
"timeout_seconds": 300
}
Response:
{
"task_id": "string",
"status": "queued",
"estimated_wait_seconds": 30
}
# Get task status
GET /api/v1/tasks/{task_id}
Headers:
Authorization: Bearer {token}
Response:
{
"task_id": "string",
"status": "running",
"progress": 50,
"worker_id": "string",
"created_at": "2026-03-19T10:00:00Z",
"started_at": "2026-03-19T10:00:30Z"
}
# Get task result
GET /api/v1/tasks/{task_id}/result
Headers:
Authorization: Bearer {token}
Response:
{
"task_id": "string",
"status": "completed",
"output": "string",
"artifact_url": "string",
"metrics": {
"execution_time_ms": 1500,
"tokens_used": 2000
}
}
# Cancel task
POST /api/v1/tasks/{task_id}/cancel
Headers:
Authorization: Bearer {token}
Response:
{
"task_id": "string",
"status": "cancelled"
}
# List tasks
GET /api/v1/tasks
Headers:
Authorization: Bearer {token}
Query Params:
project_id: string (optional)
status: string (optional)
priority: string (optional)
page: integer (default: 1)
page_size: integer (default: 20)
Response:
{
"tasks": [...],
"total": 100,
"page": 1,
"page_size": 20
}
# Register worker
POST /api/v1/workers/register
Request:
{
"node_name": "string",
"ip_address": "string",
"port": 8080,
"capacity": 10,
"metadata": {
"cpu_cores": 8,
"memory_gb": 32,
"gpu_type": "none"
}
}
Response:
{
"worker_id": "string",
"status": "idle"
}
# Worker heartbeat
POST /api/v1/workers/{worker_id}/heartbeat
Request:
{
"cpu_usage": 45.5,
"memory_usage": 60.2,
"disk_usage": 30.0,
"active_tasks": 3
}
Response:
{
"ack": true
}
# Unregister worker
POST /api/v1/workers/{worker_id}/unregister
Response:
{
"ack": true
}
# Get worker status
GET /api/v1/workers/{worker_id}
Headers:
Authorization: Bearer {token}
Response:
{
"worker_id": "string",
"node_name": "string",
"status": "busy",
"current_load": 5,
"capacity": 10,
"last_heartbeat": "2026-03-19T10:00:00Z"
}
# List workers
GET /api/v1/workers
Headers:
Authorization: Bearer {token}
Query Params:
status: string (optional)
Response:
{
"workers": [...],
"total": 10,
"active": 8,
"idle": 5,
"busy": 3
}
# Create project
POST /api/v1/projects
Headers:
Authorization: Bearer {token}
Request:
{
"name": "string",
"description": "string",
"visibility": "private|public|internal"
}
Response:
{
"project_id": "string",
"name": "string"
}
# Get project details
GET /api/v1/projects/{project_id}
Headers:
Authorization: Bearer {token}
Response:
{
"project_id": "string",
"name": "string",
"description": "string",
"owner_id": "string",
"visibility": "private",
"created_at": "2026-03-19T10:00:00Z",
"stats": {
"total_tasks": 100,
"completed_tasks": 95,
"collaborators": 5
}
}
# Update project
PUT /api/v1/projects/{project_id}
Headers:
Authorization: Bearer {token}
Request:
{
"name": "string (optional)",
"description": "string (optional)",
"visibility": "string (optional)"
}
# Delete project
DELETE /api/v1/projects/{project_id}
Headers:
Authorization: Bearer {token}
# List projects
GET /api/v1/projects
Headers:
Authorization: Bearer {token}
Query Params:
visibility: string (optional)
page: integer
page_size: integer
# Query audit logs
GET /api/v1/audit/logs
Headers:
Authorization: Bearer {token}
Query Params:
user_id: string (optional)
task_id: string (optional)
action: string (optional)
start_time: string (optional)
end_time: string (optional)
page: integer
page_size: integer
Response:
{
"logs": [...],
"total": 1000,
"page": 1,
"page_size": 20
}
# Export audit logs
POST /api/v1/audit/export
Headers:
Authorization: Bearer {token}
Request:
{
"start_time": "2026-03-01T00:00:00Z",
"end_time": "2026-03-19T23:59:59Z",
"format": "csv|json"
}
Response:
{
"export_id": "string",
"download_url": "string",
"expires_at": "2026-03-20T00:00:00Z"
}
# Get system metrics
GET /api/v1/metrics/system
Headers:
Authorization: Bearer {token}
Response:
{
"tasks": {
"total": 10000,
"pending": 50,
"running": 30,
"completed_today": 500,
"failed_today": 5
},
"workers": {
"total": 10,
"active": 8,
"idle": 5,
"busy": 3,
"unhealthy": 0
},
"queue": {
"size": 50,
"avg_wait_time_ms": 100
}
}
# Get worker metrics
GET /api/v1/metrics/workers/{worker_id}
Headers:
Authorization: Bearer {token}
Query Params:
start_time: string
end_time: string
interval: 1m|5m|1h|1d
Response:
{
"worker_id": "string",
"metrics": [
{
"timestamp": "2026-03-19T10:00:00Z",
"cpu_usage": 45.5,
"memory_usage": 60.2,
"active_tasks": 3
}
]
}
# WebSocket connection
WS /api/v1/ws/tasks
Query Params:
token: string (JWT Token)
# Subscribe to task updates
Client -> Server:
{
"action": "subscribe",
"task_ids": ["task_1", "task_2"]
}
# Server pushes a task status update
Server -> Client:
{
"type": "task_status_update",
"task_id": "task_1",
"status": "running",
"progress": 50,
"timestamp": "2026-03-19T10:00:00Z"
}
# Server pushes task completion
Server -> Client:
{
"type": "task_completed",
"task_id": "task_1",
"result": {...}
}
# Unsubscribe
Client -> Server:
{
"action": "unsubscribe",
"task_ids": ["task_1"]
}
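On the server side, the subscribe/unsubscribe messages amount to maintaining a map from task ID to interested connections. The sketch below handles the two client messages and builds a status-update payload. `SubscriptionRegistry`, `status_update`, and the connection IDs are hypothetical; a real handler would sit behind the WebSocket endpoint and check that the caller is allowed to see each task.

```python
import json
from collections import defaultdict
from typing import Dict, List, Set


class SubscriptionRegistry:
    """Tracks which connections are subscribed to which task IDs."""

    def __init__(self) -> None:
        self._subs: Dict[str, Set[str]] = defaultdict(set)  # task_id -> connection IDs

    def handle_message(self, conn_id: str, raw: str) -> None:
        """Applies one client message of the form {"action": ..., "task_ids": [...]}."""
        msg = json.loads(raw)
        action = msg.get("action")
        task_ids = msg.get("task_ids", [])
        if action == "subscribe":
            for task_id in task_ids:
                self._subs[task_id].add(conn_id)
        elif action == "unsubscribe":
            for task_id in task_ids:
                self._subs[task_id].discard(conn_id)
        else:
            raise ValueError(f"unsupported action: {action!r}")

    def recipients(self, task_id: str) -> List[str]:
        """Connections that should receive updates for this task, in sorted order."""
        return sorted(self._subs.get(task_id, set()))


def status_update(task_id: str, status: str, progress: int, timestamp: str) -> str:
    """Serializes a task_status_update push message."""
    return json.dumps({
        "type": "task_status_update",
        "task_id": task_id,
        "status": status,
        "progress": progress,
        "timestamp": timestamp,
    })
```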
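| Security measure | Implementation | Configuration |
|---|---|---|
| VPC isolation | Separate VPCs for the control plane, execution plane, and data plane | CIDR: 10.0.0.0/8 |
| Security groups | Open the minimum set of ports | Only 443 and 8443 open |
| WAF | Alibaba Cloud WAF / Tencent Cloud WAF | OWASP Top 10 protection |
| DDoS | Anti-DDoS IP + traffic scrubbing | Protection capacity ≥ 100 Gbps |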
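TLS configuration:
  Version: TLS 1.3
  Cipher suites:
    - TLS_AES_256_GCM_SHA384
    - TLS_CHACHA20_POLY1305_SHA256
  Certificate:
    Type: EV SSL certificate
    Validity: 1 year
    Auto-renewal: yes
  mTLS:
    Enabled: yes (between internal services)
    Certificate rotation: every 90 days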
# JWT token configuration
JWT_CONFIG = {
    "algorithm": "RS256",
    "access_token_expiry": 7200,     # 2 hours
    "refresh_token_expiry": 604800,  # 7 days
    "issuer": "codecluster.com",
    "audience": "codecluster-api",
    "private_key_path": "/etc/secrets/jwt_private.pem",
    "public_key_path": "/etc/secrets/jwt_public.pem"
}
# MFA configuration
MFA_CONFIG = {
    "enabled": True,
    "methods": ["TOTP", "SMS", "Email"],
    "backup_codes": 10,
    "lockout_attempts": 5,
    "lockout_duration": 900  # 15 minutes
}
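The `lockout_attempts` and `lockout_duration` settings describe a simple failure counter with a timed lock. A minimal in-memory sketch follows; `LockoutTracker` is a hypothetical name, the caller passes in the clock so the logic is testable, and a deployed version would presumably keep these counters in Redis rather than in process memory.

```python
from typing import Dict


class LockoutTracker:
    """Counts failed MFA attempts per user and locks the account for a fixed window."""

    def __init__(self, max_attempts: int = 5, lockout_seconds: int = 900) -> None:
        self.max_attempts = max_attempts
        self.lockout_seconds = lockout_seconds
        self._failures: Dict[str, int] = {}
        self._locked_until: Dict[str, float] = {}

    def is_locked(self, user_id: str, now: float) -> bool:
        return now < self._locked_until.get(user_id, 0.0)

    def record_failure(self, user_id: str, now: float) -> None:
        count = self._failures.get(user_id, 0) + 1
        if count >= self.max_attempts:
            # Threshold reached: start the lock window and reset the counter.
            self._locked_until[user_id] = now + self.lockout_seconds
            count = 0
        self._failures[user_id] = count

    def record_success(self, user_id: str) -> None:
        # A successful verification clears earlier failures.
        self._failures.pop(user_id, None)
```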
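Permission matrix:
| Role | Create tasks | View tasks | Delete tasks | Manage Workers | View audit logs | System configuration |
|---|---|---|---|---|---|---|
| Super administrator | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ |
| Project administrator | ✓ | ✓ | ✓ (own project) | ✗ | ✓ (own project) | ✗ |
| Developer | ✓ | ✓ (own) | ✓ (own) | ✗ | ✗ | ✗ |
| Guest | ✗ | ✓ (public) | ✗ | ✗ | ✗ | ✗ |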
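The matrix can be encoded as a lookup from role and action to a scope, with the scope resolved against the resource being accessed. The sketch below is one possible encoding; the role keys, action keys, scope labels, and `is_allowed` are hypothetical English identifiers for the rows, columns, and qualifiers in the table.

```python
from typing import Dict, Optional

ACTIONS = ("create_task", "view_task", "delete_task",
           "manage_worker", "view_audit", "system_config")

# Scope per role and action: "all", "project", "own", "public", or None (denied).
PERMISSIONS: Dict[str, Dict[str, Optional[str]]] = {
    "super_admin": {action: "all" for action in ACTIONS},
    "project_admin": {
        "create_task": "all", "view_task": "all", "delete_task": "project",
        "manage_worker": None, "view_audit": "project", "system_config": None,
    },
    "developer": {
        "create_task": "all", "view_task": "own", "delete_task": "own",
        "manage_worker": None, "view_audit": None, "system_config": None,
    },
    "guest": {
        "create_task": None, "view_task": "public", "delete_task": None,
        "manage_worker": None, "view_audit": None, "system_config": None,
    },
}


def is_allowed(role: str, action: str, *, is_owner: bool = False,
               same_project: bool = False, is_public: bool = False) -> bool:
    """Resolves the matrix entry against facts about the target resource."""
    scope = PERMISSIONS.get(role, {}).get(action)
    if scope is None:
        return False  # unknown role/action or explicitly denied
    if scope == "all":
        return True
    if scope == "project":
        return same_project
    if scope == "own":
        return is_owner
    if scope == "public":
        return is_public
    return False
```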
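Encryption configuration:
  Data at rest:
    Algorithm: AES-256-GCM
    Key management: Alibaba Cloud KMS / Tencent Cloud KMS
    Key rotation: every 90 days
  Data in transit:
    Protocol: TLS 1.3
  Sensitive data:
    Passwords: BCrypt (cost=12)
    API keys: HMAC-SHA256
    Credentials: stored encrypted with AES-256
Data masking:
  Log masking:
    - Password fields
    - API keys
    - Personally identifiable information
  Display masking:
    - Email: u***@example.com
    - Phone: 138****1234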
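The display-masking formats can be produced with two small helpers, and log masking with a field filter. This is a sketch under assumptions: phone numbers are taken to be 11-digit mainland China mobile numbers, anything that does not parse is fully masked, and the function names and `SENSITIVE_FIELDS` set are hypothetical.

```python
from typing import Any, Dict

SENSITIVE_FIELDS = {"password", "api_key"}  # assumed log field names


def mask_email(email: str) -> str:
    """Keeps the first character of the local part: user@example.com -> u***@example.com."""
    local, sep, domain = email.partition("@")
    if not sep or not local or not domain:
        return "***"  # not a recognizable address: reveal nothing
    return f"{local[0]}***@{domain}"


def mask_phone(phone: str) -> str:
    """Keeps the first 3 and last 4 digits of an 11-digit number."""
    if len(phone) != 11 or not phone.isdigit():
        return "*" * len(phone)  # unknown format: mask everything
    return f"{phone[:3]}****{phone[-4:]}"


def redact_log_fields(record: Dict[str, Any]) -> Dict[str, Any]:
    """Returns a copy of a log record with sensitive top-level fields replaced."""
    return {key: ("[REDACTED]" if key in SENSITIVE_FIELDS else value)
            for key, value in record.items()}
```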
# Sandbox security configuration
SANDBOX_SECURITY_CONFIG = {
    # Container isolation
    "container": {
        "image": "codecluster/sandbox:latest",
        "read_only_rootfs": True,
        "no_new_privileges": True,
        "drop_capabilities": ["ALL"],
        "add_capabilities": [],
    },
    # Resource limits
    "resources": {
        "cpu_limit": "2.0",
        "memory_limit": "4Gi",
        "disk_limit": "10Gi",
        "pids_limit": 100,
    },
    # Network isolation
    "network": {
        "enabled": False,  # external network access is disabled by default
        "allowed_hosts": ["api.anthropic.com"],  # Anthropic API host
        "blocked_ports": [22, 23, 3389],
    },
    # File system
    "filesystem": {
        "readonly_mounts": ["/etc", "/usr", "/bin"],
        "tmpfs_mounts": ["/tmp", "/var/tmp"],
        "max_file_size": "100MB",
    },
    # Timeouts
    "timeout": {
        "execution_timeout": 300,  # 5 minutes
        "idle_timeout": 60,        # 1 minute
    },
    # Code slicing
    "code_slicing": {
        "enabled": True,
        "max_chunk_size": "10000 lines",
        "isolation_level": "process",
    }
}
# Single-use (ephemeral) sandbox policy
EPHEMERAL_SANDBOX_POLICY = {
    "destroy_after_execution": True,
    "destroy_on_timeout": True,
    "destroy_on_error": True,
    "max_lifetime": 600,              # 10 minutes
    "snapshot_before_destroy": True,  # kept for auditing
}
Code-slicing isolation mechanism:
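Audit log configuration:
  Recorded fields:
    - User identity
    - Operation type
    - Resource information
    - Timestamp
    - IP address
    - Request/response data
  Retention policy:
    Hot storage: 30 days (Elasticsearch)
    Warm storage: 90 days (S3)
    Cold storage: 365 days (archive storage)
  Integrity protection:
    Hash chain: SHA-256
    Tamper-proofing: yes
Anomaly detection:
  Rule engine:
    - Repeated failed logins
    - Access from unusual IP addresses
    - Unusually large data downloads
    - Operations outside working hours
    - Privilege escalation attempts
  Alert levels:
    - Low: email notification
    - Medium: SMS notification
    - High: phone call + automatic blocking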
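The SHA-256 hash chain listed under integrity protection can be sketched as follows: each record stores the hash of the previous record, so changing any earlier entry invalidates every later hash. The record layout, the all-zero genesis value, and the function names are assumptions; the configuration only says that a SHA-256 hash chain is used.

```python
import hashlib
import json
from typing import Any, Dict, List

GENESIS_HASH = "0" * 64  # assumed starting value for an empty chain


def entry_hash(prev_hash: str, entry: Dict[str, Any]) -> str:
    """SHA-256 over the previous hash plus a canonical JSON encoding of the entry."""
    payload = json.dumps(entry, sort_keys=True, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(prev_hash.encode("ascii") + payload).hexdigest()


def append_entry(chain: List[Dict[str, Any]], entry: Dict[str, Any]) -> None:
    """Appends an audit entry linked to the current tail of the chain."""
    prev = chain[-1]["hash"] if chain else GENESIS_HASH
    chain.append({"entry": entry, "prev_hash": prev, "hash": entry_hash(prev, entry)})


def verify_chain(chain: List[Dict[str, Any]]) -> bool:
    """Recomputes every link; returns False at the first mismatch."""
    prev = GENESIS_HASH
    for record in chain:
        if record["prev_hash"] != prev:
            return False
        if record["hash"] != entry_hash(prev, record["entry"]):
            return False
        prev = record["hash"]
    return True
```

A chain like this detects modification but not truncation of the tail, so the latest hash would still need to be anchored somewhere the log writer cannot alter.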
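# Task signature verification
import base64
import json

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding


class TaskSignature:
    def __init__(self, private_key_path: str, public_key_path: str):
        # Assumption: both keys are PEM-encoded RSA keys and the private key is unencrypted.
        with open(private_key_path, "rb") as f:
            self.private_key = serialization.load_pem_private_key(f.read(), password=None)
        with open(public_key_path, "rb") as f:
            self.public_key = serialization.load_pem_public_key(f.read())

    def sign_task(self, task_data: dict) -> str:
        """Sign the task data (RSA-PSS with SHA-256)."""
        # sort_keys gives a canonical encoding so signer and verifier hash the same bytes.
        canonical_data = json.dumps(task_data, sort_keys=True)
        signature = self.private_key.sign(
            canonical_data.encode(),
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return base64.b64encode(signature).decode()

    def verify_signature(self, task_data: dict, signature: str) -> bool:
        """Verify a task signature; returns False if it does not match."""
        try:
            canonical_data = json.dumps(task_data, sort_keys=True)
            self.public_key.verify(
                base64.b64decode(signature),
                canonical_data.encode(),
                padding.PSS(
                    mgf=padding.MGF1(hashes.SHA256()),
                    salt_length=padding.PSS.MAX_LENGTH
                ),
                hashes.SHA256()
            )
            return True
        except InvalidSignature:
            return False

# Task signing flow
# 1. The Controller signs the task when it is created.
# 2. The Worker verifies the signature when it receives the task.
# 3. If verification fails, the Worker refuses to execute the task.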
# Worker HPA configuration
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: worker-hpa
  namespace: codecluster-execution
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: worker
  minReplicas: 10
  maxReplicas: 100
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    - type: Pods
      pods:
        metric:
          name: queue_size
        target:
          type: AverageValue
          averageValue: 10
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 0
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
        - type: Pods
          value: 10
          periodSeconds: 15
      selectPolicy: Max
# Scaling policy configuration
scaling_policy:
  # Scale-up triggers
  scale_up:
    conditions:
      - metric: queue_size
        threshold: 100
        duration: 30s
      - metric: cpu_utilization
        threshold: 70%
        duration: 60s
      - metric: memory_utilization
        threshold: 80%
        duration: 60s
    action:
      type: additive
      increment: 5  # add 5 Pods per step
      max_increment: 20
      cooldown: 60s
  # Scale-down triggers
  scale_down:
    conditions:
      - metric: queue_size
        threshold: 10
        duration: 300s
      - metric: cpu_utilization
        threshold: 30%
        duration: 300s
    action:
      type: percentage
      decrement: 10%
      min_replicas: 10
      cooldown: 300s
  # Scheduled scaling
  scheduled_scaling:
    - name: business_hours
      schedule: "0 9 * * 1-5"  # weekdays at 09:00
      min_replicas: 20
    - name: off_hours
      schedule: "0 20 * * 1-5"  # weekdays at 20:00
      min_replicas: 10
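The scale-up and scale-down rules reduce to a small decision function. The sketch below makes several assumptions the policy leaves open: scale-up fires when any one condition is exceeded, scale-down requires all of its conditions, and the `duration` and `cooldown` windows are ignored. The ceiling of 100 replicas is borrowed from the HPA's `maxReplicas`, and `desired_replicas` is a hypothetical name.

```python
import math


def desired_replicas(
    current: int,
    *,
    queue_size: int,
    cpu_pct: float,
    mem_pct: float,
    min_replicas: int = 10,
    max_replicas: int = 100,
) -> int:
    """Returns the replica count after applying one scaling step."""
    # Scale up additively by 5 when any threshold is exceeded (assumed OR semantics).
    if queue_size > 100 or cpu_pct > 70 or mem_pct > 80:
        return min(current + 5, max_replicas)
    # Scale down by 10% when the queue is short and CPU is low (assumed AND semantics).
    if queue_size < 10 and cpu_pct < 30:
        step = max(1, math.floor(current * 0.10))
        return max(current - step, min_replicas)
    return current
```

Adding capacity in fixed steps while removing it proportionally makes the cluster quick to grow and slow to shrink, which matches the asymmetric cooldowns of 60 s and 300 s.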
# Resource quota configuration
apiVersion: v1
kind: ResourceQuota
metadata:
  name: execution-quota
  namespace: codecluster-execution
spec:
  hard:
    requests.cpu: "500"
    requests.memory: 1000Gi
    limits.cpu: "1000"
    limits.memory: 2000Gi
    pods: "200"
    persistentvolumeclaims: "50"
---
# LimitRange configuration
apiVersion: v1
kind: LimitRange
metadata:
  name: worker-limits
  namespace: codecluster-execution
spec:
  limits:
    - type: Container
      default:
        cpu: "2"
        memory: 4Gi
      defaultRequest:
        cpu: "1"
        memory: 2Gi
      max:
        cpu: "4"
        memory: 8Gi
      min:
        cpu: "500m"
        memory: 1Gi
# Multi-availability-zone deployment configuration
# (selector, pod labels, and containers are not shown in this excerpt)
apiVersion: apps/v1
kind: Deployment
metadata:
  name: worker
  namespace: codecluster-execution
spec:
  replicas: 10
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 2
      maxUnavailable: 1
  template:
    spec:
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    app: worker
                topologyKey: kubernetes.io/hostname
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: topology.kubernetes.io/zone
                    operator: In
                    values:
                      - cn-hangzhou-a
                      - cn-hangzhou-b
                      - cn-hangzhou-c
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: ScheduleAnyway
          labelSelector:
            matchLabels:
              app: worker
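| Week | Task | Deliverables | Owner |
|---|---|---|---|
| W1 | Requirements analysis and architecture design | Architecture design document, API specification | Architect |
| W1 | Technology selection and POC validation | POC report, technology selection document | Tech lead |
| W2 | Cloud resource planning and provisioning requests | Resource inventory, network plan | Operations lead |
| W2-W3 | Kubernetes cluster setup | K8s cluster, monitoring stack | Operations team |
| W3 | CI/CD pipeline setup | Jenkins/GitLab CI configuration | DevOps |
| W4 | Database and middleware deployment | PostgreSQL and Redis clusters | DBA |
Milestone M1: 2026-04-14 - Infrastructure ready
| Week | Task | Deliverables | Owner |
|---|---|---|---|
| W5-W6 | Control-plane service development | Controller and Task Manager source code | Backend team A |
| W5-W6 | Worker executor development | Worker and Sandbox Manager source code | Backend team B |
| W7 | Authentication and authorization module development | Auth Service and RBAC source code | Backend team A |
| W7-W8 | Web Portal development | Front-end source code, UI component library | Front-end team |
| W8-W9 | API Gateway development | Kong configuration, routing rules | Backend team B |
| W9 | Claude API integration | AI invocation module, prompt templates | AI engineer |
Milestone M2: 2026-05-17 - Core feature development complete
| Week | Task | Deliverables | Owner |
|---|---|---|---|
| W10 | Module integration testing | Integration test report, bug list | QA team |
| W10 | Performance load testing | Performance test report, optimization recommendations | QA team |
| W11 | Security penetration testing | Security assessment report, remediation recommendations | Security team |
| W11 | Failure drills | Failure drill report, emergency response plan | SRE team |
Milestone M3: 2026-05-31 - Testing and acceptance passed
| Week | Task | Deliverables | Owner |
|---|---|---|---|
| W12 | Canary release (10% of traffic) | Canary report, monitoring data | Operations team |
| W12-W13 | Canary release (50% of traffic) | Stability report | Operations team |
| W13 | Full rollout | Launch report, rollback plan | Operations team |
| W14 | Operations handover and training | Operations manual, training materials | All teams |
Milestone M4: 2026-06-28 - Project officially live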
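| Role | Headcount | Phases | Responsibilities |
|---|---|---|---|
| Architect | 1 | Full cycle | Architecture design, technical decisions |
| Tech lead | 1 | Full cycle | Technical management, code review |
| Backend engineer | 4 | Phases 2-4 | Service development, API implementation |
| Front-end engineer | 2 | Phases 2-4 | Web Portal development |
| AI engineer | 1 | Phases 2-3 | Claude API integration |
| QA engineer | 2 | Phase 3 | Test cases, quality assurance |
| Operations engineer | 2 | Phases 1, 4 | Infrastructure, deployment and operations |
| DBA | 1 | Phases 1, 3 | Database design and optimization |
| Security engineer | 1 | Phase 3 | Security assessment, penetration testing |
| DevOps | 1 | Phases 1-4 | CI/CD, automation |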
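| Risk | Probability | Impact | Mitigation |
|---|---|---|---|
| Claude API rate limiting | Medium | High | Rotate across multiple accounts, queue requests, cache locally |
| Sandbox escape vulnerability | Low | Very high | Layered isolation, regular security audits, least privilege |
| Performance below target | Medium | Medium | Early load testing, performance tuning, horizontal scaling |
| Staff attrition | Medium | Medium | Documented knowledge, primary/backup owner pairing |
| Insufficient cloud resources | Low | High | Reserve capacity in advance, multi-cloud fallback plan |
| Security compliance issues | Medium | High | Early legal review, compliance checks |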
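| Risk item | Risk level | Trigger | Response |
|---|---|---|---|
| Claude API instability | 🔴 High | API error rate > 5% | 1. Load balancing across multiple accounts 2. Local degradation strategy 3. Request retries |
| Sandbox security vulnerability | 🔴 High | A possible escape path is found | 1. Layered isolation 2. Regular penetration testing 3. Principle of least privilege |
| Redis single point of failure | 🟡 Medium | Primary node goes down | 1. Redis Cluster deployment 2. Persistence configuration 3. Automatic failover |
| Database performance bottleneck | 🟡 Medium | QPS > 10000 | 1. Read/write splitting 2. Database and table sharding 3. Cache optimization |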
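| Risk item | Risk level | Trigger | Response |
|---|---|---|---|
| Cost overrun | 🟡 Medium | Monthly cost exceeds budget by more than 20% | 1. Cost monitoring and alerts 2. Automatic scale-down policy 3. Reserved-instance optimization |
| Rising user complaints | 🟡 Medium | Complaint rate > 1% | 1. Rapid response process 2. Root-cause analysis 3. Continuous improvement |
| Failed compliance audit | 🟠 High | A security audit finds high-severity issues | 1. Early compliance review 2. Regular security scans 3. Closed-loop remediation |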
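# Emergency response plan configuration
emergency_plan:
  # API failure response
  api_failure:
    trigger: API error rate > 10% for 5 minutes
    actions:
      - Switch to a backup API account
      - Enable the local cache
      - Degrade to non-AI mode
      - Notify the on-call engineer
    rollback: Switch back automatically once the API recovers
  # Worker failure response
  worker_failure:
    trigger: More than 30% of Workers offline
    actions:
      - Automatically scale out new Workers
      - Reschedule tasks
      - Send alert notifications
    rollback: Failed Workers rejoin after they recover
  # Database failure response
  database_failure:
    trigger: Primary database unavailable
    actions:
      - Fail over automatically to a replica
      - Enable read-only mode
      - Notify the DBA
    rollback: Resynchronize data once the primary recovers
  # Security incident response
  security_incident:
    trigger: Security attack detected
    actions:
      - Isolate the affected nodes
      - Block the attacking source IPs
      - Start the forensics process
      - Notify the security team
    rollback: Restore service once the threat is eliminated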
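| Term | Definition |
|---|---|
| Controller | Central scheduling controller, responsible for task dispatch and Worker management |
| Worker | Task execution node, responsible for invoking Claude Code and executing code |
| Sandbox | Single-use code-execution sandbox that provides an isolated execution environment |
| HPA | Horizontal Pod Autoscaler, Kubernetes horizontal autoscaling |
| RBAC | Role-Based Access Control |
| mTLS | Mutual TLS, two-way TLS authentication |
| KMS | Key Management Service |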
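| Version | Date | Author | Change description |
|---|---|---|---|
| v1.0 | 2026-03-19 | Architecture team | Initial version, complete architecture design |
End of document
This document is a technical review draft and may be adjusted during implementation as circumstances require.
Any change must be approved by the Change Control Board (CCB).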