# Complete priority scheduler implementation (优先级调度器完整实现)
import heapq
import time
from typing import Dict, List, Optional, Callable, Any
from dataclasses import dataclass, field
from enum import Enum
from abc import ABC, abstractmethod
import threading
from concurrent.futures import ThreadPoolExecutor, Future
import random
class TaskStatus(Enum):
    """Lifecycle states of a task."""

    PENDING = "pending"        # default state of a newly created task
    RUNNING = "running"        # the task's callable is currently executing
    COMPLETED = "completed"    # the callable returned without raising
    FAILED = "failed"          # the callable raised an exception
    CANCELLED = "cancelled"    # the task was cancelled
@dataclass(order=True)
class Task:
    """A unit of work handled by the scheduler.

    Ordering (and equality) is based on ``priority`` alone, because every
    other field is declared with ``compare=False``. A smaller ``priority``
    value means a higher priority.
    """

    # Sort key: smaller value == scheduled earlier.
    priority: int
    # Identifier used for status lookups and dependency references.
    task_id: str = field(compare=False)
    # Human-readable name.
    name: str = field(compare=False)
    # Zero-argument callable that performs the work.
    execution_fn: Callable = field(compare=False, repr=False)
    # Expected run time (presumably seconds -- TODO confirm).
    estimated_duration: float = field(compare=False, default=1.0)
    # Resource name -> amount required; filled with defaults when empty.
    resource_requirements: Dict[str, float] = field(compare=False, default_factory=dict)
    # Ids of tasks that must be finished before this one may run.
    dependencies: List[str] = field(compare=False, default_factory=list)
    status: TaskStatus = field(compare=False, default=TaskStatus.PENDING)
    # Timestamps taken from time.time().
    created_at: float = field(compare=False, default_factory=time.time)
    started_at: Optional[float] = field(compare=False, default=None)
    completed_at: Optional[float] = field(compare=False, default=None)
    # Return value of execution_fn, or the error text if it raised.
    result: Any = field(compare=False, default=None)
    error: Optional[str] = field(compare=False, default=None)

    def __post_init__(self):
        """Fall back to the default resource requirements when none were given."""
        if self.resource_requirements:
            return
        self.resource_requirements = {
            'cpu': 1.0,
            'memory': 512,  # MB
            'tokens': 1000,
        }
class PriorityCalculator(ABC):
    """Abstract base class for priority calculation strategies."""

    @abstractmethod
    def calculate_priority(self, task: Task, system_state: Dict) -> int:
        """Return the priority for ``task`` given the current ``system_state``.

        Subclasses must override this method.
        """
        ...
class UrgencyPriorityCalculator(PriorityCalculator):
    """Priority calculator driven by urgency keywords in the task name."""

    def __init__(self, urgency_weights: Optional[Dict[str, int]] = None):
        """Store the urgency -> base priority table.

        Args:
            urgency_weights: Mapping with the keys ``'critical'``, ``'high'``,
                ``'medium'`` and ``'low'``. When omitted (or empty) the
                built-in table below is used.
        """
        self.urgency_weights = urgency_weights or {
            'critical': 0,   # highest priority
            'high': 10,
            'medium': 50,
            'low': 100,      # lowest priority
        }

    def calculate_priority(self, task: Task, system_state: Dict) -> int:
        """Compute a priority from the task name and its waiting time.

        The name is split into lowercase ASCII words and matched against the
        urgency keywords, so a name such as "Workflow sync" is no longer
        treated as "low" merely because it contains that substring.

        Args:
            task: Task to rate; ``name`` and ``created_at`` are read.
            system_state: Unused by this calculator.

        Returns:
            A non-negative priority; smaller means more urgent.
        """
        # Lowercase once, then break the name on every non-letter so that
        # keywords only match whole words.
        lowered = task.name.lower()
        words = set(
            ''.join(ch if 'a' <= ch <= 'z' else ' ' for ch in lowered).split()
        )

        if 'critical' in words:
            urgency = 'critical'
        elif words & {'urgent', 'high'}:
            urgency = 'high'
        elif 'low' in words:
            urgency = 'low'
        else:
            urgency = 'medium'

        base_priority = self.urgency_weights.get(urgency, 50)

        # Aging: every full minute of waiting improves the priority by 5 so
        # that long-waiting tasks are not starved. Clamp at zero so a
        # created_at in the future cannot turn the bonus into a penalty.
        wait_time = max(0.0, time.time() - task.created_at)
        aging_bonus = int(wait_time / 60) * 5
        return max(0, base_priority - aging_bonus)
