站内本项目源码和程序资源下载地址

第一章:概述

1.1 Stream-Node

stream-node 是一个基于 Spring BootSpring AI Alibaba 的示例项目,核心目标是演示 Graph Stream(图流) 能力。

简单来说,它展示了如何在 Spring AI 中:

  • 以**流式(Streaming)**方式调用 AI 大模型
  • 通过节点编排组织复杂的 AI 处理流程
  • 借助 Server-Sent Events (SSE) 将 AI 生成的结果实时推送给客户端

在这里插入图片描述

1.2 核心概念图解

用户请求

StateGraph
状态图编排

ExpanderNode
扩展节点

AI 大模型
DashScope / OpenAI

流式响应
StreamingOutput

GraphProcess
消息转换

SSE 推送
ServerSentEvent

客户端实时渲染

概念说明:

概念 通俗解释
StateGraph AI 工作流的"导演",负责定义哪些节点按什么顺序执行,以及如何传递状态数据
ExpanderNode 具体的"演员",负责调用 AI 模型对内容进行扩写/生成
StreamingOutput AI 模型返回的"片段流",不是等全部内容生成完再返回,而是生成一点就推送一点
SSE 浏览器/客户端与服务器之间的一种单向实时通信协议,特别适合这种"逐字显示"的场景

第二章:技术架构与运行原理

2.1 技术栈

分类 技术/组件 版本/说明
基础框架 Spring Boot 3.4.x
Maven
Java 17
AI 层 Spring AI Alibaba 1.1.2.3
DashScope(阿里云模型)
OpenAI 兼容接口
响应式层 Project Reactor
Flux 流式处理
Sinks.Many 背压控制
传输层 SSE text/event-stream
HTTP/1.1 长连接

2.2 SSE

传统的 HTTP 请求是"一问一答"模式:客户端发送请求后,服务器处理完毕才一次性返回完整响应。但 AI 大模型生成内容较慢,如果等全部生成完再返回,用户会看到长时间的"白屏等待"。

SSE 流式方案的工作机制:

GraphProcess AI 大模型 ExpanderNode StateGraph GraphStreamController GraphProcess AI 大模型 ExpanderNode StateGraph GraphStreamController loop [节点编排执行] 客户端收到每一段就立即渲染 实现"打字机效果" 用户/客户端 GET /graph/stream/expand query=介绍Spring AI 启动图执行流程 调用 expander 节点 发送 Prompt 请求 返回 Flux<<StreamingOutput> (流式片段) 逐段输出 NodeOutput 转换为 ChatMessage 写入 Sinks.Many 通道 SSE: {"node_name":"expander","type":"<chunk>"} SSE: {"node_name":"expander","type":"<chunk>"} SSE: ...(持续推送) 用户/客户端

关键原理:

  • Flux:Reactor 提供的响应式流,可以表示 0 到 N 个元素的异步序列
  • Sinks.Many:一个多播的"数据广播站",允许多个订阅者同时接收流式数据
  • 背压(Backpressure):当客户端处理速度跟不上 AI 生成速度时,Reactor 会自动协调,防止内存溢出

第三章:快速开始

3.1 环境准备

依赖项 版本/要求 检查命令
Java 17+ java -version
Maven 3.6+ mvn -v
网络 可访问 DashScope 或 OpenAI curl dashscope.aliyuncs.com

配置 API 密钥(二选一):

# 方案 A:阿里云 DashScope(推荐,国内访问稳定)
export AI_DASHSCOPE_API_KEY=your_dashscope_key

# 方案 B:OpenAI 兼容接口(用于本地调试)
export AI_OPENAI_API_KEY=your_openai_key

3.2 构建与运行

# 进入项目目录
cd stream-node

# 编译打包
mvn clean package

# 运行服务(默认 8080 端口)
java -jar target/stream-node-0.0.1-SNAPSHOT.jar

启动成功后,控制台会打印 PlantUML 格式的图结构描述,你可以在 IDE 中预览节点流向。


第四章:接口详解

4.1 核心接口

GET /graph/stream/expand

参数 类型 必填 默认值 说明
query string "你好,很高兴认识你,能简单介绍一下自己吗?" 用户输入的原始问题
expander_number int 3 扩展节点调用次数
thread_id string "yingzi" 会话标识,用于状态隔离

4.2 请求-响应交互流程

AI 模型 ExpanderNode StateGraph GraphStreamController 客户端(curl/浏览器) AI 模型 ExpanderNode StateGraph GraphStreamController 客户端(curl/浏览器) 图编排阶段 实时推送阶段 loop [AI 逐 token 生成] GET /graph/stream/expand?query=... 构建并执行 StateGraph 激活 expander 节点 发起流式调用 返回文本片段 NodeOutput(chunk) SSE: data: {"node_name":"expander",...} 逐段渲染,实现打字机效果

4.3 实际调用示例

curl "http://localhost:8080/graph/stream/expand?query=请介绍一下Spring AI&expander_number=2"

客户端收到的 SSE 数据流:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Transfer-Encoding: chunked

data: {"node_name":"expander","type":"<chunk>","content":"Spring"}

data: {"node_name":"expander","type":"<chunk>","content":" AI"}

data: {"node_name":"expander","type":"<chunk>","content":" 是一个"}

data: {"node_name":"expander","type":"<chunk>","content":" 用于..."}

...(直到内容生成完毕)

第五章:核心代码剖析

5.1 类职责与协作关系

配置

调度

输出

触发

订阅

