在搜索框里输入“如何学 Java”,你希望系统能自动变出几个更精准的搜法,比如“Java 零基础自学指南”“Java 入门教程推荐”……这就是 Query Expansion(查询扩展)
本文带你从零开始,既讲清楚背后的 Graph 流程框架,又手把手把服务跑起来。全程流式输出,就像 AI 一个字一个字往外蹦,体验丝滑!


1. 项目速览

stream‑node 是 Spring AI Alibaba Graph 示例里最友好的“Hello World”。它只用一个节点,就实现了带流式输出的查询扩展。

项目属性 说明
定位 Graph 框架入门示例 + 流式处理基础
核心能力 将用户的一句话扩展成 N 个高质量搜索变体
难度 ⭐(只要会 Spring Boot 就能玩)
技术栈 Spring Boot、Spring AI、Alibaba DashScope(通义千问)
端口 8080

:用户问一句话,AI 生成三个不同版本的回答,整个过程以 SSE 流式返回。


2. 两个关键概念

2.1 图编排(Graph)

在 Spring AI Alibaba Graph 里,我们把一个 AI 任务拆成节点(Node)边(Edge)。每个节点就是一个处理步骤,例如调用大模型、查数据库、执行代码……边则决定执行顺序,甚至可以加入条件分支、循环。

拿 stream‑node 来说,它的图简单到只有三个部分:

[START] → [expander 节点] → [END]

这就是一个线性流程:启动 → 调用大模型生成扩展 → 结束。用 mermaid 画出来:

START

expander
查询扩展

END

2.2 什么是流式输出?

传统的 API 调用是等 AI 把所有字都生成完后,一次性返回结果。流式输出则是生成一个字,就立刻推给前端一个字,就像打字机效果。这能极大降低用户等待的焦虑感。

在这个项目里,我们用 Spring WebFluxFlux 配合 Server‑Sent Events (SSE) 把流式数据推给浏览器或 App。


3. 架构一览

整个请求是怎么流动的呢?

通义千问 API ExpanderNode StateGraph GraphStreamController 客户端 通义千问 API ExpanderNode StateGraph GraphStreamController 客户端 GET /graph/stream/expand?query=如何学习Java compiledGraph.stream(input, config) 调用 apply(state) ChatClient.stream().chatResponse() Flux<token> 逐个 token 返回 Flux<ChatResponse> Flux<NodeOutput> SSE 事件流 (text/event-stream)

核心角色只有三个:

  • GraphStreamController:接收 HTTP 请求,启动流式执行,并把结果转成 SSE 格式。
  • StateGraph(图配置):定义节点和连接关系,管理运行时状态。
  • ExpanderNode:执行具体的 AI 逻辑,调用通义千问并返回 Flux<ChatResponse>

4. 代码精讲——每个细节

4.1 配置类:搭好舞台

@Configuration
public class GraphNodeStreamConfiguration {

    @Bean
    public StateGraph streamGraph(ChatClient.Builder chatClientBuilder) throws GraphStateException {

        // 1. 状态合并策略(如果多个节点写同一个字段该怎么办)
        KeyStrategyFactory keyStrategyFactory = new KeyStrategyFactoryBuilder()
                .addPatternStrategy("query", new ReplaceStrategy())
                .addPatternStrategy("expander_number", new ReplaceStrategy())
                .addPatternStrategy("expander_content", new ReplaceStrategy())
                .build();

        // 2. 构建图
        StateGraph stateGraph = new StateGraph(keyStrategyFactory)
                .addNode("expander", node_async(new ExpanderNode(chatClientBuilder)))
                .addEdge(START, "expander")
                .addEdge("expander", END);

        // 3. 打印 PlantUML 方便调试(在日志里可以看到图的样子)
        GraphRepresentation representation = stateGraph.getGraph(
                GraphRepresentation.Type.PLANTUML, "expander flow");
        log.info(representation.content());

        return stateGraph;
    }
}

为什么要设策略? 有时多个节点会写入同一个字段,比如后一个节点要追加内容而非覆盖。这里虽然是单节点,但显式声明 ReplaceStrategy 是最稳妥的——新值直接替换旧值。

💡 最佳实践:如果你刚开始搭建一个图,可以先用 new KeyStrategyFactoryBuilder().build() 采用默认策略,等流程复杂了再细化。


4.2 节点:大脑在这里工作

public class ExpanderNode implements NodeAction {

