工作流执行引擎实现
class WorkflowEngine {
constructor(options = {}) {
this.componentRegistry = new ComponentRegistry();
this.stateManager = new StateManager(options.storage);
this.eventBus = new EventBus();
this.scheduler = new Scheduler();
}
// 执行工作流
async execute(workflowId, input = {}) {
// 1. 加载工作流定义
const workflow = await this.loadWorkflow(workflowId);
// 2. 创建执行上下文
const context = new ExecutionContext({
workflow,
input,
traceId: this.generateTraceId()
});
// 3. 初始化状态
await this.stateManager.initialize(workflowId, context.traceId, {
status: 'running',
startTime: Date.now(),
nodes: workflow.nodes.map(node => ({
id: node.id,
status: 'pending'
}))
});
// 4. 查找起始节点
const startNodes = workflow.nodes.filter(
node => node.type.startsWith('trigger.')
);
// 5. 执行起始节点
const promises = startNodes.map(node =>
this.executeNode(node, context)
);
// 6. 等待所有分支完成
await Promise.all(promises);
// 7. 更新工作流状态
await this.stateManager.update(workflowId, context.traceId, {
status: 'completed',
endTime: Date.now()
});
// 8. 发出完成事件
this.eventBus.emit('workflow.completed', {
workflowId,
traceId: context.traceId
});
return context.output;
}
// 执行单个节点
async executeNode(node, context) {
const { workflow } = context;
try {
// 1. 更新节点状态为运行中
await this.stateManager.updateNodeStatus(
workflow.id,
context.traceId,
node.id,
'running',
{ startTime: Date.now() }
);
// 2. 获取组件
const component = this.componentRegistry.get(node.type);
if (!component) {
throw new Error(`Component not found: ${node.type}`);
}
// 3. 准备输入数据
const inputData = await this.prepareNodeInput(node, context);
// 4. 执行组件
const output = await component.execute(
node.data.config,
inputData,
context
);
// 5. 存储输出数据
context.setData(`nodes.${node.id}.output`, output);
// 6. 更新节点状态为完成
await this.stateManager.updateNodeStatus(
workflow.id,
context.traceId,
node.id,
'completed',
{
endTime: Date.now(),
output
}
);
// 7. 查找并执行后续节点
const nextNodes = this.getNextNodes(workflow, node, output);
if (nextNodes.length > 0) {
const promises = nextNodes.map(nextNode =>
this.executeNode(nextNode, context)
);
await Promise.all(promises);
}
} catch (error) {
// 异常处理
await this.handleNodeError(node, context, error);
}
}
// 准备节点输入
async prepareNodeInput(node, context) {
const inputData = {};
// 从上游节点收集输出
const incomingEdges = workflow.edges.filter(
edge => edge.target === node.id
);
for (const edge of incomingEdges) {
const sourceOutput = context.getData(
`nodes.${edge.source}.output`
);
// 数据映射
if (edge.data?.mapping) {
Object.assign(inputData,
this.applyMapping(sourceOutput, edge.data.mapping)
);
} else {
Object.assign(inputData, sourceOutput);
}
}
// 添加全局变量
Object.assign(inputData, {
variables: context.variables,
workflow: {
id: workflow.id,
name: workflow.name
}
});
return inputData;
}
// 获取后续节点
getNextNodes(workflow, currentNode, output) {
const outgoingEdges = workflow.edges.filter(
edge => edge.source === currentNode.id
);
const nextNodes = [];
for (const edge of outgoingEdges) {
let shouldExecute = true;
// 条件分支处理
if (currentNode.type === 'logic.condition') {
const nextPort = output.nextPort;
shouldExecute = (
(nextPort === 'true' && edge.sourceHandle === 'true') ||
(nextPort === 'false' && edge.sourceHandle === 'false')
);
}
if (shouldExecute) {
const targetNode = workflow.nodes.find(
n => n.id === edge.target
);
if (targetNode) {
nextNodes.push(targetNode);
}
}
}
return nextNodes;
}
// 节点异常处理
async handleNodeError(node, context, error) {
console.error(`Node ${node.id} error:`, error);
// 更新节点状态为失败
await this.stateManager.updateNodeStatus(
context.workflow.id,
context.traceId,
node.id,
'failed',
{
endTime: Date.now(),
error: {
message: error.message,
stack: error.stack
}
}
);
// 发出错误事件
this.eventBus.emit('node.failed', {
workflowId: context.workflow.id,
traceId: context.traceId,
nodeId: node.id,
error
});
// 根据错误策略处理
const errorPolicy = node.data.config?.errorPolicy || 'stop';
if (errorPolicy === 'retry') {
// 重试逻辑
const maxRetries = node.data.config?.maxRetries || 3;
if (context.getRetryCount(node.id) < maxRetries) {
context.incrementRetryCount(node.id);
await this.executeNode(node, context);
return;
}
}
if (errorPolicy === 'continue') {
// 继续执行后续节点
const nextNodes = this.getNextNodes(
context.workflow,
node,
{ error: true }
);
const promises = nextNodes.map(nextNode =>
this.executeNode(nextNode, context)
);
await Promise.all(promises);
return;
}
// 默认策略:停止工作流
await this.stateManager.update(
context.workflow.id,
context.traceId,
{ status: 'failed' }
);
throw error;
}
}
export default WorkflowEngine;