工作流引擎:AgentWorkflow 怎么把工具、记忆、流程串成一条流水线|AgentX 专栏⑧

本文是 AgentX 技术专栏第八篇。基于真实项目源码(AgentWorkflow / WorkflowExecutor / LangGraphOrchestrator / GeneralService),从技术简介到设计思路,从核心代码到生产踩坑,循序渐进拆解 AgentX 的双档工作流引擎——让 Java 程序员一次性看懂 Agent 框架最关键的那个"调度大脑"。


本文速览:

  • 没有工作流引擎的 Agent,本质上就是一坨 if-else——为什么?
  • Agent 工作流引擎是什么?对标 LangGraph / AutoGen / CrewAI 的定位
  • AgentX 的四条核心设计原则:双档分流、状态机编排、协作式取消、上下文显式传递
  • simple 模式:AiServices + 实例缓存 + 双重检查锁
  • workflow 模式:LangGraph 三节点循环 + 虚拟线程并行调工具
  • 流式 SSE 三件套:onPartialResponse / onToolExecuted / onComplete + 心跳保活
  • 三个让大模型项目翻车的实战大坑,以及对应的修法

文章很长(约 38 KB),但每一节都有承上启下,建议从头读——只看代码片段可能 get 不到设计意图。读完文末有完整代码包获取方式。


一、先抛个问题:没有工作流引擎的 Agent 长什么样?

很多 Java 程序员第一次写 Agent,代码大概长这样:

@PostMapping("/chat")
public String chat(String userInput) {
    // 第一步:让 AI 思考
    AiMessage response = chatModel.chat(buildMessages(userInput));

    // 第二步:如果 AI 想调工具,那就调
    if (response.hasToolExecutionRequests()) {
        for (var req : response.toolExecutionRequests()) {
            Object result = toolRegistry.execute(req.name(), req.arguments());
            // 把工具结果塞回去再让 AI 思考一遍
            messages.add(ToolExecutionResultMessage.from(req, result));
        }
        // 再问一遍
        response = chatModel.chat(messages);
        // 如果它还想调工具呢?
        if (response.hasToolExecutionRequests()) {
            // ……嵌套 if,写到怀疑人生
        }
    }
    return response.text();
}

这段代码上线一周就会变成噩梦:

需求 改动代价
想加 RAG 检索 在 chat 前面再 if 一段
想支持 SSE 流式 重写一份流式版本,代码翻倍
用户想中途取消 加 cancelToken,每层 if 都要检查
想统计每个工具的调用耗时 在每个 execute 前后加 log/Span,散落各处
多工具想并发执行 把 for 循环改成 CompletableFuture,又得测一遍
不同业务想用不同 Prompt 策略 复制粘贴一份新的 Controller

所有这些"动一发牵全身"的痛点,源头只有一个:流程逻辑写死在过程式代码里。

工业界对此的标准答案是——用工作流引擎把"流程"和"逻辑"解耦。LangChain 有 LangGraph,微软有 AutoGen,AgentX 这个 Java 项目也得有自己的工作流引擎。这篇文章就来拆解它怎么设计、怎么落地、怎么避坑。


二、技术简介:Agent 工作流引擎是什么

2.1 一句话定义

Agent 工作流引擎是把 "LLM 思考 → 调工具 → 再思考"的循环抽象成可配置的状态机,对外提供统一调度入口的运行时组件。

它至少要解决五件事:

职责 解决什么问题
节点编排 把"思考"“检索”"工具调用"等步骤抽象成节点,让流程可读、可改
状态流转 维护对话/工作流的执行状态,支持中断、恢复、回溯
并发调度 多个工具能并行执行而不是串行 join
可观测埋点 每个节点的耗时、入参、结果都能落到 Trace/Log
流式输出 LLM 生成的 token 能实时推给前端,不能等 1 分钟才返回

2.2 业界横向对比

框架 语言 编排范式 适合场景
LangGraph Python DAG 图编排(节点+边) Python 生态,复杂多 Agent 协作
AutoGen Python 多 Agent 对话式编排 多 Agent 互相讨论的研究场景
CrewAI Python 角色+任务的层级化分工 模拟团队协作(产品+开发+测试)
Spring AI Advisors Java 拦截器链式增强 Spring Boot 项目轻量增强
AgentX AgentWorkflow(本文) Java 双档:AiService 直驱 + LangGraph 状态机 企业级 Java 项目,单 Agent 多工具

AgentX 没去模仿 LangGraph 的全图编排(那对 Java 生态来说太重),而是只抽象到企业项目真正需要的程度——两档够用就停手。这是后面要展开的"设计原则一"。

2.3 AgentWorkflow 在 AgentX 里的位置

