AI 服务化是指将原本只能‏‏‏‏‏‏‏‏‏‏‏‏‏‏本地运行的 AI 能力转化为可远程调用的接口服务,‍‍‍‍‍‍‍‍‍‍‍‍‍‍使更多人能够便捷地访问 AI 能力

首先我们要知道 AI 应用接口的一个特点:SSE

就是我们平时开发的大多数接口都‏‏‏‏‏‏‏‏‏‏‏‏‏‏是同步接口,也就是等后端处理完再返回。但是对于 A‍‍‍‍‍‍‍‍‍‍‍‍‍‍I 应用,特别是响应时间较长的对话类应用,可能会让‎‎‎‎‎‎‎‎‎‎‎‎‎‎用户失去耐心等待,因此推荐使用 SSE(Serve‏‏‏‏‏‏‏‏‏‏‏‏‏‏r-Sent Events)技术实现实时流式输出,⁡⁡⁡⁡⁡⁡⁡⁡⁡⁡⁡⁡⁡⁡类似打字机效果,大幅提升用户体验

在几乎所有的 AI 应用网页中,你都会注意的 AI 的输出都是像打字一样一点一点呈现在你面前的。

在前面的文档中我们都是用的同步接口(一次‎‎‎‎‎‎‎‎‎‎‎性完‎整返‎回),接下来我们‏‏‏‏‏‏‏‏‏‏‏‏‏‏会同时提供同‍‍‍‍‍‍‍‍‍‍‍步‍接‍口‍‎‏‏‏‏‏‏‏‏‏‏‏和基于‏ SS‏E⁡⁡⁡⁡⁡⁡⁡⁡⁡⁡⁡ 的‏流式输⁡出接⁡口

支持流式调用的接口的实现

1.通过 stream‏‏‏ 方法就可以返回 F⁡⁡⁡lux 响应式对象了:
public Flux<String> doChatByStream(String message, String chatId) {
    return chatClient
            .prompt()
            .user(message)
            .advisors(spec -> spec.param(CHAT_MEMORY_CONVERSATION_ID_KEY, chatId)
                    .param(CHAT_MEMORY_RETRIEVE_SIZE_KEY, 10))
            .stream()
            .content();
}

建议不要直接‏‏‏使用 ChatResponse 作‍‍‍为返回类型,因为这会导致返回内容膨‎‎‎胀,影响传输效率。所以上述代码中我‏‏‏们使用 content 方法,只返⁡⁡⁡回 AI 输出的文本信息

2.然后编写基‏‏‏于 SSE 的‍流‍式‍输出接口
a.返回‏‏‏ Flux 响‍应‍式‍对象,并且‎添加‎ S‎SE ‏对应的‏ Me‏d⁡iaTy⁡pe:
@GetMapping(value = "/app/chat/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> doChatWithAppSSE(String message, String chatId) {
    return loveApp.doChatByStream(message, chatId);
}

produces参数的作用:表明该接口将返回Server-Sent Events (SSE) 格式的数据,

  • 浏览器的 EventSource API 需要正确的 Content-Type: text/event-stream 头
  • 如果没有这个头,浏览器可能不会将其识别为 SSE 连接
b.使用 ‏‏‏SSEEmiter,‍‍‍通过 send 方法‎‎‎持续向 SseEmi‏‏‏tter 发送消息(⁡⁡⁡有点像 IO 操作):
@GetMapping("/app/chat/sse/emitter")
public SseEmitter doChatWithAppSseEmitter(String message, String chatId) {
    // 创建一个超时时间较长的 SseEmitter
    SseEmitter emitter = new SseEmitter(180000L); // 3分钟超时
    // 获取 Flux 数据流并直接订阅
    loveApp.doChatByStream(message, chatId)
            .subscribe(
                    // 处理每条消息
                    chunk -> {
                        try {
                            emitter.send(chunk);
                        } catch (IOException e) {
                            emitter.completeWithError(e);
                        }
                    },
                    // 处理错误
                    emitter::completeWithError,
                    // 处理完成
                    emitter::complete
            );
    // 返回emitter
    return emitter;
}

这里补充一下:SseEmitter 是 Spring MVC 里专门用来做 SSE 推送 的一个类

一个“服务器往客户端持续发消息”的通道对象

普通接口一般是:

  1. 方法执行完
  2. 返回一个结果
  3. 请求结束

但 SseEmitter 不一样:

  1. 先建立连接
  2. 后端可以不断 send(...)
  3. 客户端会持续收到消息
  4. 最后再 complete()

比如:

SseEmitter emitter = new SseEmitter(180000L); //创建一个 SSE 连接,超时 3 分钟 
emitter.send("第一段"); //发第一条消息
emitter.send("第二段"); //再发第二条消息
emitter.complete(); //发完后关闭连接

用到它是因为智能体执行过程常常是:

  1. 第一步调用工具结果出来
  2. 推给前端
  3. 第二步又调用一次结果出来
  4. 再推给前端
  5. 出错了也可以立刻推错误消息

还有.subscribe(...) 方法意思可以理解成:

真正开始消费这条流,并且规定“每来一段数据、报错、结束时该怎么处理”

因为这里要将流中的每一块都给到SseEmitter去一个个处理,而Flux 直接返回给前端时不用自己写 subscribe()是因为:Spring Web 层 / HTTP 响应处理器替你订阅了(有新数据时,Spring 自动往客户端写)

还要补充:后端支持跨域

为了让前端‏‏‏项目能够顺利调用后端‍‍‍接口,我们需要在后端‎‎‎配置跨域支持

/**
全局跨域配置
 */
@Configuration
public class CorsConfig implements WebMvcConfigurer {
 @Override

 public void addCorsMappings(CorsRegistry registry) {
 // 覆盖所有请求
 registry.addMapping("/**")
         // 允许发送 Cookie
         .allowCredentials(true)
         // 放行哪些域名(必须用 patterns,否则 * 会和 allowCredentials 冲突)
         .allowedOriginPatterns("")
         .allowedMethods("GET", "POST", "PUT", "DELETE", "OPTIONS")
         .allowedHeaders("") //允许前端带任意请求头
         .exposedHeaders("*"); // 允许前端 JS 读取响应头里的字段
 }
}

为什么这样就能跨域?

浏览器默认会拦跨域请求

当前端从比如:

http://localhost:3000 去请求:

http://localhost:8123/api/... 这两个源不同,浏览器就会先检查后端有没有明确说“我允许你跨域访问”

这个 CorsConfig 的作用就是让 Spring 在响应里加上类似这些 CORS 响应头

浏览器看到这些头,才会放行

到这里我们就可以进行部署了

额外补充(了解):响应式编程

响应式编程简单来说就是在程序的结果处理上面不同

我们先来看传统对结果的处理:

String result = callApi();
System.out.println(result);

意思是:

先调用

卡住等结果

等到了再往下走

这叫比较典型的同步 / 阻塞式思路

然而有个问题是:现实里很多事都不是“一次就拿到完整结果”的,像:

  • 网络请求可能慢
  • 聊天消息可能一条条来
  • SSE 会持续推送
  • 大模型会一段段生成
  • 用户点击、输入也是不断发生的事件流

如果还全都按“先堵住等结果”去写,会比较笨重,而响应式思路更像是:

有结果来了 -> 我处理它

又来一个结果 -> 我再处理它

报错了 -> 我处理错误

结束了 -> 我收尾

数据一点一点输出,我一点一点处理

那我们是怎么做响应式编程的呢?

对于Java,我们用Reactor

Reactor 是Java中一个用来表示和处理“异步数据流”的工具库

它最常见的两个类型是:

  • Mono 单个异步结果
  • Flux 多个异步结果组成的流

我们先看普通 List / String

List<string> list = Arrays.asList("a", "b", "c");

你是立刻拿到了整批数据

而Flux只是拿到了一条“未来会陆续产生字符串的流”

flux.map(x -> x + "!")

这不是说现在立刻把所有数据都处理完了, 而更像是在说:以后每来一个数据,就按这个规则处理一下

所以他特别适合流式输出,比如大模型流式生成时:

先吐一小段文本

再吐一小段

再吐一小段

最后结束

这比你硬等一个完整 String 更贴切,我们可以将Mono和Flux理解成异步等待,来结果再执行处理

接着我们再来说说SSE 这种传输方式

首先我们要明白:

Flux 只是在 Java / Spring 后端内部表示“这是一条流”, 但浏览器和 HTTP 客户端并不知道你这个返回值想表达什么协议语义。 要让“流式输出”真正通过 HTTP 持续发给前端,就需要 SSE 这种传输方式。

浏览器访问的是 HTTP

HTTP 客户端要知道: 这次响应是普通文本?JSON?文件下载?还是持续不断的事件流?

Flux 自己不会告诉浏览器这个协议层语义

所以SSE 解决的是“怎么通过 HTTP 把流持续推给前端”

你可以把SSE(Server-Sent Events)理解成:

一种专门用来让服务器持续往浏览器推送消息的 HTTP 方式

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