【第40篇】Graph 极简入门:用 stream‑node 玩转流式查询扩展
在搜索框里输入“如何学 Java”,你希望系统能自动变出几个更精准的搜法,比如“Java 零基础自学指南”“Java 入门教程推荐”……这就是 Query Expansion(查询扩展)。
本文带你从零开始,既讲清楚背后的 Graph 流程框架,又手把手把服务跑起来。全程流式输出,就像 AI 一个字一个字往外蹦,体验丝滑!
1. 项目速览
stream‑node 是 Spring AI Alibaba Graph 示例里最友好的“Hello World”。它只用一个节点,就实现了带流式输出的查询扩展。
| 项目属性 | 说明 |
|---|---|
| 定位 | Graph 框架入门示例 + 流式处理基础 |
| 核心能力 | 将用户的一句话扩展成 N 个高质量搜索变体 |
| 难度 | ⭐(只要会 Spring Boot 就能玩) |
| 技术栈 | Spring Boot、Spring AI、Alibaba DashScope(通义千问) |
| 端口 | 8080 |
注:用户问一句话,AI 生成三个不同版本的回答,整个过程以 SSE 流式返回。
2. 两个关键概念
2.1 图编排(Graph)
在 Spring AI Alibaba Graph 里,我们把一个 AI 任务拆成节点(Node)和边(Edge)。每个节点就是一个处理步骤,例如调用大模型、查数据库、执行代码……边则决定执行顺序,甚至可以加入条件分支、循环。
拿 stream‑node 来说,它的图简单到只有三个部分:
[START] → [expander 节点] → [END]
这就是一个线性流程:启动 → 调用大模型生成扩展 → 结束。用 mermaid 画出来:
2.2 什么是流式输出?
传统的 API 调用是等 AI 把所有字都生成完后,一次性返回结果。流式输出则是生成一个字,就立刻推给前端一个字,就像打字机效果。这能极大降低用户等待的焦虑感。
在这个项目里,我们用 Spring WebFlux 的 Flux 配合 Server‑Sent Events (SSE) 把流式数据推给浏览器或 App。
3. 架构一览
整个请求是怎么流动的呢?
核心角色只有三个:
- GraphStreamController:接收 HTTP 请求,启动流式执行,并把结果转成 SSE 格式。
- StateGraph(图配置):定义节点和连接关系,管理运行时状态。
- ExpanderNode:执行具体的 AI 逻辑,调用通义千问并返回
Flux<ChatResponse>。
4. 代码精讲——每个细节
4.1 配置类:搭好舞台
@Configuration
public class GraphNodeStreamConfiguration {
@Bean
public StateGraph streamGraph(ChatClient.Builder chatClientBuilder) throws GraphStateException {
// 1. 状态合并策略(如果多个节点写同一个字段该怎么办)
KeyStrategyFactory keyStrategyFactory = new KeyStrategyFactoryBuilder()
.addPatternStrategy("query", new ReplaceStrategy())
.addPatternStrategy("expander_number", new ReplaceStrategy())
.addPatternStrategy("expander_content", new ReplaceStrategy())
.build();
// 2. 构建图
StateGraph stateGraph = new StateGraph(keyStrategyFactory)
.addNode("expander", node_async(new ExpanderNode(chatClientBuilder)))
.addEdge(START, "expander")
.addEdge("expander", END);
// 3. 打印 PlantUML 方便调试(在日志里可以看到图的样子)
GraphRepresentation representation = stateGraph.getGraph(
GraphRepresentation.Type.PLANTUML, "expander flow");
log.info(representation.content());
return stateGraph;
}
}
为什么要设策略? 有时多个节点会写入同一个字段,比如后一个节点要追加内容而非覆盖。这里虽然是单节点,但显式声明 ReplaceStrategy 是最稳妥的——新值直接替换旧值。
💡 最佳实践:如果你刚开始搭建一个图,可以先用
new KeyStrategyFactoryBuilder().build()采用默认策略,等流程复杂了再细化。
4.2 节点:大脑在这里工作
public class ExpanderNode implements NodeAction {
private static final PromptTemplate DEFAULT_PROMPT = new PromptTemplate("""
You are an expert in information retrieval.
Generate {number} different versions of the given user query.
Original query: {query}
Each version should capture a slightly different angle or phrasing.
Return the versions separated by new lines, without numbering.
""");
private final ChatClient chatClient;
private final int defaultNumber = 3;
public ExpanderNode(ChatClient.Builder builder) {
this.chatClient = builder.build();
}
@Override
public Map<String, Object> apply(OverAllState state) {
// 从上游状态中读取参数
String query = state.value("query").orElse("你好,很高兴认识你");
int number = state.<Integer>value("expander_number").orElse(defaultNumber);
// 流式调用大模型
Flux<ChatResponse> flux = chatClient.prompt()
.user(spec -> spec.text(DEFAULT_PROMPT.getTemplate())
.param("number", number)
.param("query", query))
.stream()
.chatResponse();
// 将流作为新状态返回,框架会自动处理 SSE 串流
return Map.of("expander_content", flux);
}
}
设计亮点:
- 从
OverAllState中提取query和expander_number,完全解耦输入来源。 - 返回值直接包含
Flux<ChatResponse>,框架会识别这是一个流,并在后续步骤中自动转换。 - 提示词清晰、可独立修改;业务开发者可以只替换它实现不同功能。
⚠️ 生产化小建议:
- 提示词可以通过
@Value从配置文件注入,而不是硬编码。defaultNumber可以变成可配置项,并给请求参数提供默认值。
4.3 控制器:流式数据的搬运工
@RestController
@RequestMapping("/graph/stream")
public class GraphStreamController {
private final StateGraph compiledGraph;
@GetMapping(value = "/expand", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<GraphProcess.ChatMessage>> expand(
@RequestParam(defaultValue = "如何学习Java") String query,
@RequestParam(defaultValue = "3") int expanderNumber,
@RequestParam(defaultValue = "yingzi") String threadId) {
// 1. 构建运行时配置(threadId 做会话隔离)
RunnableConfig config = RunnableConfig.builder()
.threadId(threadId)
.build();
// 2. 准备图输入
Map<String, Object> inputs = Map.of(
"query", query,
"expander_number", expanderNumber
);
// 3. 启动流式执行,转换为 SSE 事件
return compiledGraph.stream(inputs, config)
.map(nodeOutput -> {
String content = nodeOutput.state().value("expander_content", "");
return ServerSentEvent.<GraphProcess.ChatMessage>builder()
.data(new GraphProcess.ChatMessage(content))
.build();
});
}
}
为什么用 threadId?
图是有状态的,同一个 threadId 可以被多次调用,实现对话记忆或多轮交互。哪怕这里只有一轮,也提前预留了扩展能力。
🛠 可以改良的地方:生产环境建议去掉
threadId的默认值,让调用方强制传入,或使用UUID自动生成,避免会话串扰。
5. 动手跑起来(部署指南)
5.1 本地开发 1 分钟启动
前提条件
- JDK 17 或 21
- Maven 3.9+
- 阿里云 DashScope API Key(免费开通 有额度)
步骤
# 1. 克隆项目(假设已有)
cd spring-ai-alibaba-graph-example/stream-node
# 2. 设置 API Key(二选一)
# 方式 A:环境变量(推荐)
export AI_DASHSCOPE_API_KEY=sk-你的key
# 方式 B:直接改 application.yml(仅本地开发用,不要提交 Git)
# spring.ai.dashscope.api-key: sk-你的key
# 3. 启动
mvn spring-boot:run
看到 Started StreamNodeApplication 后,开另一个终端测试:
curl "http://localhost:8080/graph/stream/expand?query=如何学Python"
正常的话会逐行输出三句扩展结果。
5.2 生产部署(Systemd)- 最传统但也最稳
打包
mvn clean package -DskipTests
# 生成的 jar 在 target/stream-node-<version>.jar
创建 systemd 服务
sudo vim /etc/systemd/system/stream-node.service
内容:
[Unit]
Description=Spring AI Graph Stream Node
After=network.target
[Service]
Type=simple
User=ubuntu
WorkingDirectory=/opt/stream-node
ExecStart=/usr/bin/java -jar /opt/stream-node/stream-node-1.0.0.jar
Environment="AI_DASHSCOPE_API_KEY=sk-你的key"
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
启用并启动:
sudo systemctl daemon-reload
sudo systemctl start stream-node
sudo systemctl enable stream-node # 开机自启
检查状态:
sudo systemctl status stream-node
curl "http://localhost:8080/graph/stream/expand?query=一键部署好玩吗"
5.3 Docker 部署——标准化交付
Dockerfile(多阶段构建,最终镜像仅含 JRE):
FROM maven:3.9-eclipse-temurin-21 AS builder
WORKDIR /app
COPY pom.xml .
COPY src ./src
RUN mvn clean package -DskipTests
FROM eclipse-temurin:21-jre-alpine
WORKDIR /app
COPY --from=builder /app/target/*.jar app.jar
EXPOSE 8080
ENTRYPOINT ["java", "-jar", "app.jar"]
构建并运行:
docker build -t stream-node:latest .
docker run -d --name stream-node -p 8080:8080 \
-e AI_DASHSCOPE_API_KEY=sk-你的key \
stream-node:latest
5.4 加入 Nginx 反向代理(生产推荐)
server {
listen 80;
server_name your-domain.com;
location / {
proxy_pass http://127.0.0.1:8080;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
# 🔥 流式场景关键配置
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
}
配置 HTTPS 证书(Let’s Encrypt):
sudo apt install certbot python3-certbot-nginx
sudo certbot --nginx -d your-domain.com
sudo certbot renew --dry-run # 测试自动续期
6. 接口全览 & 玩法示例
| 请求方式 | 路径 | Content‑Type |
|---|---|---|
| GET | /graph/stream/expand |
text/event-stream |
参数:
| 参数 | 类型 | 默认值 | 说明 |
|---|---|---|---|
| query | String | 如何学习Java | 要扩展的原句 |
| expander_number | int | 3 | 返回几个变体,建议 1-10 |
| thread_id | String | yingzi | 会话 ID(生产环境建议必填) |
curl 命令:
# 默认
curl http://localhost:8080/graph/stream/expand
# 自定义查询和数量
curl "http://localhost:8080/graph/stream/expand?query=微服务架构&expander_number=5"
# 指定会话
curl "http://localhost:8080/graph/stream/expand?query=高并发技巧&thread_id=user007"
返回的 SSE 流大概长这样:
data:{"content":"微服务架构最佳实践"}
data:{"content":"微服务与 Spring Cloud 实战"}
data:{"content":"如何从零搭建微服务体系"}
...
7. 从入门到魔改——扩展路线图
7.1 加一个节点,做“质量评估”
假设扩展结果有时候太水,可以加一个评估节点,给结果打分:
stateGraph
.addNode("expander", node_async(new ExpanderNode(...)))
.addNode("evaluator", node_async(new EvaluatorNode(...))) // 新节点
.addEdge(START, "expander")
.addEdge("expander", "evaluator")
.addEdge("evaluator", END);
对应的 mermaid 图变成:
7.2 加条件分支,让流程更聪明
如果评估节点发现自己不满意,可以重试扩展:
graph.addConditionalEdge("evaluator",
state -> state.<String>value("score").orElse("low"),
Map.of(
"high", END,
"low", "expander" // 重新扩展
)
);
7.3 改成 POST 请求支持长文本
@PostMapping(value = "/expand", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<GraphProcess.ChatMessage>> expandPost(
@RequestBody ExpandRequest request) { ... }
8. 排坑手册
| 现象 | 可能原因 | 解决办法 |
|---|---|---|
| 启动报 API Key 错误 | 环境变量未生效 | echo $AI_DASHSCOPE_API_KEY 后重启 |
| 端口 8080 被占用 | 已有其他程序 | lsof -i:8080 找到并释放 |
| 外网无法访问 | 安全组/防火墙 | 云控制台开放 8080,sudo ufw allow 8080 |
| SSE 连接自动断开 | 默认超时太短 | 在 application.yml 中配置 server.servlet.timeout.read: 3600s,Nginx 中设置 proxy_read_timeout 3600s |
| 返回乱码 | 请求头缺失 | 确保 Accept: text/event-stream |
9. 总结
stream‑node 看起来简单,却恰到好处地展示了图编排 + 流式 AI 输出的骨架。现在你不仅知道怎么用,还知道怎么改、怎么部署,完全可以把它作为脚手架,快速搭建自己的智能查询扩展、翻译润色、多路搜索等 Agent 应用。
接下来推荐继续探索:
- chatflow:多轮对话 + 意图识别
- parallel-stream-node:多个节点并行执行
- human-node:人类审批介入的图流程
🧩 记住这个最小模板:
START → 一个 AI 节点 → END,后续所有复杂流程都只是这个模板的叠加与组合。
如果你跟着文章把服务成功跑起来,欢迎把第一句扩展结果截图在博文评论区晒一晒 😄
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)