┌──────────────────────────────────────────────────────────┐
│  ① 接入层  AgentController (REST/SSE 入口)               │
├──────────────────────────────────────────────────────────┤
│  ② 调度层  AgentService (超时熔断 + TraceId 捕获)        │
├──────────────────────────────────────────────────────────┤
│  ③ 工作流层 AgentWorkflow ← 本文主角                     │
│       ├─ simple    → AiServices 直驱                     │
│       └─ workflow  → WorkflowExecutor → LangGraphOrch.   │
├──────────────────────────────────────────────────────────┤
│  ④ 工具层  ToolRegistry · ChatMemoryProvider             │
│  ⑤ 模型层  ChatModel (Ollama) · StreamingChatModel       │
│  ⑥ 存储层  Milvus (RAG) · Redis (Memory)                 │
└──────────────────────────────────────────────────────────┘

AgentWorkflow 处于第③层,向上接住 AgentService 的调度请求,向下调用工具/模型/记忆——它是整个 Agent 系统的"调度大脑"。


三、设计思路:四条让代码不烂的核心原则

技术简介看完,开始讲"为什么这么设计"。AgentX 的工作流引擎有四条核心原则,缺一不可。

原则一:双档分流,避免过度设计

不是所有请求都需要走"图编排+多节点+并行工具"。日常对话场景下:

请求类型 占比 真实需求
闲聊"你好" ~30% 单次 LLM 调用就够
简单问答"今天天气" ~50% 单次 LLM + 单工具调用
复杂任务"对比 5 家公司财报" ~20% RAG + 多工具 + 多轮推理

80% 的请求用 5% 的代码就能 cover——为它们配一套完整图编排是过度设计。

所以 AgentX 把工作流引擎分成两档:

// simple 模式:闲聊/单工具 → AiServices 一行搞定
public String executeSimpleWorkflow(String convId, String userInput) {
    GeneralService agent = getOrCreateAgent();
    Result<String> result = agent.chat(convId, userInput);
    return processResult(result);
}

// workflow 模式:复杂任务 → LangGraph 多节点循环
public String executeComplexWorkflow(String convId, String userInput) {
    List<ToolSpecification> toolSpecs = getOptimizedToolSpecs();
    return workflowExecutor.execute(convId, userInput, toolSpecs);
}

调用方根据 AgentRequest.agentType() 路由——简单的走 simple,复杂的走 workflow。这种"奥卡姆剃刀式"的分流是企业项目区别于 demo 项目的关键标志。


原则二:状态机编排,让流程可观测

workflow 模式下,AgentX 借鉴 LangGraph 的"节点 + 状态 + 路由"模型,把 Agent 执行抽象成三节点循环:

       ┌─────────────────────────────────────────┐
       ↓                                         │
┌─────────────────┐   ┌────────────┐   ┌────────────────┐
│ retrieval_node  │ → │ agent_node │ → │  action_node   │
│  RAG 检索        │   │  LLM 思考  │   │  并行调工具     │
│  缝合 Prompt     │   │  路由决策  │   │  汇总结果       │
└─────────────────┘   └─────┬──────┘   └────────────────┘
                            │
                       是否还需要工具?
                            │
                  yes ←─────┴─────→ no
                                     │
                                     ▼
                                   END

每个节点都是独立函数 + 独立 Observation Span,意味着:

  • Jaeger 里能看到每个节点的精确耗时
  • 加新节点不影响旧节点
  • 节点出错可以单独捕获和降级
  • safetyCounter < 12 给整个循环上保险,防止 LLM 无限要工具

这种设计的代价是多写 100 行 orchestration 代码,但换来的可读性和可观测性远远值得。


原则三:协作式取消,比强制中断更安全

Agent 任务跑分钟级很常见,用户可能:

  • 关浏览器标签(前端 SSE 断连)
  • 触发超时(前端等不及)
  • 主动点"停止"按钮

强制中断(Thread.interrupt())会把任务卡在某个奇怪状态:HTTP 连接半开、Milvus 查询挂起、ChatMemory 写一半。

AgentX 选择协作式取消

// 1. 所有 SSE 异常映射到统一取消标志
emitter.onCompletion(() -> cancellationRequests.add(workflowId));
emitter.onTimeout(()    -> cancellationRequests.add(workflowId));
emitter.onError(e       -> cancellationRequests.add(workflowId));

// 2. 节点循环每轮检查
while (!"END".equals(nextNode) && safetyCounter < 12) {
    if (cancellationRequests.contains(state.getSessionId())) {
        // 走清理逻辑,不强中断
        return Map.of("output", "任务已被取消。", "status", "CANCELLED");
    }
    // 跑当前节点...
}

