"""Agent cold-start warmup and throughput optimization — complete implementation."""
import time
import json
import hashlib
import secrets
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import numpy as np
from collections import deque, defaultdict
import statistics
import threading
from concurrent.futures import ThreadPoolExecutor
import queue
class WarmupStrategy(Enum):
    """Warmup strategies selectable in StartupConfig."""
    MODEL_WARMUP = "model_warmup"        # model warmup (run dummy inference requests)
    CACHE_WARMUP = "cache_warmup"        # cache warmup (preload cache entries)
    CONNECTION_POOL = "connection_pool"  # connection-pool warmup (pre-create connections)
    PROGRESSIVE_LOADING = "progressive"  # progressive (staged) model loading
class BatchingStrategy(Enum):
    """Batching strategies selectable in ThroughputConfig."""
    STATIC_BATCHING = "static"          # static batching
    CONTINUOUS_BATCHING = "continuous"  # continuous batching
    DYNAMIC_BATCHING = "dynamic"        # dynamic (load-adaptive) batching
    REQUEST_MERGING = "merging"         # merging of similar requests
@dataclass
class StartupConfig:
    """Cold-start / startup configuration for ColdStartOptimizer."""
    model_path: str               # filesystem path of the model
    model_size: str               # '7b', '13b', '70b' — key into the simulated load-time table
    gpu_memory: int               # GB
    warmup_strategy: WarmupStrategy
    warmup_requests: int          # number of warmup inference calls to run
    enable_cache_warmup: bool
    cache_warmup_size: int        # number of cache entries to preload
    enable_connection_pool: bool
    pool_size: int                # number of connections to pre-create
    progressive_loading: bool     # load the model in stages when True
    loading_stages: int           # stage count used by progressive loading
@dataclass
class ThroughputConfig:
    """Throughput-optimization configuration for ThroughputOptimizer."""
    batching_strategy: BatchingStrategy  # which batching mode the run loop uses
    max_batch_size: int                  # upper bound on requests per batch
    max_wait_time: float                 # ms — max time to wait while filling a batch
    enable_continuous_batching: bool
    enable_dynamic_batching: bool
    dynamic_batch_threshold: float       # NOTE(review): not read by the code visible here
    enable_request_merging: bool
    merge_window: float                  # ms — NOTE(review): not read by the code visible here
    pipeline_parallel: bool              # NOTE(review): not read by the code visible here
    pipeline_stages: int                 # NOTE(review): not read by the code visible here
@dataclass
class StartupMetrics:
    """Timings and resource figures produced by ColdStartOptimizer.startup()."""
    model_load_time: float        # seconds
    memory_alloc_time: float      # seconds
    init_time: float              # seconds — parallel warmup/cache/pool phase
    warmup_time: float            # seconds
    total_startup_time: float     # seconds
    first_request_latency: float  # ms (simulated)
    gpu_memory_used: float        # GB (estimated)
    ready: bool
@dataclass
class ThroughputMetrics:
    """Aggregate metrics produced by ThroughputOptimizer.calculate_throughput()."""
    requests_per_second: float
    tokens_per_second: float
    avg_batch_size: float
    gpu_utilization: float  # 0.0–1.0 (estimated, capped at 0.95)
    avg_latency: float      # ms
    p50_latency: float      # ms
    p95_latency: float      # ms
    p99_latency: float      # ms
@dataclass
class Request:
    """A single inference request."""
    request_id: str   # caller-supplied unique id
    prompt: str
    max_tokens: int   # generation cap for this request
    timestamp: datetime = field(default_factory=datetime.now)  # creation/enqueue time
@dataclass
class Response:
    """Result of processing one Request."""
    request_id: str          # id of the request this answers
    generated_text: str
    tokens_generated: int
    latency: float           # ms
    batch_id: Optional[str]  # id of the batch this was processed in
    timestamp: datetime = field(default_factory=datetime.now)  # completion time
