"""Agent 扩容与限流完整实现 (complete implementation of agent scaling and rate limiting)."""
import time
import json
import hashlib
import secrets
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import numpy as np
from collections import deque, defaultdict
import statistics
import threading
from concurrent.futures import ThreadPoolExecutor
import queue
import random
class ScalingStrategy(Enum):
    """Supported scaling strategies."""

    HORIZONTAL = "horizontal"  # horizontal scaling (add/remove instances)
    VERTICAL = "vertical"      # vertical scaling (resize instances)
    AUTO = "auto"              # automatic scaling
    WARMUP = "warmup"          # warmup scaling (pre-provisioning)
class RateLimitAlgorithm(Enum):
    """Supported rate-limiting algorithms."""

    TOKEN_BUCKET = "token_bucket"        # token bucket
    LEAKY_BUCKET = "leaky_bucket"        # leaky bucket
    SLIDING_WINDOW = "sliding_window"    # sliding window
    FIXED_WINDOW = "fixed_window"        # fixed window
@dataclass
class ScalingConfig:
    """Configuration for the auto-scaler."""

    strategy: "ScalingStrategy"         # which scaling strategy to apply
    min_instances: int                  # lower bound on pool size
    max_instances: int                  # upper bound on pool size
    target_cpu_utilization: float       # desired steady-state CPU %
    target_memory_utilization: float    # desired steady-state memory %
    scale_up_threshold: float           # avg utilization % that triggers scale-up
    scale_down_threshold: float         # avg utilization % that triggers scale-down
    scale_up_cooldown: int              # seconds to wait between scale-up actions
    scale_down_cooldown: int            # seconds to wait between scale-down actions
    warmup_instances: int               # instances to pre-provision for warmup
    warmup_time: int                    # seconds allotted for warmup
@dataclass
class RateLimitConfig:
    """Configuration shared by the rate limiters."""

    algorithm: "RateLimitAlgorithm"   # which limiting algorithm to use
    requests_per_second: int          # sustained admission rate
    burst_size: int                   # maximum burst capacity
    window_size: int                  # window length in seconds
    distributed: bool                 # whether limiting is coordinated across nodes
    redis_host: Optional[str]         # backing store host (when distributed)
    redis_port: Optional[int]         # backing store port (when distributed)
    key_prefix: str                   # namespace prefix for limiter keys
@dataclass
class Instance:
    """A single running service instance and its live metrics."""

    instance_id: str
    cpu_utilization: float                   # percent, clamped to 0-100 by the scaler
    memory_utilization: float                # percent, clamped to 0-100 by the scaler
    requests_per_second: float
    health_status: str                       # e.g. "healthy"
    created_at: datetime = field(default_factory=datetime.now)
    last_scaled: Optional[datetime] = None   # when this instance last took part in scaling
@dataclass
class ScalingDecision:
    """Outcome of one scaling evaluation."""

    decision_id: str
    action: str                # one of 'scale_up', 'scale_down', 'no_op'
    current_instances: int     # pool size when the decision was made
    target_instances: int      # pool size the decision aims for
    reason: str                # human-readable justification
    timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class RateLimitResult:
    """Outcome of a single rate-limit admission check."""

    allowed: bool                  # True if the request may proceed
    remaining_tokens: int          # capacity left after this decision
    retry_after: Optional[float]   # seconds to wait before retrying; None when allowed
    limit: int                     # configured capacity of the limiter
    reset_time: datetime           # when capacity is expected to replenish