代价:当前节点必须跑完才能取消(最多多等几百毫秒)。换来:资源永远干净,状态可预期


原则四:跨线程上下文显式传递

LangChain4j 的流式回调(onPartialResponse / onToolExecuted)执行在 内部 ForkJoinPool 上,不是原始 HTTP 虚拟线程。这意味着 ThreadLocal 在回调里必然失效

// 在 onToolExecuted 回调里:
AgentContext ctx = AgentThreadContext.get();  // 💀 null
Span span = tracer.currentSpan();             // 💀 null

AgentX 的解法是预捕获模式——在虚拟线程退出前把对象引用抓出来,闭包传给回调:

// HTTP 虚拟线程上预捕获
AgentContext capturedCtx = AgentThreadContext.get();
Span capturedHttpSpan = tracer.currentSpan();

// ForkJoinPool 回调中通过闭包变量使用
streamingAgent.chatStream(convId, userInput)
    .onToolExecuted(toolExecution -> {
        if (capturedCtx != null) capturedCtx.recordToolInvocation(...);
        if (capturedHttpSpan != null) capturedHttpSpan.event(...);
    });

这条原则后面会反复出现——任何跨线程的上下文,都必须显式捕获引用,不能依赖 ThreadLocal。


四条原则讲完,下面看具体怎么落地。


四、代码解析:从入口到节点的完整链路

4.1 入口分发:AgentWorkflow

AgentWorkflow 是整个工作流的入口,对应"执行总监"的角色——AgentService 接到请求后,由总监决定走哪条流水线:

@Slf4j
@Component
@RequiredArgsConstructor
public class AgentWorkflow {

    private final ChatModel chatModel;                  // 同步大脑
    private final StreamingChatModel streamingChatModel; // 流式大脑
    private final WorkflowExecutor workflowExecutor;     // 复杂流水线
    private final ToolRegistry toolRegistry;
    private final ChatMemoryProvider chatMemoryProvider;
    private final Tracer tracer;

    // 💡 关键:AiService 实例可跨请求复用,缓存起来
    private volatile GeneralService cachedAgent;
    private volatile GeneralService cachedStreamingAgent;

    // ── simple 模式(AiService 直驱)─────────────────────
    public String executeSimpleWorkflow(String convId, String userInput) { ... }
    public void executeSimpleWorkflowStream(...) { ... }

    // ── workflow 模式(LangGraph 图编排)─────────────────
    public String executeComplexWorkflow(String convId, String userInput) { ... }
    public void executeComplexWorkflowStream(...) { ... }
}

两个 volatile cachedAgent 字段是关键性能优化点。

LangChain4j 的 AiServices.builder().build() 实际上是用 JDK 动态代理生成 Proxy 对象,构建过程涉及反射、类扫描、ToolSpecification 提取等开销。如果每次请求都重建——单次构建可能 100ms,QPS 上去之后就是性能瓶颈。

但 AiService 实例本身是无状态的——会话状态存在 ChatMemoryProvider 里按 conversationId 隔离,工具列表只是引用。所以实例可跨请求复用,配合双重检查锁延迟初始化:

private GeneralService getOrCreateAgent() {
    GeneralService agent = cachedAgent;
    if (agent == null) {                    // 第一次检查:无锁快路径
        synchronized (this) {
            agent = cachedAgent;
            if (agent == null) {            // 第二次检查:拿到锁后再确认
                agent = createAgent(getLimitedToolBeans(), true);
                cachedAgent = agent;
            }
        }
    }
    return agent;
}

volatile 保证内存可见性,DCL 保证只构建一次。在高并发场景下,99.99% 的请求走第一个 if 直接返回,零锁开销


4.2 simple 模式:AiServices + 声明式接口

simple 模式下,业务逻辑就一行——把请求扔给 AiServices

GeneralService agent = getOrCreateAgent();
Result<String> result = agent.chat(convId, userInput);

GeneralService 是啥?一个纯接口

@AiService
public interface GeneralService {

    @SystemMessage("""
        你是 AgentX 企业级智能引擎的通用助手。
        1. 请根据用户的提问,合理判断是否需要调用工具。
        2. 如果用户的问题涉及实时数据(如天气、股票),请务必调用工具,不要编造。
        3. 你的回答应当专业、简洁、安全,并使用 Markdown 格式。
        """)
    Result<String> chat(@MemoryId String conversationId, @UserMessage String message);

    @SystemMessage("""
        你是 AgentX 企业级智能引擎的通用助手。
        ...
        4. 从工具返回数据中提取数值填入回答,不要遗漏或简化任何数字。
           示例:工具返回"气温:26摄氏度",回答应写"26摄氏度",不要只写"摄氏度"。
        """)
    TokenStream chatStream(@MemoryId String conversationId, @UserMessage String message);
}