    private static final PromptTemplate DEFAULT_PROMPT = new PromptTemplate("""
        You are an expert in information retrieval.
        Generate {number} different versions of the given user query.
        Original query: {query}
        Each version should capture a slightly different angle or phrasing.
        Return the versions separated by new lines, without numbering.
        """);

    private final ChatClient chatClient;
    private final int defaultNumber = 3;

    public ExpanderNode(ChatClient.Builder builder) {
        this.chatClient = builder.build();
    }

    @Override
    public Map<String, Object> apply(OverAllState state) {
        // 从上游状态中读取参数
        String query = state.value("query").orElse("你好,很高兴认识你");
        int number = state.<Integer>value("expander_number").orElse(defaultNumber);

        // 流式调用大模型
        Flux<ChatResponse> flux = chatClient.prompt()
                .user(spec -> spec.text(DEFAULT_PROMPT.getTemplate())
                        .param("number", number)
                        .param("query", query))
                .stream()
                .chatResponse();

        // 将流作为新状态返回,框架会自动处理 SSE 串流
        return Map.of("expander_content", flux);
    }
}

设计亮点

  • OverAllState 中提取 queryexpander_number,完全解耦输入来源。
  • 返回值直接包含 Flux<ChatResponse>,框架会识别这是一个流,并在后续步骤中自动转换。
  • 提示词清晰、可独立修改;业务开发者可以只替换它实现不同功能。

⚠️ 生产化小建议

  • 提示词可以通过 @Value 从配置文件注入,而不是硬编码。
  • defaultNumber 可以变成可配置项,并给请求参数提供默认值。

4.3 控制器:流式数据的搬运工

@RestController
@RequestMapping("/graph/stream")
public class GraphStreamController {

    private final StateGraph compiledGraph;