class TokenBucketLimiter:
    """Token-bucket rate limiter.

    Tokens accrue at ``requests_per_second`` up to a cap of ``burst_size``;
    each admitted request spends one token, so short bursts up to the cap
    are tolerated while the long-run rate stays bounded.  Thread-safe via
    an internal lock.
    """

    def __init__(self, config: "RateLimitConfig"):
        self.config = config
        # Start full so an initial burst is admitted immediately.
        self.tokens = float(config.burst_size)
        self.last_update = datetime.now()
        self.lock = threading.Lock()
        self.requests_allowed = 0
        self.requests_denied = 0

    def _refill_tokens(self) -> None:
        """Credit tokens for the elapsed time, capped at the burst size."""
        now = datetime.now()
        elapsed = (now - self.last_update).total_seconds()
        refill = elapsed * self.config.requests_per_second
        self.tokens = min(self.config.burst_size, self.tokens + refill)
        self.last_update = now

    def allow_request(self) -> "RateLimitResult":
        """Try to consume one token and return the admission decision."""
        with self.lock:
            self._refill_tokens()
            if self.tokens < 1:
                # Not enough tokens: report how long until one accrues.
                self.requests_denied += 1
                wait = (1 - self.tokens) / self.config.requests_per_second
                return RateLimitResult(
                    allowed=False,
                    remaining_tokens=0,
                    retry_after=wait,
                    limit=self.config.burst_size,
                    reset_time=datetime.now() + timedelta(seconds=wait),
                )
            self.tokens -= 1
            self.requests_allowed += 1
            return RateLimitResult(
                allowed=True,
                remaining_tokens=int(self.tokens),
                retry_after=None,
                limit=self.config.burst_size,
                reset_time=datetime.now() + timedelta(seconds=1),
            )

    def get_statistics(self) -> Dict[str, Any]:
        """Return cumulative admit/deny counters and the current token level."""
        total = self.requests_allowed + self.requests_denied
        allow_rate = self.requests_allowed / total if total > 0 else 0
        return {
            "requests_allowed": self.requests_allowed,
            "requests_denied": self.requests_denied,
            "allow_rate": allow_rate,
            "current_tokens": int(self.tokens),
            "timestamp": datetime.now().isoformat(),
        }
class LeakyBucketLimiter:
    """Leaky-bucket rate limiter.

    Each admitted request adds one unit of "water" to a bucket of capacity
    ``burst_size``; the bucket drains at ``requests_per_second``, smoothing
    bursts into a steady outflow.  Requests that would overflow the bucket
    are rejected with a retry hint.  Thread-safe via an internal lock.
    """

    def __init__(self, config: "RateLimitConfig"):
        self.config = config
        self.water_level = 0.0           # current bucket fill, in request units
        self.last_leak = datetime.now()
        self.lock = threading.Lock()
        # Reserved for a queued-request mode; not used by allow_request().
        self.queue = deque()
        self.requests_processed = 0
        self.requests_dropped = 0

    def _leak_water(self) -> None:
        """Drain the bucket at the configured rate for the elapsed time."""
        now = datetime.now()
        elapsed = (now - self.last_leak).total_seconds()
        leaked = elapsed * self.config.requests_per_second
        self.water_level = max(0, self.water_level - leaked)
        self.last_leak = now

    def allow_request(self) -> "RateLimitResult":
        """Admit the request if the bucket has room, else reject with a retry hint."""
        with self.lock:
            self._leak_water()
            if self.water_level < self.config.burst_size:
                self.water_level += 1
                self.requests_processed += 1
                return RateLimitResult(
                    allowed=True,
                    remaining_tokens=int(self.config.burst_size - self.water_level),
                    retry_after=None,
                    limit=self.config.burst_size,
                    reset_time=datetime.now() + timedelta(seconds=1),
                )
            self.requests_dropped += 1
            # Time until enough water leaks out to accept one more request.
            retry_after = (self.water_level - self.config.burst_size + 1) / self.config.requests_per_second
            return RateLimitResult(
                allowed=False,
                remaining_tokens=0,
                retry_after=retry_after,
                limit=self.config.burst_size,
                reset_time=datetime.now() + timedelta(seconds=retry_after),
            )

    def get_statistics(self) -> Dict[str, Any]:
        """Return cumulative processed/dropped counters and the current fill level.

        Added for parity with TokenBucketLimiter.get_statistics(); the
        counters were previously tracked but never exposed.
        """
        total = self.requests_processed + self.requests_dropped
        return {
            "requests_processed": self.requests_processed,
            "requests_dropped": self.requests_dropped,
            "process_rate": self.requests_processed / total if total > 0 else 0,
            "current_water_level": self.water_level,
            "timestamp": datetime.now().isoformat(),
        }