为什么用接口而不用具体类?

@AiService 是 LangChain4j 的"声明式 AI 服务"机制:你只声明意图(接口 + 注解),框架在运行时用 JDK 代理生成实现,把 @SystemMessage / @MemoryId / @UserMessage 翻译成对 ChatModel 的实际调用。

业务代码因此彻底解耦于底层调用细节:

  • ❌ 不需要手写 chatModel.chat(messages, tools)
  • ❌ 不需要手动管理 ChatMemory
  • ❌ 不需要解析工具调用结果

一个接口 = 一个完整的 Agent。 这是 LangChain4j 相对于 Spring AI 在 Agent 场景下最大的优势。

关于 SystemMessage 里那条"逐字检查数字"的规则:这是小模型的真实痛点——qwen2.5:3b 在生成回答时偶尔会把"26 摄氏度"简化成"26 度"或者干脆漏单位。显式约束能显著降低这种幻觉,是低成本部署绕不开的细节。


4.3 流式 SSE 三件套:onPartial / onTool / onComplete

simple 模式的流式实现是整个 AgentWorkflow 最复杂的代码段,因为要解决三个并发难题

public void executeSimpleWorkflowStream(String convId, String userInput, SseEmitter emitter) {
    AtomicLong thinkingStart = new AtomicLong(System.currentTimeMillis());
    AtomicBoolean completed = new AtomicBoolean(false);
    AtomicBoolean firstTokenReceived = new AtomicBoolean(false);

    GeneralService streamingAgent = getOrCreateStreamingAgent();
    pushSseEvent(emitter, "[STATUS]", "正在思考,请稍候...");

    // ① 心跳线程:AI 思考期间定期发心跳,防止前端超时断连
    Thread heartbeatThread = Thread.ofVirtual().start(() -> {
        long interval = 5_000L;
        while (!firstTokenReceived.get() && !completed.get()) {
            try { Thread.sleep(interval); }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt(); break;
            }
            if (!firstTokenReceived.get() && !completed.get()) {
                long elapsed = System.currentTimeMillis() - thinkingStart.get();
                pushSseEvent(emitter, "[STATUS]",
                    "正在思考中... (已思考 " + (elapsed / 1000) + "秒)");
                // ② 心跳间隔递增:思考越久,越不烦用户
                interval = Math.min(interval + 5_000, 30_000L);
            }
        }
    });

    // ③ 预捕获 Context 和 Span,跨线程传给回调
    AgentContext capturedCtx = AgentThreadContext.get();
    Span capturedHttpSpan = tracer.currentSpan();

    streamingAgent.chatStream(convId, userInput)
        .onPartialResponse(token -> {
            if (firstTokenReceived.compareAndSet(false, true)) {
                long thinking = System.currentTimeMillis() - thinkingStart.get();
                log.info("[总监] 模型开始输出 | 思考耗时: {}ms", thinking);
            }
            pushSseEvent(emitter, "[CONTENT]", token);
        })
        .onToolExecuted(toolExecution -> {
            firstTokenReceived.set(true);  // 工具调用也算"有动静",停心跳
            String toolName = toolExecution.request().name();
            if (capturedCtx != null) capturedCtx.recordToolInvocation(toolName);
            pushSseEvent(emitter, "[STATUS]", "正在调用工具: " + toolName);
        })
        .onCompleteResponse(response -> {
            // ④ 幂等防护:onComplete 和 onError 可能同时触发
            if (!completed.compareAndSet(false, true)) return;
            pushSseEvent(emitter, "[DONE]", "FINISHED");
            emitter.complete();
        })
        .onError(error -> {
            if (!completed.compareAndSet(false, true)) return;
            String friendlyMsg = buildFriendlyErrorMessage(error);
            pushSseEvent(emitter, "[ERROR]", friendlyMsg);
            try { emitter.complete(); } catch (Exception ignored) {}
        })
        .start();
}

四个数字标注的设计点,对应四个并发问题:

# 问题 解法
AI 思考可能 30s+,前端 SSE 默认 30s 超时 心跳线程定期推 STATUS
心跳太频繁吵用户 间隔递增 5→30s 封顶
回调跑在 ForkJoinPool,ThreadLocal 失效 预捕获 Context/Span 引用
onComplete/onError 可能并发触发,重复发送 AtomicBoolean.compareAndSet 幂等

心跳间隔为什么递增? 思考越久用户越焦虑,刚开始密集心跳安慰;如果 1 分钟还没结果,用户已经接受了等待,每 5 秒打扰反而烦——产品体验细节。


