企业级 Agent 工作流与自动化完整实现
import time
import json
import hashlib
import secrets
from typing import Dict, List, Any, Optional, Tuple, Set, Callable
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
import numpy as np
from collections import deque, defaultdict
import statistics
import threading
import asyncio
from abc import ABC, abstractmethod
import uuid
class WorkflowState(Enum):
"""工作流状态"""
PENDING = "pending" # 待执行
RUNNING = "running" # 执行中
PAUSED = "paused" # 已暂停
COMPLETED = "completed" # 已完成
FAILED = "failed" # 已失败
CANCELLED = "cancelled" # 已取消
class TaskPriority(Enum):
"""任务优先级"""
LOW = 1
NORMAL = 2
HIGH = 3
CRITICAL = 4
class AgentRole(Enum):
"""Agent 角色"""
COORDINATOR = "coordinator" # 协调者
WORKER = "worker" # 执行者
REVIEWER = "reviewer" # 审核者
SUPERVISOR = "supervisor" # 监督者
@dataclass
class Task:
"""任务定义"""
task_id: str
name: str
description: str
agent_id: Optional[str]
priority: TaskPriority
status: WorkflowState
input_data: Dict[str, Any]
output_data: Optional[Dict[str, Any]]
created_at: datetime
started_at: Optional[datetime]
completed_at: Optional[datetime]
error_message: Optional[str]
retry_count: int
max_retries: int
dependencies: List[str]
@dataclass
class WorkflowDefinition:
"""工作流定义"""
workflow_id: str
name: str
description: str
tasks: List[Task]
triggers: List[str]
variables: Dict[str, Any]
timeout_minutes: int
retry_policy: Dict[str, Any]
@dataclass
class Agent:
"""Agent 定义"""
agent_id: str
name: str
role: AgentRole
capabilities: List[str]
current_tasks: List[str]
max_concurrent_tasks: int
status: str # active, busy, offline
last_heartbeat: datetime
@dataclass
class ExecutionContext:
"""执行上下文"""
context_id: str
workflow_id: str
variables: Dict[str, Any]
shared_state: Dict[str, Any]
execution_log: List[Dict[str, Any]]
started_at: datetime
updated_at: datetime
class WorkflowEngine:
"""
工作流引擎
支持:
1. 流程建模
2. 状态管理
3. 事件驱动
4. BPMN 集成
"""
def __init__(self):
self.workflows: Dict[str, WorkflowDefinition] = {}
self.executions: Dict[str, ExecutionContext] = {}
self.state_machine: Dict[str, Dict[WorkflowState, List[WorkflowState]]] = {}
self.event_handlers: Dict[str, List[Callable]] = defaultdict(list)
self.lock = threading.Lock()
# 初始化状态机
self._init_state_machine()
def _init_state_machine(self):
"""初始化状态机"""
self.state_machine = {
WorkflowState.PENDING: [WorkflowState.RUNNING, WorkflowState.CANCELLED],
WorkflowState.RUNNING: [WorkflowState.COMPLETED, WorkflowState.FAILED, WorkflowState.PAUSED],
WorkflowState.PAUSED: [WorkflowState.RUNNING, WorkflowState.CANCELLED],
WorkflowState.COMPLETED: [],
WorkflowState.FAILED: [WorkflowState.PENDING], # 可重试
WorkflowState.CANCELLED: []
}
def register_workflow(self, workflow: WorkflowDefinition):
"""注册工作流"""
with self.lock:
self.workflows[workflow.workflow_id] = workflow
print(f"注册工作流:{workflow.name} ({workflow.workflow_id})")
def start_workflow(self, workflow_id: str, input_data: Dict[str, Any] = None) -> str:
"""启动工作流"""
if workflow_id not in self.workflows:
raise ValueError(f"Unknown workflow: {workflow_id}")
workflow = self.workflows[workflow_id]
execution_id = str(uuid.uuid4())
# 创建执行上下文
context = ExecutionContext(
context_id=execution_id,
workflow_id=workflow_id,
variables={**workflow.variables, **(input_data or {})},
shared_state={},
execution_log=[],
started_at=datetime.now(),
updated_at=datetime.now()
)
with self.lock:
self.executions[execution_id] = context
# 初始化任务
self._initialize_tasks(workflow, context)
# 触发开始事件
self._trigger_event("workflow_started", {
"execution_id": execution_id,
"workflow_id": workflow_id
})
print(f"启动工作流执行:{execution_id}")
return execution_id
def _initialize_tasks(self, workflow: WorkflowDefinition, context: ExecutionContext):
"""初始化任务"""
for task in workflow.tasks:
# 创建任务实例
task_instance = Task(
task_id=f"{task.task_id}_{context.context_id}",
name=task.name,
description=task.description,
agent_id=task.agent_id,
priority=task.priority,
status=WorkflowState.PENDING if not task.dependencies else WorkflowState.PENDING,
input_data={**task.input_data, **context.variables},
output_data=None,
created_at=datetime.now(),
started_at=None,
completed_at=None,
error_message=None,
retry_count=0,
max_retries=workflow.retry_policy.get("max_retries", 3),
dependencies=[f"{dep}_{context.context_id}" for dep in task.dependencies]
)
# 存储到上下文
context.shared_state[task_instance.task_id] = task_instance
self._log_execution(context, "task_initialized", {
"task_id": task_instance.task_id,
"name": task_instance.name
})
def transition_state(self, execution_id: str, task_id: str, new_state: WorkflowState) -> bool:
"""状态转换"""
if execution_id not in self.executions:
return False
context = self.executions[execution_id]
if task_id not in context.shared_state:
return False
task = context.shared_state[task_id]
current_state = task.status
# 检查状态转换是否合法
allowed_transitions = self.state_machine.get(current_state, [])
if new_state not in allowed_transitions:
print(f"非法状态转换:{current_state} -> {new_state}")
return False
# 执行状态转换
old_state = task.status
task.status = new_state
context.updated_at = datetime.now()
# 记录日志
self._log_execution(context, "state_transition", {
"task_id": task_id,
"old_state": old_state.value,
"new_state": new_state.value
})
# 触发事件
self._trigger_event("task_state_changed", {
"execution_id": execution_id,
"task_id": task_id,
"old_state": old_state.value,
"new_state": new_state.value
})
return True
def _log_execution(self, context: ExecutionContext, event_type: str, data: Dict[str, Any]):
"""记录执行日志"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"event_type": event_type,
"data": data
}
context.execution_log.append(log_entry)
def _trigger_event(self, event_type: str, data: Dict[str, Any]):
"""触发事件"""
handlers = self.event_handlers.get(event_type, [])
for handler in handlers:
try:
handler(data)
except Exception as e:
print(f"事件处理错误:{event_type}, {e}")
def on_event(self, event_type: str, handler: Callable):
"""注册事件处理器"""
self.event_handlers[event_type].append(handler)
def get_execution_status(self, execution_id: str) -> Dict[str, Any]:
"""获取执行状态"""
if execution_id not in self.executions:
return {"error": "Execution not found"}
context = self.executions[execution_id]
# 统计任务状态
task_stats = defaultdict(int)
for key, value in context.shared_state.items():
if isinstance(value, Task):
task_stats[value.status.value] += 1
return {
"execution_id": execution_id,
"workflow_id": context.workflow_id,
"started_at": context.started_at.isoformat(),
"updated_at": context.updated_at.isoformat(),
"task_stats": dict(task_stats),
"log_entries": len(context.execution_log)
}
class AgentOrchestrator:
"""
Agent 编排器
支持:
1. 多 Agent 协作
2. 任务分配
3. 上下文共享
4. 冲突解决
"""
def __init__(self):
self.agents: Dict[str, Agent] = {}
self.task_queue: deque = deque()
self.assignment_history: Dict[str, List[str]] = defaultdict(list)
self.lock = threading.Lock()
def register_agent(self, agent: Agent):
"""注册 Agent"""
with self.lock:
self.agents[agent.agent_id] = agent
print(f"注册 Agent: {agent.name} ({agent.agent_id})")
def assign_task(self, task: Task) -> Optional[str]:
"""分配任务给 Agent"""
with self.lock:
# 查找合适的 Agent
suitable_agents = []
for agent_id, agent in self.agents.items():
# 检查 Agent 状态
if agent.status != "active":
continue
# 检查并发限制
if len(agent.current_tasks) >= agent.max_concurrent_tasks:
continue
# 检查能力匹配
if task.agent_id and task.agent_id != agent_id:
continue
suitable_agents.append(agent)
if not suitable_agents:
return None
# 选择最优 Agent(负载均衡)
best_agent = min(suitable_agents, key=lambda a: len(a.current_tasks))
# 分配任务
best_agent.current_tasks.append(task.task_id)
self.assignment_history[best_agent.agent_id].append(task.task_id)
print(f"任务 {task.task_id} 分配给 Agent {best_agent.name}")
return best_agent.agent_id
def release_task(self, agent_id: str, task_id: str):
"""释放任务"""
with self.lock:
if agent_id in self.agents:
agent = self.agents[agent_id]
if task_id in agent.current_tasks:
agent.current_tasks.remove(task_id)
def get_agent_load(self, agent_id: str) -> Dict[str, Any]:
"""获取 Agent 负载"""
if agent_id not in self.agents:
return {"error": "Agent not found"}
agent = self.agents[agent_id]
return {
"agent_id": agent_id,
"name": agent.name,
"role": agent.role.value,
"status": agent.status,
"current_tasks": len(agent.current_tasks),
"max_concurrent": agent.max_concurrent_tasks,
"load_percentage": (len(agent.current_tasks) / agent.max_concurrent_tasks * 100) if agent.max_concurrent_tasks > 0 else 0,
"total_assigned": len(self.assignment_history[agent_id])
}
def get_all_agents_status(self) -> List[Dict[str, Any]]:
"""获取所有 Agent 状态"""
return [self.get_agent_load(agent_id) for agent_id in self.agents]
# 使用示例
if __name__ == "__main__":
print("=== 企业级 Agent 工作流与自动化 ===\n")
print("=== 创建工作流引擎 ===")
engine = WorkflowEngine()
# 注册事件处理器
def on_workflow_started(data):
print(f"📊 工作流启动:{data['execution_id']}")
def on_task_state_changed(data):
print(f"📊 任务状态变更:{data['task_id']} {data['old_state']} -> {data['new_state']}")
engine.on_event("workflow_started", on_workflow_started)
engine.on_event("task_state_changed", on_task_state_changed)
print("=== 定义工作流 ===")
# 创建订单处理工作流
order_workflow = WorkflowDefinition(
workflow_id="order_processing_v1",
name="订单处理工作流",
description="自动化订单处理流程",
tasks=[
Task(
task_id="validate_order",
name="订单验证",
description="验证订单信息完整性",
agent_id=None,
priority=TaskPriority.HIGH,
status=WorkflowState.PENDING,
input_data={"rules": ["check_items", "check_address", "check_payment"]},
output_data=None,
created_at=datetime.now(),
started_at=None,
completed_at=None,
error_message=None,
retry_count=0,
max_retries=3,
dependencies=[]
),
Task(
task_id="check_inventory",
name="库存检查",
description="检查商品库存",
agent_id=None,
priority=TaskPriority.NORMAL,
status=WorkflowState.PENDING,
input_data={"check_availability": True},
output_data=None,
created_at=datetime.now(),
started_at=None,
completed_at=None,
error_message=None,
retry_count=0,
max_retries=3,
dependencies=["validate_order"]
),
Task(
task_id="process_payment",
name="支付处理",
description="处理支付",
agent_id=None,
priority=TaskPriority.CRITICAL,
status=WorkflowState.PENDING,
input_data={"payment_gateway": "stripe"},
output_data=None,
created_at=datetime.now(),
started_at=None,
completed_at=None,
error_message=None,
retry_count=0,
max_retries=5,
dependencies=["check_inventory"]
),
Task(
task_id="fulfill_order",
name="订单履行",
description="安排发货",
agent_id=None,
priority=TaskPriority.NORMAL,
status=WorkflowState.PENDING,
input_data={"shipping_method": "express"},
output_data=None,
created_at=datetime.now(),
started_at=None,
completed_at=None,
error_message=None,
retry_count=0,
max_retries=3,
dependencies=["process_payment"]
),
Task(
task_id="send_notification",
name="发送通知",
description="发送订单确认通知",
agent_id=None,
priority=TaskPriority.LOW,
status=WorkflowState.PENDING,
input_data={"channels": ["email", "sms"]},
output_data=None,
created_at=datetime.now(),
started_at=None,
completed_at=None,
error_message=None,
retry_count=0,
max_retries=2,
dependencies=["fulfill_order"]
)
],
triggers=["order_created"],
variables={"currency": "USD", "tax_rate": 0.08},
timeout_minutes=60,
retry_policy={"max_retries": 3, "backoff_multiplier": 2}
)
engine.register_workflow(order_workflow)
print(f"\n工作流定义:")
print(f" ID: {order_workflow.workflow_id}")
print(f" 名称:{order_workflow.name}")
print(f" 任务数:{len(order_workflow.tasks)}")
print(f" 超时:{order_workflow.timeout_minutes}分钟")
print(f"\n=== 创建 Agent 编排器 ===")
orchestrator = AgentOrchestrator()
# 注册 Agents
agents = [
Agent(
agent_id="agent_001",
name="订单验证 Agent",
role=AgentRole.WORKER,
capabilities=["order_validation", "data_check"],
current_tasks=[],
max_concurrent_tasks=10,
status="active",
last_heartbeat=datetime.now()
),
Agent(
agent_id="agent_002",
name="库存管理 Agent",
role=AgentRole.WORKER,
capabilities=["inventory_check", "stock_management"],
current_tasks=[],
max_concurrent_tasks=8,
status="active",
last_heartbeat=datetime.now()
),
Agent(
agent_id="agent_003",
name="支付处理 Agent",
role=AgentRole.WORKER,
capabilities=["payment_processing", "fraud_detection"],
current_tasks=[],
max_concurrent_tasks=5,
status="active",
last_heartbeat=datetime.now()
),
Agent(
agent_id="agent_004",
name="物流协调 Agent",
role=AgentRole.WORKER,
capabilities=["shipping", "logistics"],
current_tasks=[],
max_concurrent_tasks=15,
status="active",
last_heartbeat=datetime.now()
),
Agent(
agent_id="agent_005",
name="通知发送 Agent",
role=AgentRole.WORKER,
capabilities=["email", "sms", "push_notification"],
current_tasks=[],
max_concurrent_tasks=20,
status="active",
last_heartbeat=datetime.now()
),
Agent(
agent_id="agent_supervisor",
name="流程监督 Agent",
role=AgentRole.SUPERVISOR,
capabilities=["monitoring", "quality_check", "escalation"],
current_tasks=[],
max_concurrent_tasks=50,
status="active",
last_heartbeat=datetime.now()
)
]
for agent in agents:
orchestrator.register_agent(agent)
print(f"\n已注册 {len(agents)} 个 Agent")
print(f"\n=== 启动工作流执行 ===")
# 启动工作流
execution_id = engine.start_workflow("order_processing_v1", {
"order_id": "ORD-2026-001",
"customer_id": "CUST-123",
"items": [{"sku": "SKU-001", "quantity": 2}],
"total_amount": 299.99
})
print(f"\n执行 ID: {execution_id}")
# 获取执行状态
status = engine.get_execution_status(execution_id)
print(f"\n执行状态:")
print(f" 工作流:{status['workflow_id']}")
print(f" 启动时间:{status['started_at']}")
print(f" 任务统计:{status['task_stats']}")
print(f" 日志条目:{status['log_entries']}")
print(f"\n=== Agent 负载状态 ===")
all_status = orchestrator.get_all_agents_status()
for agent_status in all_status:
print(f" {agent_status['name']}:")
print(f" 角色:{agent_status['role']}")
print(f" 状态:{agent_status['status']}")
print(f" 当前任务:{agent_status['current_tasks']}/{agent_status['max_concurrent']}")
print(f" 负载:{agent_status['load_percentage']:.1f}%")
print(f" 总分配:{agent_status['total_assigned']}")
print(f"\n关键观察:")
print("1. 工作流引擎:流程建模、状态管理、事件驱动")
print("2. Agent 编排:多 Agent 协作、任务分配、负载均衡")
print("3. 自动化执行:工具调用、API 集成、异常处理")
print("4. 人机协同:审批流程、质量检查、反馈循环")
print("5. 智能自动化:工作流 + 编排 + 执行 + 协同 = 可信赖")
print("\n智能自动化的使命:让企业更高效、更智能、更可靠")