    @GetMapping(value = "/expand", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<GraphProcess.ChatMessage>> expand(
            @RequestParam(defaultValue = "如何学习Java") String query,
            @RequestParam(defaultValue = "3") int expanderNumber,
            @RequestParam(defaultValue = "yingzi") String threadId) {

        // 1. 构建运行时配置(threadId 做会话隔离)
        RunnableConfig config = RunnableConfig.builder()
                .threadId(threadId)
                .build();

        // 2. 准备图输入
        Map<String, Object> inputs = Map.of(
                "query", query,
                "expander_number", expanderNumber
        );

        // 3. 启动流式执行,转换为 SSE 事件
        return compiledGraph.stream(inputs, config)
                .map(nodeOutput -> {
                    String content = nodeOutput.state().value("expander_content", "");
                    return ServerSentEvent.<GraphProcess.ChatMessage>builder()
                            .data(new GraphProcess.ChatMessage(content))
                            .build();
                });
    }
}

为什么用 threadId
图是有状态的,同一个 threadId 可以被多次调用,实现对话记忆或多轮交互。哪怕这里只有一轮,也提前预留了扩展能力。

🛠 可以改良的地方:生产环境建议去掉 threadId 的默认值,让调用方强制传入,或使用 UUID 自动生成,避免会话串扰。


5. 动手跑起来(部署指南)

5.1 本地开发 1 分钟启动

前提条件
  • JDK 17 或 21
  • Maven 3.9+
  • 阿里云 DashScope API Key(免费开通 有额度)
步骤
# 1. 克隆项目(假设已有)
cd spring-ai-alibaba-graph-example/stream-node

# 2. 设置 API Key(二选一)
# 方式 A:环境变量(推荐)
export AI_DASHSCOPE_API_KEY=sk-你的key

# 方式 B:直接改 application.yml(仅本地开发用,不要提交 Git)
# spring.ai.dashscope.api-key: sk-你的key

# 3. 启动
mvn spring-boot:run

看到 Started StreamNodeApplication 后,开另一个终端测试:

curl "http://localhost:8080/graph/stream/expand?query=如何学Python"

正常的话会逐行输出三句扩展结果。


5.2 生产部署(Systemd)- 最传统但也最稳

打包
mvn clean package -DskipTests
# 生成的 jar 在 target/stream-node-<version>.jar
创建 systemd 服务
sudo vim /etc/systemd/system/stream-node.service

内容:

[Unit]
Description=Spring AI Graph Stream Node
After=network.target

[Service]
Type=simple
User=ubuntu
WorkingDirectory=/opt/stream-node
ExecStart=/usr/bin/java -jar /opt/stream-node/stream-node-1.0.0.jar
Environment="AI_DASHSCOPE_API_KEY=sk-你的key"
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

启用并启动:

sudo systemctl daemon-reload
sudo systemctl start stream-node
sudo systemctl enable stream-node   # 开机自启

检查状态:

sudo systemctl status stream-node
curl "http://localhost:8080/graph/stream/expand?query=一键部署好玩吗"

5.3 Docker 部署——标准化交付

Dockerfile(多阶段构建,最终镜像仅含 JRE):

FROM maven:3.9-eclipse-temurin-21 AS builder
WORKDIR /app
COPY pom.xml .
COPY src ./src
RUN mvn clean package -DskipTests

FROM eclipse-temurin:21-jre-alpine
WORKDIR /app
COPY --from=builder /app/target/*.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]

构建并运行:

docker build -t stream-node:latest .
docker run -d --name stream-node -p 8080:8080 \
  -e AI_DASHSCOPE_API_KEY=sk-你的key \
  stream-node:latest

5.4 加入 Nginx 反向代理(生产推荐)

server {
    listen 80;
    server_name your-domain.com;

    location / {
        proxy_pass http://127.0.0.1:8080;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;

        # 🔥 流式场景关键配置
        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
    }
}

配置 HTTPS 证书(Let’s Encrypt):

sudo apt install certbot python3-certbot-nginx
sudo certbot --nginx -d your-domain.com
sudo certbot renew --dry-run   # 测试自动续期

6. 接口全览 & 玩法示例

请求方式 路径 Content‑Type
GET /graph/stream/expand text/event-stream

参数

参数 类型 默认值 说明
query String 如何学习Java 要扩展的原句
expander_number int 3 返回几个变体,建议 1-10
thread_id String yingzi 会话 ID(生产环境建议必填)

curl 命令

# 默认
curl http://localhost:8080/graph/stream/expand

# 自定义查询和数量
curl "http://localhost:8080/graph/stream/expand?query=微服务架构&expander_number=5"

# 指定会话
curl "http://localhost:8080/graph/stream/expand?query=高并发技巧&thread_id=user007"

返回的 SSE 流大概长这样:

data:{"content":"微服务架构最佳实践"}
data:{"content":"微服务与 Spring Cloud 实战"}
data:{"content":"如何从零搭建微服务体系"}
...

7. 从入门到魔改——扩展路线图

7.1 加一个节点,做“质量评估”

假设扩展结果有时候太水,可以加一个评估节点,给结果打分:

stateGraph
    .addNode("expander", node_async(new ExpanderNode(...)))
    .addNode("evaluator", node_async(new EvaluatorNode(...)))  // 新节点
    .addEdge(START, "expander")
    .addEdge("expander", "evaluator")
    .addEdge("evaluator", END);

对应的 mermaid 图变成:

START

expander

evaluator

END

7.2 加条件分支,让流程更聪明

如果评估节点发现自己不满意,可以重试扩展:

graph.addConditionalEdge("evaluator",
    state -> state.<String>value("score").orElse("low"),
    Map.of(
        "high", END,
        "low", "expander"  // 重新扩展
    )
);

score==high

score==low

START

expander

evaluator

END

7.3 改成 POST 请求支持长文本

@PostMapping(value = "/expand", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<GraphProcess.ChatMessage>> expandPost(
        @RequestBody ExpandRequest request) { ... }

8. 排坑手册

现象 可能原因 解决办法
启动报 API Key 错误 环境变量未生效 echo $AI_DASHSCOPE_API_KEY 后重启
端口 8080 被占用 已有其他程序 lsof -i:8080 找到并释放
外网无法访问 安全组/防火墙 云控制台开放 8080,sudo ufw allow 8080
SSE 连接自动断开 默认超时太短 application.yml 中配置 server.servlet.timeout.read: 3600s,Nginx 中设置 proxy_read_timeout 3600s
返回乱码 请求头缺失 确保 Accept: text/event-stream

9. 总结

stream‑node 看起来简单,却恰到好处地展示了图编排 + 流式 AI 输出的骨架。现在你不仅知道怎么用,还知道怎么改、怎么部署,完全可以把它作为脚手架,快速搭建自己的智能查询扩展、翻译润色、多路搜索等 Agent 应用。

接下来推荐继续探索:

  • chatflow:多轮对话 + 意图识别
  • parallel-stream-node:多个节点并行执行
  • human-node:人类审批介入的图流程

🧩 记住这个最小模板START → 一个 AI 节点 → END,后续所有复杂流程都只是这个模板的叠加与组合。

如果你跟着文章把服务成功跑起来,欢迎把第一句扩展结果截图在博文评论区晒一晒 😄

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