4.4 workflow 模式:LangGraph 三节点循环

复杂任务进入 LangGraphOrchestrator,这里是图编排的核心:

private Map<String, Object> startGraphExecution(WorkflowStateData state, SseEmitter emitter) {
    int safetyCounter = 0;
    String nextNode = "retrieval_node";  // 初始节点:先查资料

    while (!"END".equals(nextNode) && safetyCounter < 12) {  // ⚠️ 安全上限
        // 检查取消信号
        if (cancellationRequests.contains(state.getSessionId())) {
            return Map.of("output", "任务已被取消。", "status", "CANCELLED");
        }

        final String currentNode = nextNode;
        long stepStart = System.currentTimeMillis();

        // 路由:当前节点决定下一节点
        nextNode = switch (currentNode) {
            case "retrieval_node" -> {
                if (emitter != null) sendSse(emitter, "[STATUS]", "正在调阅内部知识库...");
                yield callRetrievalNode(state);
            }
            case "agent_node" -> {
                if (emitter != null) {
                    sendSse(emitter, "[STATUS]", "AI 正在深度思考...");
                    yield callStreamingAgentNode(state, emitter);
                } else {
                    yield callAgentNode(state);
                }
            }
            case "action_node" -> {
                if (emitter != null) sendSse(emitter, "[STATUS]", "正在执行外部工具指令...");
                yield callActionNode(state);
            }
            default -> "END";
        };

        log.info("[Orchestrator] 📊 节点 [{}] 耗时: {}ms",
                currentNode, System.currentTimeMillis() - stepStart);
        safetyCounter++;
    }

    return Map.of("output", extractFinalAnswer(state),
                  "status", "COMPLETED");
}

safetyCounter < 12 是什么?

ReAct 循环理论上可以无限跑——LLM 一直要求调工具,工具一直返回结果,永不收敛。生产环境必须有上限熔断,否则可能因为模型 bug 烧掉一台机器的 CPU 时间。

12 是经验值:复杂任务最多 ≈ 3 轮 RAG + 5 轮工具 + 3 轮反思 + 1 轮缓冲 = 12。超出强制 END,让 LLM 用现有信息给最终答案。


4.5 三大节点细节

retrieval_node — RAG 检索 + Prompt 缝合

private String callRetrievalNode(WorkflowStateData state) {
    return Observation.createNotStarted("workflow.retrieval", observationRegistry).observe(() -> {
        List<Content> contents = contentRetriever.retrieve(new Query(state.getUserInput()));

        // 💡 针对小模型优化:把资料和问题缝合成一条强力指令
        StringBuilder prompt = new StringBuilder(
            "### 角色指令 ###\n你是一个极其严谨的助手。请严格基于参考资料回答问题。\n\n");

        if (!contents.isEmpty()) {
            String knowledge = contents.stream()
                .map(c -> c.textSegment().text())
                .collect(Collectors.joining("\n---\n"));
            prompt.append("### 参考资料 ###\n").append(knowledge).append("\n\n");
        }
        prompt.append("### 用户问题 ###\n").append(state.getUserInput());
        state.getMessages().add(UserMessage.from(prompt.toString()));

        return "agent_node";  // 检索完进入 LLM 思考
    });
}

agent_node — LLM 推理 + 路由决策

private String callAgentNode(WorkflowStateData state) {
    return Observation.createNotStarted("workflow.agent", observationRegistry)
        .contextualName("ai-thinking-sync")
        .observe(() -> {
            ChatRequest request = buildChatRequest(state);
            ChatResponse response = chatModel.chat(request);
            AiMessage aiMessage = response.aiMessage();
            state.getMessages().add(aiMessage);

            // 💡 关键路由逻辑:LLM 决定下一步去哪
            return aiMessage.hasToolExecutionRequests() ? "action_node" : "END";
        });
}

这是和传统 BPMN 工作流最大的差异。 传统工作流分支是开发者预先画好的——if A then B。Agent 工作流不是:下一步走哪取决于 LLM 的实时判断agent_node 的返回值不是固定路径,而是对 hasToolExecutionRequests() 的实时检查。

action_node — 虚拟线程并行调工具