class ResourceAwarePriorityCalculator(PriorityCalculator):
    """Priority calculator that favours tasks with small CPU demands."""

    def calculate_priority(self, task: Task, system_state: Dict) -> int:
        """Rate ``task`` from its CPU requirement and the reported load.

        Starts from a base of 50, then:
        * subtracts 20 when the task needs at most 10% of the available CPU,
        * adds 30 when it needs at least 50% of the available CPU,
        * adds 10 when ``load_average`` is above 0.8.

        Returns:
            The adjusted priority, never below 0.
        """
        cpu_free = system_state.get('available_cpu', 100)
        cpu_needed = task.resource_requirements.get('cpu', 1)

        if cpu_needed <= cpu_free * 0.1:
            # Small task: let it go first.
            adjustment = -20
        elif cpu_needed >= cpu_free * 0.5:
            # Large task: push it back.
            adjustment = 30
        else:
            adjustment = 0

        # Under high load, lower the priority further to avoid piling on.
        if system_state.get('load_average', 0.5) > 0.8:
            adjustment += 10

        return max(0, 50 + adjustment)
class PriorityScheduler:
    """Priority-based task scheduler backed by a thread pool.

    Responsibilities:
    1. Accept tasks and assign each a priority through the configured
       calculator (computed once, at submission time).
    2. Hold a task back until every id in its ``dependencies`` appears in
       ``completed_tasks``.
    3. Reserve a task's resource requirements before dispatching it and
       return them when it finishes.
    4. Keep at most ``max_workers`` tasks dispatched at any time.

    A daemon thread started by ``__init__`` polls the queue every 0.1 s.
    """

    def __init__(self,
                 max_workers: int = 4,
                 total_resources: Optional[Dict[str, float]] = None,
                 priority_calculator: Optional[PriorityCalculator] = None):
        """Create the scheduler and start its background loop.

        Args:
            max_workers: Size of the thread pool and dispatch limit.
            total_resources: Resource name -> total capacity. Defaults to
                the table below when omitted.
            priority_calculator: Strategy used in ``submit_task``; defaults
                to ``UrgencyPriorityCalculator``.
        """
        self.max_workers = max_workers
        self.total_resources = total_resources or {
            'cpu': 16.0,
            'memory': 32768,    # 32 GB
            'tokens': 1000000,  # 1M tokens/min
        }
        self.available_resources = self.total_resources.copy()
        self.priority_calculator = priority_calculator or UrgencyPriorityCalculator()

        # Ready tasks live in a min-heap; the dicts index tasks by id.
        self.task_queue: List[Task] = []
        self.running_tasks: Dict[str, Task] = {}
        self.completed_tasks: Dict[str, Task] = {}
        self.pending_tasks: Dict[str, Task] = {}

        # ``lock`` guards the queue and the task dicts.
        self.lock = threading.Lock()
        # Re-entrant because _allocate_resources calls _check_resources while
        # already holding this lock; a plain Lock deadlocks there.
        self.resource_lock = threading.RLock()

        self.executor = ThreadPoolExecutor(max_workers=max_workers)

        # Snapshot handed to the priority calculator.
        self.system_state = {
            'available_cpu': self.total_resources.get('cpu', 0),
            'load_average': 0.0,
            'queue_size': 0,
        }

        self.running = True
        self.scheduler_thread = threading.Thread(target=self._scheduler_loop, daemon=True)
        self.scheduler_thread.start()

    def _unmet_dependencies(self, task: Task) -> List[str]:
        """Return the dependency ids of ``task`` not yet in ``completed_tasks``.

        The caller must hold ``self.lock``.
        """
        return [dep_id for dep_id in task.dependencies
                if dep_id not in self.completed_tasks]

    def _promote_ready_tasks(self) -> None:
        """Move pending tasks without unmet dependencies onto the heap.

        The caller must hold ``self.lock``.
        """
        for task_id, task in list(self.pending_tasks.items()):
            if not self._unmet_dependencies(task):
                del self.pending_tasks[task_id]
                heapq.heappush(self.task_queue, task)

    def submit_task(self, task: Task) -> str:
        """Register ``task`` with the scheduler.

        The task's ``priority`` is overwritten with the calculator's result.
        A task with unmet dependencies is parked in ``pending_tasks``;
        otherwise it is pushed onto the priority queue.

        Returns:
            The task's ``task_id``.
        """
        with self.lock:
            priority = self.priority_calculator.calculate_priority(task, self.system_state)
            task.priority = priority

            unmet_deps = self._unmet_dependencies(task)
            if unmet_deps:
                task.status = TaskStatus.PENDING
                self.pending_tasks[task.task_id] = task
                print(f"Task {task.task_id} pending, waiting for dependencies: {unmet_deps}")
            else:
                heapq.heappush(self.task_queue, task)
                print(f"Task {task.task_id} submitted with priority {priority}")

            self.system_state['queue_size'] = len(self.task_queue) + len(self.pending_tasks)
        return task.task_id

    def _check_resources(self, task: Task) -> bool:
        """Return True when every requirement of ``task`` is currently available.

        A resource that is not tracked at all counts as having 0 available.
        """
        with self.resource_lock:
            return all(
                required <= self.available_resources.get(resource, 0)
                for resource, required in task.resource_requirements.items()
            )

    def _allocate_resources(self, task: Task) -> bool:
        """Atomically check and reserve the resources ``task`` needs.

        Returns:
            True when the reservation succeeded, False when something is short
            (in which case nothing is reserved).
        """
        with self.resource_lock:
            if not self._check_resources(task):
                return False
            for resource, required in task.resource_requirements.items():
                # Untracked resources can only get here with a requirement
                # of zero or less, so there is nothing to book for them.
                if resource in self.available_resources:
                    self.available_resources[resource] -= required
            return True

    def _release_resources(self, task: Task):
        """Give back the resources reserved for ``task``, capped at the totals."""
        with self.resource_lock:
            for resource, required in task.resource_requirements.items():
                if resource not in self.available_resources:
                    continue
                restored = self.available_resources[resource] + required
                capacity = self.total_resources.get(resource, restored)
                self.available_resources[resource] = min(capacity, restored)

    def _execute_task(self, task: Task) -> Any:
        """Run ``task.execution_fn`` and record the outcome on the task.

        Runs on a pool thread. On success the result is stored and returned;
        on failure the error text is stored and the exception is re-raised
        (so it ends up in the Future). Resources are released either way.
        """
        task.status = TaskStatus.RUNNING
        task.started_at = time.time()
        try:
            print(f"Executing task {task.task_id}: {task.name}")
            result = task.execution_fn()
            task.result = result
            task.status = TaskStatus.COMPLETED
            task.completed_at = time.time()
            duration = task.completed_at - task.started_at
            print(f"Task {task.task_id} completed in {duration:.2f}s")
            return result
        except Exception as e:
            task.status = TaskStatus.FAILED
            task.error = str(e)
            task.completed_at = time.time()
            print(f"Task {task.task_id} failed: {e}")
            raise
        finally:
            self._release_resources(task)

    def _scheduler_loop(self):
        """Background loop: promote ready tasks and dispatch what fits."""
        while self.running:
            tasks_to_run = []
            with self.lock:
                # Pending tasks whose dependencies are done become runnable.
                self._promote_ready_tasks()

                # Only fill slots that are actually free; tasks dispatched on
                # earlier ticks still occupy theirs until their callback runs.
                free_slots = self.max_workers - len(self.running_tasks)
                deferred = []
                while self.task_queue and len(tasks_to_run) < free_slots:
                    task = heapq.heappop(self.task_queue)
                    # One atomic check-and-reserve; a task that does not fit
                    # is always put back instead of being dropped.
                    if self._allocate_resources(task):
                        self.running_tasks[task.task_id] = task
                        tasks_to_run.append(task)
                    else:
                        deferred.append(task)

                # Tasks that did not fit are retried on the next tick.
                for task in deferred:
                    heapq.heappush(self.task_queue, task)

                self.system_state['available_cpu'] = self.available_resources.get('cpu', 0)
                self.system_state['load_average'] = len(self.running_tasks) / self.max_workers
                self.system_state['queue_size'] = len(self.task_queue) + len(self.pending_tasks)

            # Submit outside the lock: a done-callback may fire immediately in
            # this thread and needs to take ``self.lock`` itself.
            for task in tasks_to_run:
                future = self.executor.submit(self._execute_task, task)
                future.add_done_callback(
                    lambda f, t=task: self._on_task_complete(t, f)
                )

            time.sleep(0.1)  # avoid busy-spinning

    def _on_task_complete(self, task: Task, future: Future):
        """Future callback: file the finished task and unblock dependents."""
        with self.lock:
            self.running_tasks.pop(task.task_id, None)
            # NOTE(review): failed tasks are filed here as well, so dependents
            # start even when a dependency failed -- confirm this is intended.
            self.completed_tasks[task.task_id] = task
            self._promote_ready_tasks()

    def get_task_status(self, task_id: str) -> Optional[Dict]:
        """Return a status summary for ``task_id``, or None if it is unknown.

        Looks through completed, running and pending tasks as well as tasks
        that are queued and waiting for a free slot.
        """
        with self.lock:
            for registry in (self.completed_tasks, self.running_tasks, self.pending_tasks):
                task = registry.get(task_id)
                if task is not None:
                    break
            else:
                # Submitted and ready, but still waiting in the heap.
                task = next(
                    (queued for queued in self.task_queue if queued.task_id == task_id),
                    None,
                )

        if task is None:
            return None
        return {
            'task_id': task.task_id,
            'name': task.name,
            'status': task.status.value,
            'priority': task.priority,
            'created_at': task.created_at,
            'started_at': task.started_at,
            'completed_at': task.completed_at,
            'error': task.error
        }

    def shutdown(self, wait: bool = True):
        """Stop the scheduler loop and shut the thread pool down.

        Args:
            wait: When True, wait up to 5 s for the scheduler thread and let
                the executor finish its outstanding work.
        """
        self.running = False
        if wait:
            self.scheduler_thread.join(timeout=5)
        self.executor.shutdown(wait=wait)
