# 异步执行完整实现 — complete async execution implementation
import asyncio
import random
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

import aiohttp
class TaskStatus(Enum):
    """Lifecycle states a task moves through while being executed."""

    PENDING = "pending"      # submitted, not yet started
    RUNNING = "running"      # currently executing
    COMPLETED = "completed"  # finished successfully
    FAILED = "failed"        # exhausted all attempts with an error
    TIMEOUT = "timeout"      # exceeded its time budget
@dataclass
class AsyncTask:
    """Record describing one asynchronous task.

    The timing/result/error fields start empty and are filled in by the
    executor as the task runs, so they are annotated Optional (the
    original annotated them as plain ``str``/``float`` despite the
    ``None`` defaults).
    """

    task_id: str                        # unique key into the executor's task map
    name: str                           # human-readable label
    coroutine: Any                      # coroutine object (or factory) to await
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None                  # set on success
    error: Optional[str] = None         # stringified exception on failure
    start_time: Optional[float] = None  # time.time() when execution began
    end_time: Optional[float] = None    # time.time() when execution ended
    timeout: Optional[float] = None     # per-task timeout in seconds
class AsyncExecutor:
"""
异步执行器
核心功能:
1. 任务提交与调度
2. 超时控制
3. 并发限制
4. 错误处理
5. 结果收集
"""
def __init__(self, max_concurrency: int = 10):
self.max_concurrency = max_concurrency
self.semaphore = asyncio.Semaphore(max_concurrency)
self.tasks: Dict[str, AsyncTask] = {}
self.results: Dict[str, Any] = {}
async def execute_with_timeout(
self,
task_id: str,
coro: Any,
timeout: float = 30.0
) -> Any:
"""
带超时控制的异步执行
Args:
task_id: 任务 ID
coro: 协程对象
timeout: 超时时间(秒)
Returns:
执行结果
Raises:
asyncio.TimeoutError: 超时异常
"""
try:
result = await asyncio.wait_for(coro, timeout=timeout)
return result
except asyncio.TimeoutError:
raise asyncio.TimeoutError(f"Task {task_id} timed out after {timeout}s")
async def execute_with_retry(
self,
task_id: str,
coro: Any,
max_retries: int = 3,
backoff_factor: float = 2.0,
timeout: float = 30.0
) -> Any:
"""
带重试机制的异步执行
Args:
task_id: 任务 ID
coro: 协程对象
max_retries: 最大重试次数
backoff_factor: 退避因子(指数退避)
timeout: 超时时间
Returns:
执行结果
"""
last_error = None
for attempt in range(max_retries + 1):
try:
result = await self.execute_with_timeout(task_id, coro, timeout)
# 记录成功
if task_id in self.tasks:
self.tasks[task_id].status = TaskStatus.COMPLETED
self.tasks[task_id].result = result
return result
except asyncio.TimeoutError as e:
last_error = e
print(f"Task {task_id} attempt {attempt + 1} timed out")
except Exception as e:
last_error = e
print(f"Task {task_id} attempt {attempt + 1} failed: {e}")
# 指数退避
if attempt < max_retries:
wait_time = backoff_factor ** attempt
print(f"Retrying {task_id} in {wait_time}s...")
await asyncio.sleep(wait_time)
# 所有重试失败
if task_id in self.tasks:
self.tasks[task_id].status = TaskStatus.FAILED
self.tasks[task_id].error = str(last_error)
raise last_error
async def execute_concurrent(
self,
tasks: List[AsyncTask]
) -> Dict[str, Any]:
"""
并发执行多个任务
Args:
tasks: 任务列表
Returns:
结果字典 {task_id: result}
"""
async def run_task(task: AsyncTask):
async with self.semaphore: # 限制并发数
task.status = TaskStatus.RUNNING
task.start_time = time.time()
try:
result = await self.execute_with_retry(
task.task_id,
task.coroutine,
timeout=task.timeout or 30.0
)
task.end_time = time.time()
self.results[task.task_id] = result
return task.task_id, result
except Exception as e:
task.end_time = time.time()
task.status = TaskStatus.FAILED
task.error = str(e)
return task.task_id, None
# 并发执行所有任务
coroutines = [run_task(task) for task in tasks]
results = await asyncio.gather(*coroutines, return_exceptions=True)
return dict(results)
def submit_task(
self,
task_id: str,
name: str,
coro: Any,
timeout: float = 30.0
) -> AsyncTask:
"""提交任务"""
task = AsyncTask(
task_id=task_id,
name=name,
coroutine=coro,
timeout=timeout
)
self.tasks[task_id] = task
return task
def get_task_status(self, task_id: str) -> Dict:
"""获取任务状态"""
task = self.tasks.get(task_id)
if not task:
return None
return {
'task_id': task.task_id,
'name': task.name,
'status': task.status.value,
'result': task.result,
'error': task.error,
'duration': (task.end_time - task.start_time) if task.end_time and task.start_time else None
}
# 使用示例
async def fetch_url(session: aiohttp.ClientSession, url: str, delay: float = 0):
    """Pretend to perform an HTTP request.

    Waits *delay* seconds to mimic network latency, then either raises
    (about 20% of the time) or returns a fake payload string for *url*.
    """
    # Pretend the network is slow.
    await asyncio.sleep(delay)
    # Flaky endpoint: roughly one call in five blows up.
    failed = random.random() < 0.2
    if failed:
        raise Exception(f"Failed to fetch {url}")
    return f"Data from {url}"
async def main():
    """Demo driver: fan out several simulated fetches through AsyncExecutor."""
    executor = AsyncExecutor(max_concurrency=5)

    # One shared HTTP session for all simulated requests.
    async with aiohttp.ClientSession() as session:
        # (url, simulated latency in seconds)
        urls = [
            ("https://api.example.com/data1", 0.5),
            ("https://api.example.com/data2", 1.2),
            ("https://api.example.com/data3", 0.8),
            ("https://api.example.com/data4", 2.0),
            ("https://api.example.com/data5", 0.3),
        ]
        tasks = [
            executor.submit_task(
                task_id=f"task_{index}",
                name=f"Fetch {url}",
                coro=fetch_url(session, url, delay),
                timeout=5.0,
            )
            for index, (url, delay) in enumerate(urls)
        ]

        print("=== 开始并发执行任务 ===")
        start_time = time.time()
        # Run the whole batch concurrently.
        results = await executor.execute_concurrent(tasks)
        end_time = time.time()
        print(f"\n=== 执行完成,总耗时:{end_time - start_time:.2f}s ===")

        # Per-task outcome report.
        print("\n=== 任务结果 ===")
        for task_id, result in results.items():
            status = executor.get_task_status(task_id)
            print(f"{task_id}: {status['status']} - {status['duration']:.2f}s")
            if result:
                print(f" 结果:{result[:50]}...")
            elif status['error']:
                print(f" 错误:{status['error']}")

        print("\n关键观察:")
        print("1. 并发执行:5 个任务并发,总耗时≈最慢任务时间(2.0s),而非累加(4.8s)")
        print("2. 并发限制:Semaphore 限制最大并发数为 5,避免资源耗尽")
        print("3. 超时控制:每个任务超时 5s,防止慢任务拖垮系统")
        print("4. 重试机制:失败任务自动重试(指数退避)")
        print("5. 错误隔离:单个任务失败不影响其他任务")


if __name__ == "__main__":
    asyncio.run(main())