private String callActionNode(WorkflowStateData state) {
    ChatMessage lastMsg = state.getMessages().getLast();
    if (!(lastMsg instanceof AiMessage lastAiMsg)
            || lastAiMsg.toolExecutionRequests() == null) {
        return "agent_node";
    }
    List<ToolExecutionRequest> requests = lastAiMsg.toolExecutionRequests();

    return Observation.createNotStarted("workflow.action", observationRegistry).observe(() -> {
        // 💡 Java 21 虚拟线程 + CompletableFuture:N 个工具真并发
        List<CompletableFuture<ToolExecutionResultMessage>> futures = requests.stream()
            .map(req -> CompletableFuture.supplyAsync(() -> {
                return Observation.createNotStarted("workflow.tool", observationRegistry)
                    .contextualName("tool:" + req.name())
                    .observe(() -> {
                        try {
                            Object result = toolRegistry.executeTool(req.name(), req.arguments());
                            return ToolExecutionResultMessage.from(req, result.toString());
                        } catch (Exception e) {
                            return ToolExecutionResultMessage.from(req, "Error: " + e.getMessage());
                        }
                    });
            }, virtualExecutor))
            .toList();

        // ⚠️ 必须用 allOf 才是真并行
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        futures.stream().map(CompletableFuture::join).forEach(msg -> state.getMessages().add(msg));

        return "agent_node";  // 工具结果回灌给 LLM
    });
}

allOf().join() vs 循环 join() 的差别——这是一个高频面试题:

// ❌ 错误写法:实际是顺序执行
futures.stream().map(CompletableFuture::join).toList();

// ✅ 正确写法:真并行
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
futures.stream().map(CompletableFuture::join).toList();  // 此时全部已完成

第一种写法被 Java Stream 的串行特性绑架——map 是惰性的,但 terminal operation toList() 会按顺序触发每个 join(),结果是 task1 等完再等 task2。allOf 显式等到全部完成,才能拿到真并行的收益(N 个工具同时跑,总耗时 ≈ 最慢的那个)。


五、问题解决:三个让大模型项目翻车的实战大坑

代码看完,现在分享生产环境真正会让人头疼的三个坑——这些是从源码注释和工程提交记录里能反推出来的实战经验,任何严肃做 Agent 项目的人都会遇到

坑一:@Observed 注解让 SSE Span 严重失真

现象

executeSimpleWorkflowStream 方法加了 @Observed(name = "ai.chat.stream") 注解,原本期望 Jaeger 里看到完整的 60 秒 AI 推理耗时,结果显示只有 800ms——监控完全失真。

原因

Spring AOP 的 @Observed 是包装"方法返回时刻"作为 Span 结束点。但 SSE 是异步推流

时间线 事件 AOP Span 状态
T+0ms 方法被调用,AOP 开启 Span OPEN
T+800ms 注册完所有 SSE 回调,方法 return CLOSE ⚠️
T+800ms ~ T+60s AI 在 ForkJoinPool 真正推理 已无 Span 追踪
T+60s 推理完成 早已闭合

方法返回时只是注册了回调,真正的工作在后台异步进行——AOP Span 早就闭合了,整段 AI 推理完全没被追踪。

解决

不用 @Observed,改为手动创建 Span + 在回调里 end

// HTTP 虚拟线程上预捕获并创建 ai.inference 子 Span
Span capturedHttpSpan = tracer.currentSpan();
Span aiInferenceSpan = null;
if (capturedHttpSpan != null) {
    Tracer.SpanInScope scope = tracer.withSpan(capturedHttpSpan);
    aiInferenceSpan = tracer.nextSpan().name("ai.inference").start();
    aiInferenceSpan.tag("conv.id", convId);
    scope.close();
}
final Span finalAiSpan = aiInferenceSpan;

// 回调里追加 tag 和 event
streamingAgent.chatStream(...)
    .onPartialResponse(token -> {
        if (firstTokenReceived.compareAndSet(false, true)) {
            finalAiSpan.tag("ai.thinking_ms", ...);
            finalAiSpan.event("FIRST_TOKEN");
        }
    })
    .onCompleteResponse(response -> {
        finalAiSpan.tag("ai.total_ms", ...);
        finalAiSpan.event("STREAM_COMPLETE");
        finalAiSpan.end();  // ⚠️ 必须显式 end,否则 Span 永不闭合
    });
实现方式 Jaeger 显示耗时 准确性
@Observed AOP ~800ms ❌ 严重失真
手动 Span + 回调 end 实际 60s ✅ 准确

教训:异步场景下,AOP 类的"声明式"监控基本都不能用。要么手动控制 Span 生命周期,要么用 Reactive 框架的内置 Hook。


坑二:@Tool 工具方法在 ForkJoinPool 里拿不到 ThreadLocal

现象

工具方法里通过 AgentThreadContext.get() 拿当前 Context,单元测试一切正常,部署到生产后流式接口里返回 null——而且只有流式接口会,同步接口不会。

原因

接口类型 工具执行线程 ThreadLocal 是否生效
同步 /process HTTP 虚拟线程(同一个) ✅ 生效
流式 /process-stream ForkJoinPool 回调线程 ❌ 失效