class SlidingWindowLimiter:
    """Sliding-window rate limiter.

    Keeps the timestamps of admitted requests for the last ``window_size``
    seconds and admits a new request only while the window holds fewer than
    ``requests_per_second * window_size`` entries.  Thread-safe via an
    internal lock.
    """

    def __init__(self, config: "RateLimitConfig"):
        self.config = config
        self.window_size = config.window_size
        self.max_requests = config.requests_per_second * config.window_size
        self.requests = deque()   # timestamps of admitted requests, oldest first
        self.lock = threading.Lock()
        self.requests_allowed = 0
        self.requests_denied = 0

    def _clean_old_requests(self) -> None:
        """Drop timestamps that have slid out of the current window."""
        cutoff = datetime.now() - timedelta(seconds=self.window_size)
        window = self.requests
        while window and window[0] < cutoff:
            window.popleft()

    def allow_request(self) -> "RateLimitResult":
        """Admit the request if the window has spare capacity."""
        with self.lock:
            self._clean_old_requests()
            if len(self.requests) >= self.max_requests:
                # Window full: the caller must wait for the oldest entry to expire.
                self.requests_denied += 1
                expires = self.requests[0] + timedelta(seconds=self.window_size)
                wait = (expires - datetime.now()).total_seconds()
                return RateLimitResult(
                    allowed=False,
                    remaining_tokens=0,
                    retry_after=max(0, wait),
                    limit=self.max_requests,
                    reset_time=expires,
                )
            self.requests.append(datetime.now())
            self.requests_allowed += 1
            return RateLimitResult(
                allowed=True,
                remaining_tokens=self.max_requests - len(self.requests),
                retry_after=None,
                limit=self.max_requests,
                reset_time=datetime.now() + timedelta(seconds=self.window_size),
            )
