《AgentX 专栏》08-工作流引擎:AgentWorkflow怎么把工具记忆流程串成一条流水线
工作流引擎:AgentWorkflow 怎么把工具、记忆、流程串成一条流水线|AgentX 专栏⑧
本文是 AgentX 技术专栏第八篇。基于真实项目源码(
AgentWorkflow/WorkflowExecutor/LangGraphOrchestrator/GeneralService),从技术简介到设计思路,从核心代码到生产踩坑,循序渐进拆解 AgentX 的双档工作流引擎——让 Java 程序员一次性看懂 Agent 框架最关键的那个"调度大脑"。
本文速览:
- 没有工作流引擎的 Agent,本质上就是一坨 if-else——为什么?
- Agent 工作流引擎是什么?对标 LangGraph / AutoGen / CrewAI 的定位
- AgentX 的四条核心设计原则:双档分流、状态机编排、协作式取消、上下文显式传递
- simple 模式:
AiServices+ 实例缓存 + 双重检查锁 - workflow 模式:LangGraph 三节点循环 + 虚拟线程并行调工具
- 流式 SSE 三件套:onPartialResponse / onToolExecuted / onComplete + 心跳保活
- 三个让大模型项目翻车的实战大坑,以及对应的修法
文章很长(约 38 KB),但每一节都有承上启下,建议从头读——只看代码片段可能 get 不到设计意图。读完文末有完整代码包获取方式。
一、先抛个问题:没有工作流引擎的 Agent 长什么样?
很多 Java 程序员第一次写 Agent,代码大概长这样:
@PostMapping("/chat")
public String chat(String userInput) {
// 第一步:让 AI 思考
AiMessage response = chatModel.chat(buildMessages(userInput));
// 第二步:如果 AI 想调工具,那就调
if (response.hasToolExecutionRequests()) {
for (var req : response.toolExecutionRequests()) {
Object result = toolRegistry.execute(req.name(), req.arguments());
// 把工具结果塞回去再让 AI 思考一遍
messages.add(ToolExecutionResultMessage.from(req, result));
}
// 再问一遍
response = chatModel.chat(messages);
// 如果它还想调工具呢?
if (response.hasToolExecutionRequests()) {
// ……嵌套 if,写到怀疑人生
}
}
return response.text();
}
这段代码上线一周就会变成噩梦:
| 需求 | 改动代价 |
|---|---|
| 想加 RAG 检索 | 在 chat 前面再 if 一段 |
| 想支持 SSE 流式 | 重写一份流式版本,代码翻倍 |
| 用户想中途取消 | 加 cancelToken,每层 if 都要检查 |
| 想统计每个工具的调用耗时 | 在每个 execute 前后加 log/Span,散落各处 |
| 多工具想并发执行 | 把 for 循环改成 CompletableFuture,又得测一遍 |
| 不同业务想用不同 Prompt 策略 | 复制粘贴一份新的 Controller |
所有这些"动一发牵全身"的痛点,源头只有一个:流程逻辑写死在过程式代码里。
工业界对此的标准答案是——用工作流引擎把"流程"和"逻辑"解耦。LangChain 有 LangGraph,微软有 AutoGen,AgentX 这个 Java 项目也得有自己的工作流引擎。这篇文章就来拆解它怎么设计、怎么落地、怎么避坑。
二、技术简介:Agent 工作流引擎是什么
2.1 一句话定义
Agent 工作流引擎是把 "LLM 思考 → 调工具 → 再思考"的循环抽象成可配置的状态机,对外提供统一调度入口的运行时组件。
它至少要解决五件事:
| 职责 | 解决什么问题 |
|---|---|
| 节点编排 | 把"思考"“检索”"工具调用"等步骤抽象成节点,让流程可读、可改 |
| 状态流转 | 维护对话/工作流的执行状态,支持中断、恢复、回溯 |
| 并发调度 | 多个工具能并行执行而不是串行 join |
| 可观测埋点 | 每个节点的耗时、入参、结果都能落到 Trace/Log |
| 流式输出 | LLM 生成的 token 能实时推给前端,不能等 1 分钟才返回 |
2.2 业界横向对比
| 框架 | 语言 | 编排范式 | 适合场景 |
|---|---|---|---|
| LangGraph | Python | DAG 图编排(节点+边) | Python 生态,复杂多 Agent 协作 |
| AutoGen | Python | 多 Agent 对话式编排 | 多 Agent 互相讨论的研究场景 |
| CrewAI | Python | 角色+任务的层级化分工 | 模拟团队协作(产品+开发+测试) |
| Spring AI Advisors | Java | 拦截器链式增强 | Spring Boot 项目轻量增强 |
| AgentX AgentWorkflow(本文) | Java | 双档:AiService 直驱 + LangGraph 状态机 | 企业级 Java 项目,单 Agent 多工具 |
AgentX 没去模仿 LangGraph 的全图编排(那对 Java 生态来说太重),而是只抽象到企业项目真正需要的程度——两档够用就停手。这是后面要展开的"设计原则一"。
2.3 AgentWorkflow 在 AgentX 里的位置
┌──────────────────────────────────────────────────────────┐
│ ① 接入层 AgentController (REST/SSE 入口) │
├──────────────────────────────────────────────────────────┤
│ ② 调度层 AgentService (超时熔断 + TraceId 捕获) │
├──────────────────────────────────────────────────────────┤
│ ③ 工作流层 AgentWorkflow ← 本文主角 │
│ ├─ simple → AiServices 直驱 │
│ └─ workflow → WorkflowExecutor → LangGraphOrch. │
├──────────────────────────────────────────────────────────┤
│ ④ 工具层 ToolRegistry · ChatMemoryProvider │
│ ⑤ 模型层 ChatModel (Ollama) · StreamingChatModel │
│ ⑥ 存储层 Milvus (RAG) · Redis (Memory) │
└──────────────────────────────────────────────────────────┘
AgentWorkflow 处于第③层,向上接住 AgentService 的调度请求,向下调用工具/模型/记忆——它是整个 Agent 系统的"调度大脑"。
三、设计思路:四条让代码不烂的核心原则
技术简介看完,开始讲"为什么这么设计"。AgentX 的工作流引擎有四条核心原则,缺一不可。
原则一:双档分流,避免过度设计
不是所有请求都需要走"图编排+多节点+并行工具"。日常对话场景下:
| 请求类型 | 占比 | 真实需求 |
|---|---|---|
| 闲聊"你好" | ~30% | 单次 LLM 调用就够 |
| 简单问答"今天天气" | ~50% | 单次 LLM + 单工具调用 |
| 复杂任务"对比 5 家公司财报" | ~20% | RAG + 多工具 + 多轮推理 |
80% 的请求用 5% 的代码就能 cover——为它们配一套完整图编排是过度设计。
所以 AgentX 把工作流引擎分成两档:
// simple 模式:闲聊/单工具 → AiServices 一行搞定
public String executeSimpleWorkflow(String convId, String userInput) {
GeneralService agent = getOrCreateAgent();
Result<String> result = agent.chat(convId, userInput);
return processResult(result);
}
// workflow 模式:复杂任务 → LangGraph 多节点循环
public String executeComplexWorkflow(String convId, String userInput) {
List<ToolSpecification> toolSpecs = getOptimizedToolSpecs();
return workflowExecutor.execute(convId, userInput, toolSpecs);
}
调用方根据 AgentRequest.agentType() 路由——简单的走 simple,复杂的走 workflow。这种"奥卡姆剃刀式"的分流是企业项目区别于 demo 项目的关键标志。
原则二:状态机编排,让流程可观测
workflow 模式下,AgentX 借鉴 LangGraph 的"节点 + 状态 + 路由"模型,把 Agent 执行抽象成三节点循环:
┌─────────────────────────────────────────┐
↓ │
┌─────────────────┐ ┌────────────┐ ┌────────────────┐
│ retrieval_node │ → │ agent_node │ → │ action_node │
│ RAG 检索 │ │ LLM 思考 │ │ 并行调工具 │
│ 缝合 Prompt │ │ 路由决策 │ │ 汇总结果 │
└─────────────────┘ └─────┬──────┘ └────────────────┘
│
是否还需要工具?
│
yes ←─────┴─────→ no
│
▼
END
每个节点都是独立函数 + 独立 Observation Span,意味着:
- Jaeger 里能看到每个节点的精确耗时
- 加新节点不影响旧节点
- 节点出错可以单独捕获和降级
safetyCounter < 12给整个循环上保险,防止 LLM 无限要工具
这种设计的代价是多写 100 行 orchestration 代码,但换来的可读性和可观测性远远值得。
原则三:协作式取消,比强制中断更安全
Agent 任务跑分钟级很常见,用户可能:
- 关浏览器标签(前端 SSE 断连)
- 触发超时(前端等不及)
- 主动点"停止"按钮
强制中断(Thread.interrupt())会把任务卡在某个奇怪状态:HTTP 连接半开、Milvus 查询挂起、ChatMemory 写一半。
AgentX 选择协作式取消:
// 1. 所有 SSE 异常映射到统一取消标志
emitter.onCompletion(() -> cancellationRequests.add(workflowId));
emitter.onTimeout(() -> cancellationRequests.add(workflowId));
emitter.onError(e -> cancellationRequests.add(workflowId));
// 2. 节点循环每轮检查
while (!"END".equals(nextNode) && safetyCounter < 12) {
if (cancellationRequests.contains(state.getSessionId())) {
// 走清理逻辑,不强中断
return Map.of("output", "任务已被取消。", "status", "CANCELLED");
}
// 跑当前节点...
}
代价:当前节点必须跑完才能取消(最多多等几百毫秒)。换来:资源永远干净,状态可预期。
原则四:跨线程上下文显式传递
LangChain4j 的流式回调(onPartialResponse / onToolExecuted)执行在 内部 ForkJoinPool 上,不是原始 HTTP 虚拟线程。这意味着 ThreadLocal 在回调里必然失效:
// 在 onToolExecuted 回调里:
AgentContext ctx = AgentThreadContext.get(); // 💀 null
Span span = tracer.currentSpan(); // 💀 null
AgentX 的解法是预捕获模式——在虚拟线程退出前把对象引用抓出来,闭包传给回调:
// HTTP 虚拟线程上预捕获
AgentContext capturedCtx = AgentThreadContext.get();
Span capturedHttpSpan = tracer.currentSpan();
// ForkJoinPool 回调中通过闭包变量使用
streamingAgent.chatStream(convId, userInput)
.onToolExecuted(toolExecution -> {
if (capturedCtx != null) capturedCtx.recordToolInvocation(...);
if (capturedHttpSpan != null) capturedHttpSpan.event(...);
});
这条原则后面会反复出现——任何跨线程的上下文,都必须显式捕获引用,不能依赖 ThreadLocal。
四条原则讲完,下面看具体怎么落地。
四、代码解析:从入口到节点的完整链路
4.1 入口分发:AgentWorkflow
AgentWorkflow 是整个工作流的入口,对应"执行总监"的角色——AgentService 接到请求后,由总监决定走哪条流水线:
@Slf4j
@Component
@RequiredArgsConstructor
public class AgentWorkflow {
private final ChatModel chatModel; // 同步大脑
private final StreamingChatModel streamingChatModel; // 流式大脑
private final WorkflowExecutor workflowExecutor; // 复杂流水线
private final ToolRegistry toolRegistry;
private final ChatMemoryProvider chatMemoryProvider;
private final Tracer tracer;
// 💡 关键:AiService 实例可跨请求复用,缓存起来
private volatile GeneralService cachedAgent;
private volatile GeneralService cachedStreamingAgent;
// ── simple 模式(AiService 直驱)─────────────────────
public String executeSimpleWorkflow(String convId, String userInput) { ... }
public void executeSimpleWorkflowStream(...) { ... }
// ── workflow 模式(LangGraph 图编排)─────────────────
public String executeComplexWorkflow(String convId, String userInput) { ... }
public void executeComplexWorkflowStream(...) { ... }
}
两个 volatile cachedAgent 字段是关键性能优化点。
LangChain4j 的 AiServices.builder().build() 实际上是用 JDK 动态代理生成 Proxy 对象,构建过程涉及反射、类扫描、ToolSpecification 提取等开销。如果每次请求都重建——单次构建可能 100ms,QPS 上去之后就是性能瓶颈。
但 AiService 实例本身是无状态的——会话状态存在 ChatMemoryProvider 里按 conversationId 隔离,工具列表只是引用。所以实例可跨请求复用,配合双重检查锁延迟初始化:
private GeneralService getOrCreateAgent() {
GeneralService agent = cachedAgent;
if (agent == null) { // 第一次检查:无锁快路径
synchronized (this) {
agent = cachedAgent;
if (agent == null) { // 第二次检查:拿到锁后再确认
agent = createAgent(getLimitedToolBeans(), true);
cachedAgent = agent;
}
}
}
return agent;
}
volatile 保证内存可见性,DCL 保证只构建一次。在高并发场景下,99.99% 的请求走第一个 if 直接返回,零锁开销。
4.2 simple 模式:AiServices + 声明式接口
simple 模式下,业务逻辑就一行——把请求扔给 AiServices:
GeneralService agent = getOrCreateAgent();
Result<String> result = agent.chat(convId, userInput);
那 GeneralService 是啥?一个纯接口:
@AiService
public interface GeneralService {
@SystemMessage("""
你是 AgentX 企业级智能引擎的通用助手。
1. 请根据用户的提问,合理判断是否需要调用工具。
2. 如果用户的问题涉及实时数据(如天气、股票),请务必调用工具,不要编造。
3. 你的回答应当专业、简洁、安全,并使用 Markdown 格式。
""")
Result<String> chat(@MemoryId String conversationId, @UserMessage String message);
@SystemMessage("""
你是 AgentX 企业级智能引擎的通用助手。
...
4. 从工具返回数据中提取数值填入回答,不要遗漏或简化任何数字。
示例:工具返回"气温:26摄氏度",回答应写"26摄氏度",不要只写"摄氏度"。
""")
TokenStream chatStream(@MemoryId String conversationId, @UserMessage String message);
}
为什么用接口而不用具体类?
@AiService 是 LangChain4j 的"声明式 AI 服务"机制:你只声明意图(接口 + 注解),框架在运行时用 JDK 代理生成实现,把 @SystemMessage / @MemoryId / @UserMessage 翻译成对 ChatModel 的实际调用。
业务代码因此彻底解耦于底层调用细节:
- ❌ 不需要手写
chatModel.chat(messages, tools) - ❌ 不需要手动管理
ChatMemory - ❌ 不需要解析工具调用结果
一个接口 = 一个完整的 Agent。 这是 LangChain4j 相对于 Spring AI 在 Agent 场景下最大的优势。
关于 SystemMessage 里那条"逐字检查数字"的规则:这是小模型的真实痛点——qwen2.5:3b 在生成回答时偶尔会把"26 摄氏度"简化成"26 度"或者干脆漏单位。显式约束能显著降低这种幻觉,是低成本部署绕不开的细节。
4.3 流式 SSE 三件套:onPartial / onTool / onComplete
simple 模式的流式实现是整个 AgentWorkflow 最复杂的代码段,因为要解决三个并发难题:
public void executeSimpleWorkflowStream(String convId, String userInput, SseEmitter emitter) {
AtomicLong thinkingStart = new AtomicLong(System.currentTimeMillis());
AtomicBoolean completed = new AtomicBoolean(false);
AtomicBoolean firstTokenReceived = new AtomicBoolean(false);
GeneralService streamingAgent = getOrCreateStreamingAgent();
pushSseEvent(emitter, "[STATUS]", "正在思考,请稍候...");
// ① 心跳线程:AI 思考期间定期发心跳,防止前端超时断连
Thread heartbeatThread = Thread.ofVirtual().start(() -> {
long interval = 5_000L;
while (!firstTokenReceived.get() && !completed.get()) {
try { Thread.sleep(interval); }
catch (InterruptedException e) {
Thread.currentThread().interrupt(); break;
}
if (!firstTokenReceived.get() && !completed.get()) {
long elapsed = System.currentTimeMillis() - thinkingStart.get();
pushSseEvent(emitter, "[STATUS]",
"正在思考中... (已思考 " + (elapsed / 1000) + "秒)");
// ② 心跳间隔递增:思考越久,越不烦用户
interval = Math.min(interval + 5_000, 30_000L);
}
}
});
// ③ 预捕获 Context 和 Span,跨线程传给回调
AgentContext capturedCtx = AgentThreadContext.get();
Span capturedHttpSpan = tracer.currentSpan();
streamingAgent.chatStream(convId, userInput)
.onPartialResponse(token -> {
if (firstTokenReceived.compareAndSet(false, true)) {
long thinking = System.currentTimeMillis() - thinkingStart.get();
log.info("[总监] 模型开始输出 | 思考耗时: {}ms", thinking);
}
pushSseEvent(emitter, "[CONTENT]", token);
})
.onToolExecuted(toolExecution -> {
firstTokenReceived.set(true); // 工具调用也算"有动静",停心跳
String toolName = toolExecution.request().name();
if (capturedCtx != null) capturedCtx.recordToolInvocation(toolName);
pushSseEvent(emitter, "[STATUS]", "正在调用工具: " + toolName);
})
.onCompleteResponse(response -> {
// ④ 幂等防护:onComplete 和 onError 可能同时触发
if (!completed.compareAndSet(false, true)) return;
pushSseEvent(emitter, "[DONE]", "FINISHED");
emitter.complete();
})
.onError(error -> {
if (!completed.compareAndSet(false, true)) return;
String friendlyMsg = buildFriendlyErrorMessage(error);
pushSseEvent(emitter, "[ERROR]", friendlyMsg);
try { emitter.complete(); } catch (Exception ignored) {}
})
.start();
}
四个数字标注的设计点,对应四个并发问题:
| # | 问题 | 解法 |
|---|---|---|
| ① | AI 思考可能 30s+,前端 SSE 默认 30s 超时 | 心跳线程定期推 STATUS |
| ② | 心跳太频繁吵用户 | 间隔递增 5→30s 封顶 |
| ③ | 回调跑在 ForkJoinPool,ThreadLocal 失效 | 预捕获 Context/Span 引用 |
| ④ | onComplete/onError 可能并发触发,重复发送 | AtomicBoolean.compareAndSet 幂等 |
心跳间隔为什么递增? 思考越久用户越焦虑,刚开始密集心跳安慰;如果 1 分钟还没结果,用户已经接受了等待,每 5 秒打扰反而烦——产品体验细节。
4.4 workflow 模式:LangGraph 三节点循环
复杂任务进入 LangGraphOrchestrator,这里是图编排的核心:
private Map<String, Object> startGraphExecution(WorkflowStateData state, SseEmitter emitter) {
int safetyCounter = 0;
String nextNode = "retrieval_node"; // 初始节点:先查资料
while (!"END".equals(nextNode) && safetyCounter < 12) { // ⚠️ 安全上限
// 检查取消信号
if (cancellationRequests.contains(state.getSessionId())) {
return Map.of("output", "任务已被取消。", "status", "CANCELLED");
}
final String currentNode = nextNode;
long stepStart = System.currentTimeMillis();
// 路由:当前节点决定下一节点
nextNode = switch (currentNode) {
case "retrieval_node" -> {
if (emitter != null) sendSse(emitter, "[STATUS]", "正在调阅内部知识库...");
yield callRetrievalNode(state);
}
case "agent_node" -> {
if (emitter != null) {
sendSse(emitter, "[STATUS]", "AI 正在深度思考...");
yield callStreamingAgentNode(state, emitter);
} else {
yield callAgentNode(state);
}
}
case "action_node" -> {
if (emitter != null) sendSse(emitter, "[STATUS]", "正在执行外部工具指令...");
yield callActionNode(state);
}
default -> "END";
};
log.info("[Orchestrator] 📊 节点 [{}] 耗时: {}ms",
currentNode, System.currentTimeMillis() - stepStart);
safetyCounter++;
}
return Map.of("output", extractFinalAnswer(state),
"status", "COMPLETED");
}
safetyCounter < 12 是什么?
ReAct 循环理论上可以无限跑——LLM 一直要求调工具,工具一直返回结果,永不收敛。生产环境必须有上限熔断,否则可能因为模型 bug 烧掉一台机器的 CPU 时间。
12 是经验值:复杂任务最多 ≈ 3 轮 RAG + 5 轮工具 + 3 轮反思 + 1 轮缓冲 = 12。超出强制 END,让 LLM 用现有信息给最终答案。
4.5 三大节点细节
retrieval_node — RAG 检索 + Prompt 缝合
private String callRetrievalNode(WorkflowStateData state) {
return Observation.createNotStarted("workflow.retrieval", observationRegistry).observe(() -> {
List<Content> contents = contentRetriever.retrieve(new Query(state.getUserInput()));
// 💡 针对小模型优化:把资料和问题缝合成一条强力指令
StringBuilder prompt = new StringBuilder(
"### 角色指令 ###\n你是一个极其严谨的助手。请严格基于参考资料回答问题。\n\n");
if (!contents.isEmpty()) {
String knowledge = contents.stream()
.map(c -> c.textSegment().text())
.collect(Collectors.joining("\n---\n"));
prompt.append("### 参考资料 ###\n").append(knowledge).append("\n\n");
}
prompt.append("### 用户问题 ###\n").append(state.getUserInput());
state.getMessages().add(UserMessage.from(prompt.toString()));
return "agent_node"; // 检索完进入 LLM 思考
});
}
agent_node — LLM 推理 + 路由决策
private String callAgentNode(WorkflowStateData state) {
return Observation.createNotStarted("workflow.agent", observationRegistry)
.contextualName("ai-thinking-sync")
.observe(() -> {
ChatRequest request = buildChatRequest(state);
ChatResponse response = chatModel.chat(request);
AiMessage aiMessage = response.aiMessage();
state.getMessages().add(aiMessage);
// 💡 关键路由逻辑:LLM 决定下一步去哪
return aiMessage.hasToolExecutionRequests() ? "action_node" : "END";
});
}
这是和传统 BPMN 工作流最大的差异。 传统工作流分支是开发者预先画好的——if A then B。Agent 工作流不是:下一步走哪取决于 LLM 的实时判断。agent_node 的返回值不是固定路径,而是对 hasToolExecutionRequests() 的实时检查。
action_node — 虚拟线程并行调工具
private String callActionNode(WorkflowStateData state) {
ChatMessage lastMsg = state.getMessages().getLast();
if (!(lastMsg instanceof AiMessage lastAiMsg)
|| lastAiMsg.toolExecutionRequests() == null) {
return "agent_node";
}
List<ToolExecutionRequest> requests = lastAiMsg.toolExecutionRequests();
return Observation.createNotStarted("workflow.action", observationRegistry).observe(() -> {
// 💡 Java 21 虚拟线程 + CompletableFuture:N 个工具真并发
List<CompletableFuture<ToolExecutionResultMessage>> futures = requests.stream()
.map(req -> CompletableFuture.supplyAsync(() -> {
return Observation.createNotStarted("workflow.tool", observationRegistry)
.contextualName("tool:" + req.name())
.observe(() -> {
try {
Object result = toolRegistry.executeTool(req.name(), req.arguments());
return ToolExecutionResultMessage.from(req, result.toString());
} catch (Exception e) {
return ToolExecutionResultMessage.from(req, "Error: " + e.getMessage());
}
});
}, virtualExecutor))
.toList();
// ⚠️ 必须用 allOf 才是真并行
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
futures.stream().map(CompletableFuture::join).forEach(msg -> state.getMessages().add(msg));
return "agent_node"; // 工具结果回灌给 LLM
});
}
allOf().join() vs 循环 join() 的差别——这是一个高频面试题:
// ❌ 错误写法:实际是顺序执行
futures.stream().map(CompletableFuture::join).toList();
// ✅ 正确写法:真并行
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
futures.stream().map(CompletableFuture::join).toList(); // 此时全部已完成
第一种写法被 Java Stream 的串行特性绑架——map 是惰性的,但 terminal operation toList() 会按顺序触发每个 join(),结果是 task1 等完再等 task2。allOf 显式等到全部完成,才能拿到真并行的收益(N 个工具同时跑,总耗时 ≈ 最慢的那个)。
五、问题解决:三个让大模型项目翻车的实战大坑
代码看完,现在分享生产环境真正会让人头疼的三个坑——这些是从源码注释和工程提交记录里能反推出来的实战经验,任何严肃做 Agent 项目的人都会遇到。
坑一:@Observed 注解让 SSE Span 严重失真
现象
给 executeSimpleWorkflowStream 方法加了 @Observed(name = "ai.chat.stream") 注解,原本期望 Jaeger 里看到完整的 60 秒 AI 推理耗时,结果显示只有 800ms——监控完全失真。
原因
Spring AOP 的 @Observed 是包装"方法返回时刻"作为 Span 结束点。但 SSE 是异步推流:
| 时间线 | 事件 | AOP Span 状态 |
|---|---|---|
| T+0ms | 方法被调用,AOP 开启 Span | OPEN |
| T+800ms | 注册完所有 SSE 回调,方法 return | CLOSE ⚠️ |
| T+800ms ~ T+60s | AI 在 ForkJoinPool 真正推理 | 已无 Span 追踪 |
| T+60s | 推理完成 | 早已闭合 |
方法返回时只是注册了回调,真正的工作在后台异步进行——AOP Span 早就闭合了,整段 AI 推理完全没被追踪。
解决
不用 @Observed,改为手动创建 Span + 在回调里 end:
// HTTP 虚拟线程上预捕获并创建 ai.inference 子 Span
Span capturedHttpSpan = tracer.currentSpan();
Span aiInferenceSpan = null;
if (capturedHttpSpan != null) {
Tracer.SpanInScope scope = tracer.withSpan(capturedHttpSpan);
aiInferenceSpan = tracer.nextSpan().name("ai.inference").start();
aiInferenceSpan.tag("conv.id", convId);
scope.close();
}
final Span finalAiSpan = aiInferenceSpan;
// 回调里追加 tag 和 event
streamingAgent.chatStream(...)
.onPartialResponse(token -> {
if (firstTokenReceived.compareAndSet(false, true)) {
finalAiSpan.tag("ai.thinking_ms", ...);
finalAiSpan.event("FIRST_TOKEN");
}
})
.onCompleteResponse(response -> {
finalAiSpan.tag("ai.total_ms", ...);
finalAiSpan.event("STREAM_COMPLETE");
finalAiSpan.end(); // ⚠️ 必须显式 end,否则 Span 永不闭合
});
| 实现方式 | Jaeger 显示耗时 | 准确性 |
|---|---|---|
@Observed AOP |
~800ms | ❌ 严重失真 |
| 手动 Span + 回调 end | 实际 60s | ✅ 准确 |
教训:异步场景下,AOP 类的"声明式"监控基本都不能用。要么手动控制 Span 生命周期,要么用 Reactive 框架的内置 Hook。
坑二:@Tool 工具方法在 ForkJoinPool 里拿不到 ThreadLocal
现象
工具方法里通过 AgentThreadContext.get() 拿当前 Context,单元测试一切正常,部署到生产后流式接口里返回 null——而且只有流式接口会,同步接口不会。
原因
| 接口类型 | 工具执行线程 | ThreadLocal 是否生效 |
|---|---|---|
同步 /process |
HTTP 虚拟线程(同一个) | ✅ 生效 |
流式 /process-stream |
ForkJoinPool 回调线程 | ❌ 失效 |
LangChain4j 流式回调(含工具执行)默认调度到 ForkJoinPool.commonPool,不是原始 HTTP 虚拟线程。AgentThreadContext 用的是 ThreadLocal<AgentContext>,ThreadLocal 是线程绑定的,跨线程必丢。
解决
预捕获模式 —— 在 HTTP 线程上提前抓出引用,闭包传给回调:
// HTTP 虚拟线程内
AgentContext capturedCtx = AgentThreadContext.get();
streamingAgent.chatStream(...)
.onToolExecuted(toolExecution -> {
// ForkJoinPool 线程内 — 不能用 AgentThreadContext.get()
// 但 capturedCtx 是闭包变量,可直接用
if (capturedCtx != null) {
capturedCtx.recordToolInvocation(toolExecution.request().name());
}
});
更彻底的方案 是用 Micrometer 的 Observation.wrap(Runnable) 或 ContextSnapshot,能把整个上下文链路自动传过去。但对于已有 ThreadLocal 体系的代码,预捕获是侵入性最小的修法。
坑三:小模型注册了 12 个工具后开始"自我创作"
现象
给 qwen2.5:3b 注册了 12 个工具,原本好好的天气查询,模型开始返回这种乱七八糟的工具调用:
{
"name": "查询银行余额", // 这工具明明用来查账户的
"arguments": {"account": "北京"} // 北京 ≠ 账户号
}
工具名和参数完全是模型自己编造的。
原因
小模型上下文注意力有限。每个 ToolSpecification 序列化大约 200-500 token,12 个工具 ≈ 3-6K token 全部塞进 Prompt。模型的"工具选择"任务在这种规模下精度大幅下降——它根本记不清每个工具应该传什么参数。
解决
第一步:强制工具瘦身(当前实现)
private List<ToolSpecification> getOptimizedToolSpecs() {
List<ToolSpecification> all = toolRegistry.getAllToolSpecifications();
if (all.size() > 8) {
log.info("[总监] 工具库过载,已执行自动精简 ({} -> 8)", all.size());
return new ArrayList<>(all.subList(0, 8));
}
return new ArrayList<>(all);
}
第二步:脱掉 CGLIB 代理外壳
private List<Object> getLimitedToolBeans() {
List<Object> all = new ArrayList<>(toolRegistry.getToolBeans());
if (all.size() > 8) all = new ArrayList<>(all.subList(0, 8));
// 💡 Spring 的 @Transactional/@Async/@Observed 会让 Bean 被 CGLIB 代理包装
// 代理子类没有原类的 @Tool 注解,LangChain4j 反射扫描会找不到工具
return all.stream().map(bean -> {
Object target = AopProxyUtils.getSingletonTarget(bean);
return target != null ? target : bean;
}).collect(Collectors.toList());
}
第三步(进阶方向):基于语义相似度的工具召回
// 伪代码:把工具描述也向量化,问题来了之后按相似度排序
List<Float> queryEmbedding = embeddingModel.embed(userInput);
List<ToolSpec> rankedTools = toolEmbeddings.entrySet().stream()
.sorted(by cosineSimilarity to queryEmbedding desc)
.limit(8).map(Entry::getKey).toList();
教训:小模型不是免费的午餐。它便宜、能本地部署、隐私好,但你得在框架层帮它处理它处理不了的复杂度——工具列表、Prompt 缝合、上下文压缩,都是框架层的职责。
六、总结:一张表 + 五条经验
设计决策回顾
| 设计决策 | 解决什么问题 |
|---|---|
| 双档工作流(simple/workflow) | 避免对简单任务过度设计,80% 请求走 simple 拿到极简性能 |
| AiService 实例 + 双重检查锁缓存 | 避免每次重建付出反射代价 |
| 三节点状态机(retrieval/agent/action) | 把 ReAct 循环显式化,每个节点可观测、可优化 |
safetyCounter < 12 |
防止 LLM 无限要工具,烧 CPU 烧 token |
CompletableFuture.allOf 并行调工具 |
多工具真并行(不是 Stream join 的伪并行) |
| 预捕获 Context/Span | 解决 ForkJoinPool 回调线程 ThreadLocal 失效 |
协作式取消(cancellationRequests Set) |
比 Thread.interrupt() 更安全,节点边界自检退出 |
| 工具瘦身上限 8 | 保护小模型不被过多工具规格挤占上下文 |
| 心跳线程递增间隔 | 平衡用户体验和 SSE 带宽 |
五条核心经验
- Agent 不是单次 LLM 调用——是有循环、有分支、可中断的状态机,必须用工作流引擎建模
- simple/workflow 双档设计是必要的——80% 请求走 simple 拿到极简性能,剩下 20% 走 workflow 处理复杂任务
- 跨线程上下文传递必须显式——ThreadLocal 在 ForkJoinPool 回调里必丢,预捕获是最稳的解法
- 小模型保护需要框架介入——工具瘦身、上下文裁剪、Prompt 缝合都是引擎层职责
- 取消机制设计要前置——SSE 三个回调 (
onCompletion/onTimeout/onError) 全部映射到统一取消标志
七、写在最后
工作流引擎是 Agent 框架最核心的"心跳"。但你看完这篇文章会发现:它本质上不是 AI 问题,而是 Java 工程问题——状态机怎么设计、线程上下文怎么传、并发怎么调度、流式怎么发、取消怎么实现……每一个都是过去 10 年我们做企业级 Java 已经解决过的问题,只是现在在 LLM 场景下重新组合。
这正是 Java 程序员转 AI 工程方向的优势所在——底层的工程素养可以直接迁移,缺的只是对 LLM 调度模式的理解。AgentX 这个项目就是这种迁移的一次完整实践。
整个 AgentX 专栏目前已发布 8 篇,覆盖了 Agent 系统从底层架构到生产部署的完整链路:技术选型 → 架构设计 → 工具系统 → RAG → 记忆 → 可观测 → 工作流。下一篇会写风控与限流——LLM Token 配额、并发降级、敏感词审计这些企业级落地必须的安全工程。
如果你也在做 Agent 项目,或者准备转型 AI 工程方向:
- 代码全公开 — 公众号回复「工作流」获取本文完整代码包
- 专栏持续更新 — 每篇都基于真实开源项目源码,不水
- 欢迎交流 — 评论区或公众号私信都行,踩过的坑越分享越值钱
💬 互动话题:你在 Agent 工作流上踩过哪些坑?跨线程追踪、并发调工具、长任务取消……这些场景的设计真的能差出十倍生产稳定性。评论区聊聊你的经历。
关注公众号 【SuniaCoder-AI全栈架构实战】,回复「工作流」获取本文完整代码包,回复「可观测」获取观测配置代码,回复「RAG」获取知识库代码包。
关于作者 & 联系方式
汪旭 / Sunia — Java 全栈开发者,AI 应用工程化实践者
专注企业级 AI 落地,擅长极限资源优化,有 RAG、Agent、知识图谱方向的完整实战经验。
| 平台 | 地址 / 说明 |
|---|---|
| CSDN | SuniaCoder-AI|13.5 万+ 阅读,RAG/Agent 系列持续更新 |
| 微信公众号 | 搜索【SuniaCoder-AI全栈架构实战】|关注回复「工作流」获取本文完整代码包 |
| 掘金 | SuniaCoder-AI |
| 知乎 | SuniaCoder-AI |
| 合作咨询 | 提供企业私有化大模型部署与定制开发(基础部署 / 企业定制 / 年度维保)欢迎私信洽谈 |
如果内容对你有帮助,点赞 + 收藏 + 关注是最大的支持,也能让更多需要的人看到这篇文章。
AgentX 专栏导航
| 篇 | 标题 | 核心内容 |
|---|---|---|
| ① | 一个 Java 开发者的 Agent 实践之路(前言) | 专栏总览 / 选题思路 |
| ② | 没有 GPU、只有 3 台低配云服务器,我如何选出 AgentX 的技术栈 | LangChain4j / Ollama / Milvus / Redis 选型 |
| ③ | AgentX 架构设计全解析:一个请求是如何从 HTTP 走到 LLM 再回来的 | 六层架构 / SSE 流式 / 虚拟线程 / TraceId |
| ④ | 工具系统深度实现:从 @Tool 注解到 MCP 协议,构建企业级 Agent 工具体系 | ToolRegistry / McpToolServer / @Tool |
| ⑤ | RAG 进阶:用 Milvus + bge-m3 构建比 ES 更懂语义的企业知识库 | 向量检索 / bge-m3 / MilvusV2 |
| ⑥ | 记忆系统:用 Redis + Milvus 给 AI 配上短期 + 长期双层记忆 | ChatMemoryStore / 语义召回 / 多轮上下文 |
| ⑦ | 全链路可观测:用 OpenTelemetry + Jaeger 让每次 AI 对话都可追踪可复盘 | OTel / Jaeger / SpanExporter / TraceId |
| ⑧ | 工作流引擎:AgentWorkflow 怎么把工具、记忆、流程串成一条流水线(本文) | AgentWorkflow / LangGraph / 虚拟线程 / SSE |
| ⑨ | 即将发布:风控与限流——LLM Token 配额、并发降级、敏感词审计的工程化实践 | TokenBucket / 配额 / 审计 / 降级 |
↑ 上一篇:[全链路可观测:用 OpenTelemetry + Jaeger 让每次 AI 对话都可追踪可复盘|AgentX 专栏⑦]
↓ 下一篇:风控与限流——LLM Token 配额、并发降级、敏感词审计的工程化实践(即将发布)
Tags:#AgentX #工作流引擎 #LangGraph #LangChain4j #AiServices #SSE流式 #虚拟线程 #Java21 #ReAct #Agent架构
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)