LangChain4j 流式回调(含工具执行)默认调度到 ForkJoinPool.commonPool不是原始 HTTP 虚拟线程AgentThreadContext 用的是 ThreadLocal<AgentContext>ThreadLocal 是线程绑定的,跨线程必丢

解决

预捕获模式 —— 在 HTTP 线程上提前抓出引用,闭包传给回调:

// HTTP 虚拟线程内
AgentContext capturedCtx = AgentThreadContext.get();

streamingAgent.chatStream(...)
    .onToolExecuted(toolExecution -> {
        // ForkJoinPool 线程内 — 不能用 AgentThreadContext.get()
        // 但 capturedCtx 是闭包变量,可直接用
        if (capturedCtx != null) {
            capturedCtx.recordToolInvocation(toolExecution.request().name());
        }
    });

更彻底的方案 是用 Micrometer 的 Observation.wrap(Runnable)ContextSnapshot,能把整个上下文链路自动传过去。但对于已有 ThreadLocal 体系的代码,预捕获是侵入性最小的修法


坑三:小模型注册了 12 个工具后开始"自我创作"

现象

qwen2.5:3b 注册了 12 个工具,原本好好的天气查询,模型开始返回这种乱七八糟的工具调用:

{
  "name": "查询银行余额",          // 这工具明明用来查账户的
  "arguments": {"account": "北京"}  // 北京 ≠ 账户号
}

工具名和参数完全是模型自己编造的。

原因

小模型上下文注意力有限。每个 ToolSpecification 序列化大约 200-500 token,12 个工具 ≈ 3-6K token 全部塞进 Prompt。模型的"工具选择"任务在这种规模下精度大幅下降——它根本记不清每个工具应该传什么参数。

解决

第一步:强制工具瘦身(当前实现)

private List<ToolSpecification> getOptimizedToolSpecs() {
    List<ToolSpecification> all = toolRegistry.getAllToolSpecifications();
    if (all.size() > 8) {
        log.info("[总监] 工具库过载,已执行自动精简 ({} -> 8)", all.size());
        return new ArrayList<>(all.subList(0, 8));
    }
    return new ArrayList<>(all);
}

第二步:脱掉 CGLIB 代理外壳

private List<Object> getLimitedToolBeans() {
    List<Object> all = new ArrayList<>(toolRegistry.getToolBeans());
    if (all.size() > 8) all = new ArrayList<>(all.subList(0, 8));

    // 💡 Spring 的 @Transactional/@Async/@Observed 会让 Bean 被 CGLIB 代理包装
    //    代理子类没有原类的 @Tool 注解,LangChain4j 反射扫描会找不到工具
    return all.stream().map(bean -> {
        Object target = AopProxyUtils.getSingletonTarget(bean);
        return target != null ? target : bean;
    }).collect(Collectors.toList());
}

第三步(进阶方向):基于语义相似度的工具召回

// 伪代码:把工具描述也向量化,问题来了之后按相似度排序
List<Float> queryEmbedding = embeddingModel.embed(userInput);
List<ToolSpec> rankedTools = toolEmbeddings.entrySet().stream()
    .sorted(by cosineSimilarity to queryEmbedding desc)
    .limit(8).map(Entry::getKey).toList();

教训小模型不是免费的午餐。它便宜、能本地部署、隐私好,但你得在框架层帮它处理它处理不了的复杂度——工具列表、Prompt 缝合、上下文压缩,都是框架层的职责。


六、总结:一张表 + 五条经验

设计决策回顾

设计决策 解决什么问题
双档工作流(simple/workflow) 避免对简单任务过度设计,80% 请求走 simple 拿到极简性能
AiService 实例 + 双重检查锁缓存 避免每次重建付出反射代价
三节点状态机(retrieval/agent/action) 把 ReAct 循环显式化,每个节点可观测、可优化
safetyCounter < 12 防止 LLM 无限要工具,烧 CPU 烧 token
CompletableFuture.allOf 并行调工具 多工具真并行(不是 Stream join 的伪并行)
预捕获 Context/Span 解决 ForkJoinPool 回调线程 ThreadLocal 失效
协作式取消(cancellationRequests Set) Thread.interrupt() 更安全,节点边界自检退出
工具瘦身上限 8 保护小模型不被过多工具规格挤占上下文
心跳线程递增间隔 平衡用户体验和 SSE 带宽

五条核心经验

  1. Agent 不是单次 LLM 调用——是有循环、有分支、可中断的状态机,必须用工作流引擎建模
  2. simple/workflow 双档设计是必要的——80% 请求走 simple 拿到极简性能,剩下 20% 走 workflow 处理复杂任务
  3. 跨线程上下文传递必须显式——ThreadLocal 在 ForkJoinPool 回调里必丢,预捕获是最稳的解法
  4. 小模型保护需要框架介入——工具瘦身、上下文裁剪、Prompt 缝合都是引擎层职责
  5. 取消机制设计要前置——SSE 三个回调 (onCompletion/onTimeout/onError) 全部映射到统一取消标志