class AutoScaler:
    """Threshold-driven auto-scaler.

    Grows or shrinks the managed instance pool based on average CPU and
    memory utilization, honouring per-direction cooldown periods and the
    configured min/max instance bounds.  Metrics are simulated here; in
    production they would come from a monitoring system.
    """

    def __init__(self, config: "ScalingConfig"):
        self.config = config
        self.instances = []          # current pool of Instance records
        self.scaling_history = []    # ScalingDecision records for real scale actions
        self.last_scale_up = None
        self.last_scale_down = None
        self.lock = threading.Lock()

    @staticmethod
    def _new_decision_id() -> str:
        """Random, collision-resistant identifier for a scaling decision."""
        return f"decision_{secrets.token_hex(8)}"

    def _no_op(self, reason: str) -> "ScalingDecision":
        """Build a no-op decision for the current pool size (not recorded in history)."""
        size = len(self.instances)
        return ScalingDecision(
            decision_id=self._new_decision_id(),
            action="no_op",
            current_instances=size,
            target_instances=size,
            reason=reason,
        )

    def _create_instance(self) -> "Instance":
        """Provision a fresh, idle, healthy instance record."""
        return Instance(
            instance_id=f"instance_{secrets.token_hex(8)}",
            cpu_utilization=0.0,
            memory_utilization=0.0,
            requests_per_second=0.0,
            health_status="healthy",
        )

    def _simulate_metrics(self, instance: "Instance") -> None:
        """Randomly drift the instance metrics (stand-in for a real monitoring feed)."""
        instance.cpu_utilization = min(100, max(0, instance.cpu_utilization + random.uniform(-10, 10)))
        instance.memory_utilization = min(100, max(0, instance.memory_utilization + random.uniform(-5, 5)))
        instance.requests_per_second = max(0, instance.requests_per_second + random.uniform(-50, 50))

    def evaluate_scaling(self) -> "ScalingDecision":
        """Refresh metrics and decide whether to scale up, scale down, or hold."""
        with self.lock:
            if not self.instances:
                # Empty pool: bootstrap with a single instance.
                return self._scale_up(1, "Initial scaling")

            for inst in self.instances:
                self._simulate_metrics(inst)

            mean_cpu = statistics.mean(i.cpu_utilization for i in self.instances)
            mean_mem = statistics.mean(i.memory_utilization for i in self.instances)
            now = datetime.now()
            cfg = self.config

            if mean_cpu > cfg.scale_up_threshold or mean_mem > cfg.scale_up_threshold:
                if self.last_scale_up and (now - self.last_scale_up).total_seconds() < cfg.scale_up_cooldown:
                    return self._no_op("Scale up cooldown")
                # Grow the pool by 50%, at least one instance.
                grow_by = max(1, int(len(self.instances) * 0.5))
                return self._scale_up(grow_by, f"High utilization: CPU={mean_cpu:.1f}%, Memory={mean_mem:.1f}%")

            if mean_cpu < cfg.scale_down_threshold and mean_mem < cfg.scale_down_threshold:
                if self.last_scale_down and (now - self.last_scale_down).total_seconds() < cfg.scale_down_cooldown:
                    return self._no_op("Scale down cooldown")
                # Shrink the pool by 30%, at least one instance.
                shrink_by = max(1, int(len(self.instances) * 0.3))
                return self._scale_down(shrink_by, f"Low utilization: CPU={mean_cpu:.1f}%, Memory={mean_mem:.1f}%")

            return self._no_op("Within target range")

    def _scale_up(self, count: int, reason: str) -> "ScalingDecision":
        """Add up to *count* instances, clamped to the configured maximum."""
        current = len(self.instances)
        target = min(current + count, self.config.max_instances)
        if target <= current:
            return self._no_op("Already at max instances")
        self.instances.extend(self._create_instance() for _ in range(target - current))
        self.last_scale_up = datetime.now()
        decision = ScalingDecision(
            decision_id=self._new_decision_id(),
            action="scale_up",
            current_instances=current,
            target_instances=target,
            reason=reason,
        )
        self.scaling_history.append(decision)
        return decision

    def _scale_down(self, count: int, reason: str) -> "ScalingDecision":
        """Remove up to *count* instances, clamped to the configured minimum.

        Unhealthy and low-CPU instances are retired first.
        """
        current = len(self.instances)
        target = max(current - count, self.config.min_instances)
        if target >= current:
            return self._no_op("Already at min instances")
        # Keep healthy, busiest instances at the front, then truncate.
        ranked = sorted(
            self.instances,
            key=lambda inst: (inst.health_status != "healthy", -inst.cpu_utilization),
        )
        self.instances = ranked[:target]
        self.last_scale_down = datetime.now()
        decision = ScalingDecision(
            decision_id=self._new_decision_id(),
            action="scale_down",
            current_instances=current,
            target_instances=target,
            reason=reason,
        )
        self.scaling_history.append(decision)
        return decision

    def initialize(self, count: int) -> None:
        """Seed the pool with *count* fresh instances."""
        with self.lock:
            self.instances.extend(self._create_instance() for _ in range(count))

    def get_status(self) -> Dict[str, Any]:
        """Summarise pool size, health, and average utilization."""
        if not self.instances:
            return {"instances": 0, "status": "no_instances"}
        return {
            "instances": len(self.instances),
            "avg_cpu_utilization": statistics.mean(i.cpu_utilization for i in self.instances),
            "avg_memory_utilization": statistics.mean(i.memory_utilization for i in self.instances),
            "healthy_instances": sum(1 for i in self.instances if i.health_status == "healthy"),
            "total_scaling_decisions": len(self.scaling_history),
            "timestamp": datetime.now().isoformat(),
        }
# Usage example
def _demo_limiter(limiter, label: str) -> None:
    """Fire 15 requests at *limiter* and print each admission decision.

    Extracted because the original demo repeated this loop verbatim for all
    three limiters, differing only in the capacity label.
    """
    for i in range(15):
        result = limiter.allow_request()
        status = "✓ 允许" if result.allowed else f"✗ 拒绝 (等待 {result.retry_after:.2f}s)"
        print(f" 请求 {i+1}: {status}, {label}:{result.remaining_tokens}")