GraphNodeStreamConfiguration

职责: 定义图结构,注册节点,配置状态策略

+StateGraph stateGraph()

+void configure()

GraphStreamController

职责: 暴露 HTTP 接口,转发 SSE 流

+Flux<ServerSentEvent> expand()

GraphProcess

职责: 统一消息格式,管理数据通道

+ChatMessage convert(NodeOutput)

+void writeToSink(Sinks.Many)

ExpanderNode

职责: 调用 AI 模型执行内容扩写

+Flux<StreamingOutput> call()

StateGraph

职责: 编排节点执行顺序与状态流转

+void addNode()

+void addEdge()

5.2 节点编排原理:StateGraph 的工作方式

StateGraph 是 Spring AI Alibaba Graph 模块的核心。你可以把它理解为一个状态机 + 工作流引擎

流程结束

触发条件满足

输出写入 State

初始化状态

ExpanderNode

收到 token

持续生成

生成完毕

调用AI

流式输出

状态更新

expander_number 决定循环次数:
默认执行 3 次扩展

键值策略(ReplaceStrategy)说明:

GraphNodeStreamConfiguration 中配置的状态策略,决定了当多个节点产生同名状态时,如何处理冲突:

  • ReplaceStrategy:新值直接覆盖旧值,确保 State 中始终保存最新结果

第六章:配置指南

6.1 application.yml 详解

server:
  port: 8080              # 服务端口,冲突时可修改
  servlet:
    encoding:
      charset: UTF-8      # 强制 UTF-8 编码
      enabled: true
      force: true         # 强制响应使用 UTF-8,避免中文乱码

spring:
  ai:
    openai:
      api-key: ${AI_OPENAI_API_KEY}                    # 从环境变量读取
      base-url: https://integrate.api.nvidia.com/v1    # OpenAI 兼容端点
      chat:
        options:
          model: nvidia/nemotron-3-nano-omni-30b-a3b-reasoning  # 默认模型
    
    # 禁用智谱 AI 自动配置,避免冲突
    zhipu:
      enabled: false
  
  autoconfigure:
    exclude: org.springframework.ai.autoconfigure.zhipuai.ZhiPuAiAutoConfiguration

6.2 配置优先级

API密钥配置优先级

优先级1

优先级2

禁止使用

🔐 环境变量配置
AI_DASHSCOPE_API_KEY

⚙️ 应用配置文件
application.yml
${AI_OPENAI_API_KEY}

❌ 代码硬编码
(不推荐)

运行时配置

推荐做法:

  1. 生产环境:使用 AI_DASHSCOPE_API_KEY 环境变量
  2. 本地调试:使用 AI_OPENAI_API_KEY 环境变量
  3. 永远不要将密钥提交到 Git

第七章:测试验证

7.1 使用 IDE 测试

项目根目录包含 stream-node.http 文件,支持直接在 IntelliJ IDEA 或 VS Code 中运行:

### Expand 示例
GET http://localhost:8080/graph/stream/expand?query=介绍一下Spring AI&expander_number=2
Accept: text/event-stream

7.2 使用命令行测试

# curl 方式
curl -N "http://localhost:8080/graph/stream/expand?query=你好"

# httpie 方式(更美观的 SSE 输出)
http --stream GET "http://localhost:8080/graph/stream/expand" query=="你好"

在这里插入图片描述

7.3 浏览器测试

直接访问接口地址,或使用支持 SSE 的调试工具(如 Postman、Apifox),观察事件流:

SSE 通信流程

📤 请求头
Accept: text/event-stream

📥 数据流 1
data: {message}

📥 数据流 2
data: {message}

✅ 结束标识
data: [DONE]

🌐 客户端请求
浏览器 / Postman

⚙️ SSE 接口
服务器端点

在这里插入图片描述


第八章:注意事项与最佳实践

8.1 安全红线

风险 后果 防护措施
密钥泄露 被盗刷 API 额度 使用环境变量,加入 .gitignore
明文存储 代码仓库暴露 启用 secret 扫描(GitHub/GitLab)

8.2 部署 checklist

部署前检查

端口 8080 是否占用?

修改 server.port

网络能否访问 AI 服务端点?

配置代理或切换内网模型

API 密钥是否配置?

设置环境变量

启动服务

8.3 计费与配额提醒

  • DashScope:按 token 数量计费,请关注阿里云控制台余额与配额
  • OpenAI/NVIDIA:按调用次数或 token 计费,注意模型价格差异
  • 建议:生产环境配置用量告警,避免意外高额账单

8.4 编码问题排查

如果客户端出现中文乱码,请确认:

  1. application.ymlforce: true 已启用
  2. 客户端以 UTF-8 解析响应
  3. 请求头包含 Accept: text/event-stream

附录:完整请求-响应示例

请求:

curl -N "http://localhost:8080/graph/stream/expand?query=什么是Graph Stream&expander_number=1"

SSE 响应流:

HTTP/1.1 200 
Content-Type: text/event-stream
X-Accel-Buffering: no

data: {"node_name":"expander","type":"<chunk>","content":"Graph"}

data: {"node_name":"expander","type":"<chunk>","content":" Stream"}

data: {"node_name":"expander","type":"<chunk>","content":" 是 Spring AI"}

data: {"node_name":"expander","type":"<chunk>","content":" Alibaba 提供的"}

data: {"node_name":"expander","type":"<chunk>","content":" 图流式编排能力..."}

data: {"node_name":"expander","type":"<end>"}

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