七、写在最后

工作流引擎是 Agent 框架最核心的"心跳"。但你看完这篇文章会发现:它本质上不是 AI 问题,而是 Java 工程问题——状态机怎么设计、线程上下文怎么传、并发怎么调度、流式怎么发、取消怎么实现……每一个都是过去 10 年我们做企业级 Java 已经解决过的问题,只是现在在 LLM 场景下重新组合。

这正是 Java 程序员转 AI 工程方向的优势所在——底层的工程素养可以直接迁移,缺的只是对 LLM 调度模式的理解。AgentX 这个项目就是这种迁移的一次完整实践。

整个 AgentX 专栏目前已发布 8 篇,覆盖了 Agent 系统从底层架构到生产部署的完整链路:技术选型 → 架构设计 → 工具系统 → RAG → 记忆 → 可观测 → 工作流。下一篇会写风控与限流——LLM Token 配额、并发降级、敏感词审计这些企业级落地必须的安全工程。

如果你也在做 Agent 项目,或者准备转型 AI 工程方向:

  • 代码全公开 — 公众号回复「工作流」获取本文完整代码包
  • 专栏持续更新 — 每篇都基于真实开源项目源码,不水
  • 欢迎交流 — 评论区或公众号私信都行,踩过的坑越分享越值钱

💬 互动话题:你在 Agent 工作流上踩过哪些坑?跨线程追踪、并发调工具、长任务取消……这些场景的设计真的能差出十倍生产稳定性。评论区聊聊你的经历。

关注公众号 【SuniaCoder-AI全栈架构实战】,回复「工作流」获取本文完整代码包,回复「可观测」获取观测配置代码,回复「RAG」获取知识库代码包。


关于作者 & 联系方式

汪旭 / Sunia — Java 全栈开发者,AI 应用工程化实践者

专注企业级 AI 落地,擅长极限资源优化,有 RAG、Agent、知识图谱方向的完整实战经验。

平台 地址 / 说明
CSDN SuniaCoder-AI|13.5 万+ 阅读,RAG/Agent 系列持续更新
微信公众号 搜索【SuniaCoder-AI全栈架构实战】|关注回复「工作流」获取本文完整代码包
掘金 SuniaCoder-AI
知乎 SuniaCoder-AI
合作咨询 提供企业私有化大模型部署与定制开发(基础部署 / 企业定制 / 年度维保)欢迎私信洽谈

如果内容对你有帮助,点赞 + 收藏 + 关注是最大的支持,也能让更多需要的人看到这篇文章。


AgentX 专栏导航

标题 核心内容
一个 Java 开发者的 Agent 实践之路(前言) 专栏总览 / 选题思路
没有 GPU、只有 3 台低配云服务器,我如何选出 AgentX 的技术栈 LangChain4j / Ollama / Milvus / Redis 选型
AgentX 架构设计全解析:一个请求是如何从 HTTP 走到 LLM 再回来的 六层架构 / SSE 流式 / 虚拟线程 / TraceId
工具系统深度实现:从 @Tool 注解到 MCP 协议,构建企业级 Agent 工具体系 ToolRegistry / McpToolServer / @Tool
RAG 进阶:用 Milvus + bge-m3 构建比 ES 更懂语义的企业知识库 向量检索 / bge-m3 / MilvusV2
记忆系统:用 Redis + Milvus 给 AI 配上短期 + 长期双层记忆 ChatMemoryStore / 语义召回 / 多轮上下文
全链路可观测:用 OpenTelemetry + Jaeger 让每次 AI 对话都可追踪可复盘 OTel / Jaeger / SpanExporter / TraceId
工作流引擎:AgentWorkflow 怎么把工具、记忆、流程串成一条流水线(本文) AgentWorkflow / LangGraph / 虚拟线程 / SSE
即将发布:风控与限流——LLM Token 配额、并发降级、敏感词审计的工程化实践 TokenBucket / 配额 / 审计 / 降级

上一篇:[全链路可观测:用 OpenTelemetry + Jaeger 让每次 AI 对话都可追踪可复盘|AgentX 专栏⑦]

下一篇:风控与限流——LLM Token 配额、并发降级、敏感词审计的工程化实践(即将发布)


Tags#AgentX #工作流引擎 #LangGraph #LangChain4j #AiServices #SSE流式 #虚拟线程 #Java21 #ReAct #Agent架构

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