# Usage example
if __name__ == "__main__":
    # Scheduler with three workers and a custom resource budget.
    scheduler = PriorityScheduler(
        max_workers=3,
        total_resources={'cpu': 8.0, 'memory': 16384, 'tokens': 500000}
    )

    def task_fn(duration: float, task_name: str):
        """Simulate work by sleeping for ``duration``, then report completion."""
        time.sleep(duration)
        return f"{task_name} completed"

    # Six tasks; only the first carries "Critical" in its name.
    tasks = []
    for i in range(6):
        duration = 0.5 + i * 0.2
        label = f"Critical Task {i}" if i == 0 else f"Normal Task {i}"
        tasks.append(Task(
            priority=50,  # initial value; recomputed on submission
            task_id=f"task_{i}",
            name=label,
            # Defaults bind the current loop values to the lambda.
            execution_fn=lambda d=duration, n=f"Task {i}": task_fn(d, n),
            estimated_duration=duration,
            resource_requirements={'cpu': 1.0, 'memory': 1024, 'tokens': 50000},
        ))

    # task_3 depends on task_0 and task_1.
    tasks[3].dependencies = ['task_0', 'task_1']

    print("=== 提交任务 ===")
    for task in tasks:
        scheduler.submit_task(task)

    # Give the scheduler time to work through the tasks.
    print("\n=== 等待任务执行 ===")
    time.sleep(5)

    print("\n=== 任务状态 ===")
    for i, task in enumerate(tasks):
        status = scheduler.get_task_status(task.task_id)
        if status:
            print(f"Task {i}: {status['status']} (priority: {status['priority']})")

    scheduler.shutdown()

    # Closing summary.
    summary_lines = (
        "\n关键观察:",
        "1. 优先级调度:紧急任务(Critical)优先执行",
        "2. 依赖管理:task_3 等待 task_0 和 task_1 完成后才执行",
        "3. 资源约束:CPU/内存/Token 限制,避免资源耗尽",
        "4. 并发控制:max_workers 限制最大并发数",
        "5. 动态调整:等待时间越长,优先级提升(避免饿死)",
    )
    for line in summary_lines:
        print(line)