class ColdStartOptimizer:
    """Cold-start optimizer.

    Simulates and measures the phases of bringing a model server up:
      1. model warmup
      2. cache warmup
      3. connection-pool warmup
      4. progressive (staged) loading

    All "work" is simulated with sleeps scaled down by constant factors.
    """

    def __init__(self, config: StartupConfig):
        self.config = config
        self.model_loaded = False
        self.cache_warmed = False
        self.pool_initialized = False
        # Set to a StartupMetrics instance once startup() completes.
        self.metrics: Optional[StartupMetrics] = None
        # BUG FIX: this dict used to share its name with the cache-warmup
        # method, so the instance attribute shadowed the method and
        # executor.submit(self.warmup_cache) submitted a non-callable dict,
        # crashing startup().  The method is now run_cache_warmup().
        self.warmup_cache: Dict[str, str] = {}
        self.connection_pool: List[Dict[str, Any]] = []

    def load_model(self) -> float:
        """Load the model (simulated); return elapsed seconds."""
        start_time = time.time()
        # Simulated full-load time per model size (seconds).
        model_load_times = {
            '7b': 8.5,
            '13b': 15.2,
            '70b': 65.8,
            '405b': 280.5
        }
        base_time = model_load_times.get(self.config.model_size, 15.0)
        if self.config.progressive_loading:
            # Progressive loading: split the load into equal stages.
            stage_time = base_time / self.config.loading_stages
            for _ in range(self.config.loading_stages):
                time.sleep(stage_time * 0.3)  # 0.3x factor speeds up the simulation
        else:
            time.sleep(base_time * 0.3)  # 0.3x factor speeds up the simulation
        load_time = time.time() - start_time
        self.model_loaded = True
        return load_time

    def allocate_memory(self) -> float:
        """Allocate GPU memory (simulated); return elapsed seconds."""
        start_time = time.time()
        # Allocation cost grows with the configured GPU memory size.
        alloc_time = 0.5 + (self.config.gpu_memory * 0.05)
        time.sleep(alloc_time * 0.1)  # 0.1x factor speeds up the simulation
        return time.time() - start_time

    def warmup_model(self) -> float:
        """Run the configured warmup inference requests; return elapsed seconds."""
        start_time = time.time()
        if self.config.warmup_strategy == WarmupStrategy.MODEL_WARMUP:
            for i in range(self.config.warmup_requests):
                time.sleep(0.05)  # simulate one small inference request
                self.warmup_cache[f"warmup_{i}"] = f"response_{i}"
        return time.time() - start_time

    def run_cache_warmup(self) -> float:
        """Preload commonly used cache entries; return elapsed seconds.

        Renamed from ``warmup_cache`` because the instance attribute of the
        same name shadowed the method, making it uncallable on instances.
        """
        if not self.config.enable_cache_warmup:
            return 0.0
        start_time = time.time()
        for i in range(self.config.cache_warmup_size):
            self.warmup_cache[f"cache_key_{i}"] = f"cache_value_{i}"
        cache_time = time.time() - start_time
        self.cache_warmed = True
        return cache_time

    def initialize_connection_pool(self) -> float:
        """Pre-create the connection pool; return elapsed seconds."""
        if not self.config.enable_connection_pool:
            return 0.0
        start_time = time.time()
        for i in range(self.config.pool_size):
            conn = {"id": i, "status": "ready", "created": datetime.now()}
            self.connection_pool.append(conn)
        pool_time = time.time() - start_time
        self.pool_initialized = True
        return pool_time

    def startup(self) -> StartupMetrics:
        """Run the full startup sequence and return its metrics."""
        total_start = time.time()
        # 1. Load the model.
        model_load_time = self.load_model()
        # 2. Allocate GPU memory.
        memory_alloc_time = self.allocate_memory()
        # 3. Run model warmup, cache warmup and pool init in parallel.
        init_start = time.time()
        with ThreadPoolExecutor(max_workers=3) as executor:
            futures = {
                'warmup': executor.submit(self.warmup_model),
                'cache': executor.submit(self.run_cache_warmup),
                'pool': executor.submit(self.initialize_connection_pool)
            }
            warmup_time = futures['warmup'].result()
            futures['cache'].result()
            futures['pool'].result()
        init_time = time.time() - init_start
        # 4. Total wall-clock startup time.
        total_startup_time = time.time() - total_start
        # 5. Simulated first-request latency (ms).
        first_request_latency = 50 + np.random.uniform(10, 30)
        # 6. Rough GPU-memory estimate: assume 65% of configured capacity.
        gpu_memory_used = self.config.gpu_memory * 0.65
        metrics = StartupMetrics(
            model_load_time=model_load_time,
            memory_alloc_time=memory_alloc_time,
            init_time=init_time,
            warmup_time=warmup_time,
            total_startup_time=total_startup_time,
            first_request_latency=first_request_latency,
            gpu_memory_used=gpu_memory_used,
            ready=True
        )
        self.metrics = metrics
        return metrics

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of the startup/warmup state."""
        return {
            "model_loaded": self.model_loaded,
            "cache_warmed": self.cache_warmed,
            "pool_initialized": self.pool_initialized,
            "ready": self.metrics.ready if self.metrics is not None else False,
            "warmup_cache_size": len(self.warmup_cache),
            "connection_pool_size": len(self.connection_pool)
        }
class ThroughputOptimizer:
    """Throughput optimizer.

    Supports:
      1. continuous batching
      2. dynamic (load-adaptive) batching
      3. request merging
      4. pipeline parallelism (config flags only; not implemented here)
    """

    def __init__(self, config: ThroughputConfig):
        self.config = config
        self.request_queue = queue.Queue()
        self.current_batch = []             # reserved; not used by current logic
        self.batch_lock = threading.Lock()  # reserved; not used by current logic
        self.responses = []
        self.latency_history = []
        self.throughput_history = []        # reserved; never appended to here

    def add_request(self, request: Request):
        """Enqueue a request for batching."""
        self.request_queue.put(request)

    def _collect_batch(self, target_size: int) -> List[Request]:
        """Drain up to ``target_size`` queued requests.

        Polls with a 1 ms sleep until the batch is full or
        ``config.max_wait_time`` (ms) has elapsed.  If nothing arrived at
        all, falls back to a blocking get — NOTE: that call blocks
        indefinitely on an empty queue.
        """
        batch = []
        start_time = time.time()
        while len(batch) < target_size:
            try:
                batch.append(self.request_queue.get_nowait())
            except queue.Empty:
                elapsed_ms = (time.time() - start_time) * 1000
                if elapsed_ms >= self.config.max_wait_time:
                    break
                time.sleep(0.001)  # 1 ms poll interval
        return batch if batch else [self.request_queue.get()]

    def continuous_batching(self) -> List[Request]:
        """Collect a batch up to the configured maximum size."""
        if not self.config.enable_continuous_batching:
            return [self.request_queue.get()]
        return self._collect_batch(self.config.max_batch_size)

    def dynamic_batching(self) -> List[Request]:
        """Collect a batch whose target size scales with recent load."""
        if not self.config.enable_dynamic_batching:
            return [self.request_queue.get()]
        # Use the number of recorded latencies as a crude load signal.
        current_load = len(self.latency_history)
        if current_load < 100:
            factor = 0.5
        elif current_load < 500:
            factor = 0.75
        else:
            factor = 1.0
        # FIX: the target used to stay a float; make it an int >= 1 so a
        # small max_batch_size cannot round the target down to zero (which
        # would force the blocking-get fallback).
        target = max(1, int(self.config.max_batch_size * factor))
        return self._collect_batch(target)

    def merge_requests(self, requests: List[Request]) -> List[Request]:
        """Merge requests that share a prompt prefix.

        Simplified scheme: requests whose first 50 prompt characters match
        collapse into one request carrying the largest max_tokens.
        NOTE(review): the request IDs of merged-away requests are dropped,
        so their callers never receive a response — acceptable for this
        simulation, but a real implementation must fan results back out to
        every constituent request.
        """
        if not self.config.enable_request_merging:
            return requests
        # Group by merge key (first 50 characters of the prompt).
        merge_window = {}
        for request in requests:
            prefix = request.prompt[:50]
            merge_window.setdefault(prefix, []).append(request)
        merged = []
        for reqs in merge_window.values():
            if len(reqs) > 1:
                merged.append(Request(
                    request_id=f"merged_{secrets.token_hex(8)}",
                    prompt=reqs[0].prompt,
                    max_tokens=max(r.max_tokens for r in reqs),
                    timestamp=datetime.now()
                ))
            else:
                merged.extend(reqs)
        return merged

    def process_batch(self, batch: List[Request]) -> List[Response]:
        """Simulate inference for a batch and record per-request latency."""
        batch_id = f"batch_{secrets.token_hex(8)}"
        responses = []
        base_latency = 50 + np.random.uniform(10, 30)  # ms
        # Larger batches add a small per-request latency penalty.
        batch_size_factor = 1.0 + (len(batch) * 0.05)
        for request in batch:
            latency = base_latency * batch_size_factor
            tokens_generated = np.random.randint(20, 100)
            responses.append(Response(
                request_id=request.request_id,
                generated_text=f"Response to {request.prompt[:30]}...",
                tokens_generated=tokens_generated,
                latency=latency,
                batch_id=batch_id
            ))
            self.latency_history.append(latency)
        self.responses.extend(responses)
        return responses

    def calculate_throughput(self, time_window: int = 60) -> ThroughputMetrics:
        """Compute throughput metrics from the recorded history.

        ``time_window`` is kept for interface stability but is not used;
        the metrics always cover the whole recorded history.
        """
        if not self.latency_history:
            return ThroughputMetrics(
                requests_per_second=0,
                tokens_per_second=0,
                avg_batch_size=0,
                gpu_utilization=0,
                avg_latency=0,
                p50_latency=0,
                p95_latency=0,
                p99_latency=0
            )
        sorted_latencies = sorted(self.latency_history)
        n = len(sorted_latencies)
        # Clamp percentile indices so small samples stay in range.
        p50_latency = sorted_latencies[min(int(n * 0.50), n - 1)]
        p95_latency = sorted_latencies[min(int(n * 0.95), n - 1)]
        p99_latency = sorted_latencies[min(int(n * 0.99), n - 1)]
        avg_latency = statistics.mean(self.latency_history)
        # Estimate throughput as if requests ran strictly back-to-back.
        total_time = len(self.latency_history) * avg_latency / 1000  # seconds
        requests_per_second = len(self.latency_history) / total_time if total_time > 0 else 0
        total_tokens = sum(r.tokens_generated for r in self.responses)
        tokens_per_second = total_tokens / total_time if total_time > 0 else 0
        # Crude GPU-utilization estimate, capped at 95%.
        gpu_utilization = min(0.95, requests_per_second / 100)
        return ThroughputMetrics(
            requests_per_second=requests_per_second,
            tokens_per_second=tokens_per_second,
            avg_batch_size=self.config.max_batch_size * 0.7,  # rough estimate
            gpu_utilization=gpu_utilization,
            avg_latency=avg_latency,
            p50_latency=p50_latency,
            p95_latency=p95_latency,
            p99_latency=p99_latency
        )

    def run_optimization_loop(self, duration: int = 10):
        """Simulate request arrival and batch processing for ``duration`` seconds."""
        print(f"启动吞吐量优化循环,持续 {duration} 秒...")
        start_time = time.time()
        request_count = 0
        while (time.time() - start_time) < duration:
            # 30% chance that a new request arrives on this tick.
            if np.random.random() < 0.3:
                self.add_request(Request(
                    request_id=f"req_{request_count}",
                    prompt=f"Test prompt {request_count}",
                    max_tokens=50
                ))
                request_count += 1
            if not self.request_queue.empty():
                if self.config.batching_strategy == BatchingStrategy.CONTINUOUS_BATCHING:
                    batch = self.continuous_batching()
                elif self.config.batching_strategy == BatchingStrategy.DYNAMIC_BATCHING:
                    batch = self.dynamic_batching()
                else:
                    batch = [self.request_queue.get()]
                # Optionally collapse similar requests before processing.
                if self.config.enable_request_merging:
                    batch = self.merge_requests(batch)
                # Records latencies/responses as a side effect; the return
                # value is not needed here.
                self.process_batch(batch)
            time.sleep(0.01)  # 10 ms tick
        print(f"优化循环完成,处理 {request_count} 个请求")
# Usage example: runs the full (simulated) cold-start flow, then a short
# throughput-optimization loop, and prints the collected metrics.
if __name__ == "__main__":
    print("=== Agent 冷启动、预热与吞吐量优化 ===\n")
    print("=== 创建冷启动优化器 ===")
    # Startup configuration: a simulated 70B model on an 80 GB GPU with all
    # warmup features (model warmup, cache warmup, connection pool,
    # 3-stage progressive loading) enabled.
    startup_config = StartupConfig(
        model_path="/models/llama-70b",
        model_size='70b',
        gpu_memory=80,
        warmup_strategy=WarmupStrategy.MODEL_WARMUP,
        warmup_requests=10,
        enable_cache_warmup=True,
        cache_warmup_size=100,
        enable_connection_pool=True,
        pool_size=20,
        progressive_loading=True,
        loading_stages=3
    )
    startup_optimizer = ColdStartOptimizer(startup_config)
    print(f"模型路径:{startup_config.model_path}")
    print(f"模型大小:{startup_config.model_size}")
    print(f"GPU 显存:{startup_config.gpu_memory}GB")
    print(f"预热策略:{startup_config.warmup_strategy.value}")
    print(f"预热请求数:{startup_config.warmup_requests}")
    print(f"缓存预热:{startup_config.enable_cache_warmup}")
    print(f"连接池:{startup_config.enable_connection_pool} ({startup_config.pool_size})")
    print(f"渐进式加载:{startup_config.progressive_loading}")
    print()
    print("=== 执行启动流程 ===")
    # Run the full startup sequence and report per-phase timings.
    metrics = startup_optimizer.startup()
    print(f"\n启动指标:")
    print(f"  模型加载时间:{metrics.model_load_time:.2f}s")
    print(f"  显存分配时间:{metrics.memory_alloc_time:.2f}s")
    print(f"  初始化时间:{metrics.init_time:.2f}s")
    print(f"  预热时间:{metrics.warmup_time:.2f}s")
    print(f"  总启动时间:{metrics.total_startup_time:.2f}s")
    print(f"  首请求延迟:{metrics.first_request_latency:.1f}ms")
    print(f"  GPU 显存使用:{metrics.gpu_memory_used:.1f}GB")
    print(f"  就绪状态:{metrics.ready}")
    print(f"\n启动状态:")
    status = startup_optimizer.get_status()
    for key, value in status.items():
        print(f"  {key}: {value}")
    print(f"\n=== 创建吞吐量优化器 ===")
    # Throughput configuration: continuous batching (max 32 per batch,
    # 10 ms max wait) with dynamic batching and request merging enabled.
    throughput_config = ThroughputConfig(
        batching_strategy=BatchingStrategy.CONTINUOUS_BATCHING,
        max_batch_size=32,
        max_wait_time=10.0,
        enable_continuous_batching=True,
        enable_dynamic_batching=True,
        dynamic_batch_threshold=0.7,
        enable_request_merging=True,
        merge_window=5.0,
        pipeline_parallel=False,
        pipeline_stages=1
    )
    throughput_optimizer = ThroughputOptimizer(throughput_config)
    print(f"批处理策略:{throughput_config.batching_strategy.value}")
    print(f"最大批处理大小:{throughput_config.max_batch_size}")
    print(f"最大等待时间:{throughput_config.max_wait_time}ms")
    print(f"连续批处理:{throughput_config.enable_continuous_batching}")
    print(f"动态批处理:{throughput_config.enable_dynamic_batching}")
    print(f"请求合并:{throughput_config.enable_request_merging}")
    print()
    print("=== 运行吞吐量优化循环 ===")
    # Drive simulated traffic for 5 seconds, then summarize the metrics.
    throughput_optimizer.run_optimization_loop(duration=5)
    print(f"\n吞吐量指标:")
    throughput_metrics = throughput_optimizer.calculate_throughput()
    print(f"  请求/秒:{throughput_metrics.requests_per_second:.1f}")
    print(f"  Token/秒:{throughput_metrics.tokens_per_second:.1f}")
    print(f"  平均批处理大小:{throughput_metrics.avg_batch_size:.1f}")
    print(f"  GPU 利用率:{throughput_metrics.gpu_utilization:.1%}")
    print(f"  平均延迟:{throughput_metrics.avg_latency:.1f}ms")
    print(f"  P50 延迟:{throughput_metrics.p50_latency:.1f}ms")
    print(f"  P95 延迟:{throughput_metrics.p95_latency:.1f}ms")
    print(f"  P99 延迟:{throughput_metrics.p99_latency:.1f}ms")
    print(f"\n关键观察:")
    print("1. 冷启动优化:模型加载、显存分配、预热策略")
    print("2. 预热技术:模型预热、缓存预热、连接池预热")
    print("3. 吞吐量优化:连续批处理、动态批处理、请求合并")
    print("4. 系统部署:弹性伸缩、负载均衡、多实例")
    print("5. 热启动高效:冷启动 + 预热 + 吞吐 + 部署 = 可信赖")
    print("\n热启动高效的使命:让 AI 服务更快、更稳、更高效")