def main() -> None:
    """Demo: exercise the three rate limiters, then the auto-scaler."""
    print("=== Agent 高并发服务扩容与限流 ===\n")
    print("=== 创建限流器 ===")
    # One shared rate-limit configuration for all three limiter flavours.
    ratelimit_config = RateLimitConfig(
        algorithm=RateLimitAlgorithm.TOKEN_BUCKET,
        requests_per_second=100,
        burst_size=200,
        window_size=60,
        distributed=False,
        redis_host=None,
        redis_port=None,
        key_prefix="api_limit",
    )
    token_bucket = TokenBucketLimiter(ratelimit_config)
    leaky_bucket = LeakyBucketLimiter(ratelimit_config)
    sliding_window = SlidingWindowLimiter(ratelimit_config)
    print(f"限流算法:{ratelimit_config.algorithm.value}")
    print(f"请求/秒:{ratelimit_config.requests_per_second}")
    print(f"突发大小:{ratelimit_config.burst_size}")
    print(f"窗口大小:{ratelimit_config.window_size}s")
    print()

    print("=== 测试令牌桶限流 ===")
    _demo_limiter(token_bucket, "剩余令牌")
    print("\n令牌桶统计:")
    stats = token_bucket.get_statistics()
    print(f" 允许请求:{stats['requests_allowed']}")
    print(f" 拒绝请求:{stats['requests_denied']}")
    print(f" 允许率:{stats['allow_rate']:.1%}")

    print("\n=== 测试漏桶限流 ===")
    _demo_limiter(leaky_bucket, "剩余容量")
    print("\n=== 测试滑动窗口限流 ===")
    _demo_limiter(sliding_window, "剩余配额")

    print("\n=== 创建自动扩容器 ===")
    scaling_config = ScalingConfig(
        strategy=ScalingStrategy.AUTO,
        min_instances=2,
        max_instances=20,
        target_cpu_utilization=70.0,
        target_memory_utilization=75.0,
        scale_up_threshold=80.0,
        scale_down_threshold=40.0,
        scale_up_cooldown=60,
        scale_down_cooldown=120,
        warmup_instances=2,
        warmup_time=30,
    )
    autoscaler = AutoScaler(scaling_config)
    print(f"扩容策略:{scaling_config.strategy.value}")
    print(f"最小实例:{scaling_config.min_instances}")
    print(f"最大实例:{scaling_config.max_instances}")
    print(f"目标 CPU 利用率:{scaling_config.target_cpu_utilization}%")
    print(f"目标内存利用率:{scaling_config.target_memory_utilization}%")
    print(f"扩容阈值:{scaling_config.scale_up_threshold}%")
    print(f"缩容阈值:{scaling_config.scale_down_threshold}%")
    print()

    print("=== 初始化实例 ===")
    autoscaler.initialize(3)
    status = autoscaler.get_status()
    print(f"初始实例数:{status['instances']}")
    print(f"健康实例:{status['healthy_instances']}")
    print()

    print("=== 模拟负载并评估扩容 ===")
    for i in range(5):
        print(f"\n轮次 {i+1}:")
        decision = autoscaler.evaluate_scaling()
        print(f" 决策:{decision.action}")
        print(f" 当前实例:{decision.current_instances} → 目标实例:{decision.target_instances}")
        print(f" 原因:{decision.reason}")
        status = autoscaler.get_status()
        print(f" 实例数:{status['instances']}")
        print(f" 平均 CPU: {status['avg_cpu_utilization']:.1f}%")
        print(f" 平均内存:{status['avg_memory_utilization']:.1f}%")

    print("\n=== 扩容历史 ===")
    for decision in autoscaler.scaling_history[-3:]:
        print(f" {decision.action}: {decision.current_instances} → {decision.target_instances} ({decision.reason})")

    print("\n关键观察:")
    print("1. 服务扩容:水平扩容、垂直扩容、自动扩容")
    print("2. 限流技术:令牌桶、漏桶、滑动窗口")
    print("3. 高并发:负载均衡、请求分发、并发控制")
    print("4. 弹性架构:自动伸缩、故障转移、多活容灾")
    print("5. 弹性高可用:扩容 + 限流 + 并发 + 弹性 = 可信赖")
    print("\n弹性高可用的使命:让 AI 服务更稳定、更可靠、更弹性")


if __name__ == "__main__":
    main()