构建低延迟 AI 管道:极简 DAG 智能工作流编排引擎的轻量级实现方案
构建低延迟 AI 管道:极简 DAG 智能工作流编排引擎的轻量级实现方案
一、AI 工作流中的同步阻塞问题
当大模型(LLM)从简单问答转向复杂企业级业务流时,多代理协作和自动化管道成为主流方案。但很多团队在搭建这类系统时,代码结构容易变得混乱。不同业务节点(如用户输入提取、向量检索、LLM 判定、邮件发送等)的依赖关系错综复杂,如果用传统的同步阻塞模式,系统处理多任务时会变得很慢。
举个例子,两个本可以并行运行的 LLM 评估节点,如果写成串行执行,整个工作流的运行时间就会翻倍。同时,缺乏统一的任务编排,错误捕获和重试机制也容易出错。如何在不用重型分布式框架(如 Airflow 或 Camunda)的前提下,用几十行代码构建一个支持并行度最大化、且防死锁的轻量级有向无环图(DAG)调度引擎,是提升全栈 AI 应用交付质量的关键。
二、轻量级拓扑调度与多代理协作架构
为了减少串行等待带来的延迟,我们可以把整个业务流抽象为一张 DAG 图。通过静态分析节点依赖性,计算拓扑顺序,然后利用异步事件循环让没有直接依赖的节点并发执行。
下图展示了 DAG 工作流引擎的并行计算与数据收集流程:
graph LR
A[工作流开始] --> B[节点 1: 数据清洗过滤]
B --> C[节点 2: LLM 情绪分类]
B --> D[节点 3: 向量知识库检索]
C --> E[节点 4: 智能回复拼接]
D --> E
E --> F[工作流完成]
style C fill:#bbf,stroke:#333,stroke-width:2px
style D fill:#bbf,stroke:#333,stroke-width:2px
style E fill:#afa,stroke:#333,stroke-width:2px
节点 2 和节点 3 相互独立,调度器会同时启动它们,总耗时取决于两者中较慢的那个,而不是相加。
三、Node.js 异步流式任务节点与拓扑排序编排引擎
以下是一个用 JavaScript (Node.js) 实现的 DAG 编排引擎,具备自动入度计算、环路检测和异步流式调度功能:
class TaskNode {
constructor(name, runAction) {
this.name = name;
this.runAction = runAction; // 节点的具体异步处理任务
this.dependencies = []; // 前置依赖节点名称
this.inDegree = 0; // 入度值,表示还有多少个前置依赖未完成
this.status = 'IDLE'; // IDLE, PENDING, RUNNING, FINISHED, ERROR
this.output = null;
}
dependsOn(depNodeName) {
this.dependencies.push(depNodeName);
}
}
class MicroDagScheduler {
constructor() {
this.nodes = new Map();
}
addNode(node) {
this.nodes.set(node.name, node);
}
// 拓扑结构合法性校验及入度初始化
validateAndInit() {
for (const [name, node] of this.nodes) {
node.inDegree = 0; // 重置
}
// 计算每个节点的真实入度
for (const [name, node] of this.nodes) {
for (const depName of node.dependencies) {
if (!this.nodes.has(depName)) {
throw new Error(`节点 [${name}] 依赖的 [${depName}] 未注册!`);
}
node.inDegree++;
}
}
// 简易 Kahn 算法检测是否存在循环依赖(环)
const tempInDegree = new Map();
const queue = [];
for (const [name, node] of this.nodes) {
tempInDegree.set(name, node.inDegree);
if (node.inDegree === 0) {
queue.push(name);
}
}
let visitedCount = 0;
while (queue.length > 0) {
const curr = queue.shift();
visitedCount++;
// 查找所有以 curr 为前置依赖的节点
for (const [name, node] of this.nodes) {
if (node.dependencies.includes(curr)) {
tempInDegree.set(name, tempInDegree.get(name) - 1);
if (tempInDegree.get(name) === 0) {
queue.push(name);
}
}
}
}
if (visitedCount !== this.nodes.size) {
throw new Error("拓扑验证失败: 工作流依赖图存在循环引用(环路)!");
}
console.log("拓扑验证成功:依赖图无环,初始化就绪。");
}
// 启动流式异步调度
async run(initialContext) {
this.validateAndInit();
const activePromises = new Map();
while (true) {
let activeTaskLaunched = false;
let unfinishedTasks = false;
for (const [name, node] of this.nodes) {
if (node.status === 'FINISHED' || node.status === 'ERROR') continue;
unfinishedTasks = true;
if (node.status === 'RUNNING') continue;
// 入度为 0 意味着前置依赖已全部跑完,可以立即启动
if (node.inDegree === 0) {
node.status = 'RUNNING';
activeTaskLaunched = true;
const promise = (async () => {
try {
// 归纳前置依赖的输出结果作为当前节点的输入
const inputs = {};
node.dependencies.forEach(depName => {
inputs[depName] = this.nodes.get(depName).output;
});
const result = await node.runAction(initialContext, inputs);
node.output = result;
node.status = 'FINISHED';
// 通知所有依赖此节点的下游,将其入度扣减 1
for (const [_, subNode] of this.nodes) {
if (subNode.dependencies.includes(name)) {
subNode.inDegree--;
}
}
} catch (err) {
node.status = 'ERROR';
throw err;
}
})();
activePromises.set(name, promise);
}
}
if (!unfinishedTasks) break;
if (!activeTaskLaunched && activePromises.size === 0) {
throw new Error("工作流卡死,未有活动任务推进。");
}
// 等待最先完成的异步节点以推进调度循环
await Promise.race(activePromises.values());
// 清理已完成的任务 promise
for (const [name, promise] of activePromises) {
const node = this.nodes.get(name);
if (node.status === 'FINISHED' || node.status === 'ERROR') {
activePromises.delete(name);
}
}
}
const outputs = {};
for (const [name, node] of this.nodes) {
outputs[name] = node.output;
}
return outputs;
}
}
// 快速运行测试
(async () => {
const scheduler = new MicroDagScheduler();
const nodeA = new TaskNode("CleanInput", async (ctx) => {
return ctx.text.trim();
});
const nodeB = new TaskNode("LlmClassify", async (ctx, inputs) => {
const text = inputs.CleanInput;
await new Promise(resolve => setTimeout(resolve, 300)); // 模拟 LLM 耗时
return text.includes("退款") ? "REFUND_REQUEST" : "GENERAL_TICKET";
});
nodeB.dependsOn("CleanInput");
const nodeC = new TaskNode("DbSearch", async (ctx, inputs) => {
// 模拟并行进行的向量数据库知识检索
await new Promise(resolve => setTimeout(resolve, 200));
return "本地标准退款政策:待发货订单支持极速自动退款。";
});
nodeC.dependsOn("CleanInput");
const nodeD = new TaskNode("FinalReply", async (ctx, inputs) => {
const cat = inputs.LlmClassify;
const kb = inputs.DbSearch;
return `分类: ${cat} | 匹配到的参考条款: [${kb}]`;
});
nodeD.dependsOn("LlmClassify");
nodeD.dependsOn("DbSearch");
scheduler.addNode(nodeA);
scheduler.addNode(nodeB);
scheduler.addNode(nodeC);
scheduler.addNode(nodeD);
const startTime = Date.now();
const result = await scheduler.run({ text: " 我想修改订单退款可以吗 " });
console.log(`工作流执行成功,耗时: ${Date.now() - startTime}ms`);
console.log("最终执行数据报表:", result);
})();
四、分布式执行的边界:状态一致性、幂等防刷与断点重试的工程折中
在将单机 DAG 编排引擎扩展到云端分布式环境时,需要在几个关键点之间权衡:
状态管理与一致性:基于内存的调度非常轻量、零延迟,但应用实例崩溃后,所有运行中的长工作流都会丢失。引入外部分布式数据库或 Redis 状态锁虽然能实现崩溃恢复,但每次状态转移都要进行网络序列化写入,会导致整个 AI 应用的响应耗时明显上升。
幂等性与成本:当工作流中的某个非大模型节点(如向客户发送通知短信)执行超时时,引擎会触发重试。如果上游的大模型生成动作没有做好幂等主键设计(Idempotency Key),重复的重试会反复调用昂贵的大模型 API,导致平台开销失控。在设计大模型节点时,必须根据工单 ID 建立请求缓存防线。
微服务化与单体架构:把每个任务节点拆分成独立的 Serverless 容器,虽然能实现精细的按需扩缩容,但这也成倍增加了不同云函数之间数据传递和网络路由的复杂度。初创阶段应坚决保持单体单容器内的异步任务池编排,等到某一节点真正出现性能瓶颈时再行剥离。
五、总结
智能工作流的工程优化,本质上是在解耦复杂业务逻辑的同时,充分利用服务器的并发计算能力。通过构建极简的无环图 Kahn 校验机制与异步流式调度引擎,我们能够在不依赖沉重外部中间件的极简配置下,让多个 AI 节点和辅助数据库操作并发运行,以最低的基础设施成本为用户提供流畅的智能交互体验。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)