Bridging Communication Between Spring Boot and FastAPI
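I. Architecture Overview
The "问康" (Wenkang) project uses a two-microservice architecture. This round of development implemented the bridge between Spring Boot and FastAPI:
- Spring Boot: the main backend, handling business logic, user authentication and data persistence
- FastAPI: the AI inference service, providing AI consultation, OCR and navigation recommendations
The two services talk to each other over HTTP, which keeps business logic and AI capabilities decoupled.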
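II. Bridging Communication Patterns
The project defines three bridging patterns:
1. Trigger-Callback Pattern
The integration module provides a trigger-callback mechanism that supports asynchronous responses: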
public Map<String, Object> triggerFastApi(BridgeTriggerRequest request) {
String traceId = UUID.randomUUID().toString().replace("-", "");
BridgePayload payload = new BridgePayload(
traceId,
"springboot",
request.message(),
springbootBaseUrl + "/integration/callback-from-fastapi",
safeMetadata(request.metadata(), "triggered_by", "springboot")
);
// Send the request to FastAPI
Map<String, Object> fastApiResponse = postJson(
fastApiBaseUrl + "/integration/receive-from-springboot",
payload
);
return Map.of(
"status", "success",
"initiator", "springboot",
"trace_id", traceId,
"fastapi_response", fastApiResponse
);
}
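Design points:
- Trace ID: every request gets a unique traceId that travels in the payload and is echoed in responses and callbacks, so logs on both services can be correlated across the whole call chain
- Callback URL: the payload carries callbackUrl (here Spring Boot's /integration/callback-from-fastapi), so FastAPI can deliver its result asynchronously
- Extensible metadata: the metadata field carries extra context, such as triggered_by=springboot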
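The source calls `safeMetadata` but does not show it. The sketch below is one hypothetical, null-safe version consistent with how it is called (copy the incoming metadata, then add one key), together with the dash-free UUID trace ID used in `triggerFastApi()`. The class name `BridgeHelpers` is illustrative.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

class BridgeHelpers {
    // Same scheme as triggerFastApi(): a random UUID with the dashes removed (32 hex chars).
    static String newTraceId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    // Hypothetical helper: copies the caller's metadata (null is treated as empty),
    // adds one extra entry, and never mutates the caller's map.
    static Map<String, Object> safeMetadata(Map<String, Object> metadata, String key, Object value) {
        Map<String, Object> copy = new LinkedHashMap<>();
        if (metadata != null) {
            copy.putAll(metadata);
        }
        copy.put(key, value);
        return Collections.unmodifiableMap(copy);
    }
}
```

For example, `safeMetadata(null, "triggered_by", "springboot")` yields a one-entry map, so the trigger path works even when the client sends no metadata at all.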
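2. Direct Response Pattern
Here FastAPI initiates the exchange. Spring Boot answers the request directly and, if the payload carries a callbackUrl, also posts a callback to that URL before returning: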
public Map<String, Object> receiveFromFastApi(BridgePayload payload) {
// If the payload asks for a callback, send it proactively
Map<String, Object> callbackResponse = null;
if (StringUtils.hasText(payload.callbackUrl())) {
BridgePayload callbackPayload = new BridgePayload(
payload.traceId(),
"springboot",
"Spring Boot callback acknowledged trace " + payload.traceId(),
null,
safeMetadata(payload.metadata(), "callback_from", "springboot")
);
callbackResponse = postJson(payload.callbackUrl(), callbackPayload);
}
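// Map.of rejects null values, and callbackResponse stays null when no callbackUrl was given
Map<String, Object> result = new LinkedHashMap<>();
result.put("status", "received");
result.put("receiver", "springboot");
result.put("trace_id", payload.traceId());
result.put("callback_response", callbackResponse);
return result;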
}
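Because `Map.of` throws `NullPointerException` for null keys or values, a response that may carry a null `callbackResponse` has to be built with a map that permits nulls, such as `LinkedHashMap`. The small check below demonstrates both behaviours; the class name `NullSafeResponse` is illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class NullSafeResponse {
    // Returns true if Map.of accepts a null value (it does not: it throws NullPointerException).
    static boolean mapOfAcceptsNull() {
        try {
            Map.of("callback_response", (Object) null);
            return true;
        } catch (NullPointerException e) {
            return false;
        }
    }

    // LinkedHashMap allows null values and keeps insertion order for the JSON output.
    static Map<String, Object> receivedResponse(String traceId, Object callbackResponse) {
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("status", "received");
        body.put("receiver", "springboot");
        body.put("trace_id", traceId);
        body.put("callback_response", callbackResponse);
        return body;
    }
}
```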
3. Callback Receive Pattern
This pattern handles the asynchronous callbacks FastAPI sends to the callbackUrl supplied by triggerFastApi():
public Map<String, Object> receiveCallbackFromFastApi(BridgePayload payload) {
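// LinkedHashMap rather than Map.of, because message may be null
Map<String, Object> result = new LinkedHashMap<>();
result.put("status", "callback_received");
result.put("receiver", "springboot");
result.put("trace_id", payload.traceId());
result.put("message", payload.message());
result.put("metadata", safeMetadata(payload.metadata()));
return result;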
}
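The handler above only acknowledges the callback. If a caller ever needs to wait for FastAPI's asynchronous result, one common approach is to park a `CompletableFuture` per trace ID and complete it when the callback arrives. The registry below is a hypothetical sketch of that idea, not part of the project; `CallbackRegistry`, `register` and `complete` are illustrative names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

class CallbackRegistry {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call BEFORE triggering FastAPI, so a fast callback cannot arrive before registration.
    // orTimeout fails the future if no callback arrives in time; the entry is then removed.
    CompletableFuture<String> register(String traceId, long timeoutSeconds) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(traceId, future);
        future.orTimeout(timeoutSeconds, TimeUnit.SECONDS)
              .whenComplete((result, error) -> pending.remove(traceId, future));
        return future;
    }

    // Call from the callback endpoint: completes the matching future if it is still waiting.
    boolean complete(String traceId, String message) {
        CompletableFuture<String> future = pending.remove(traceId);
        return future != null && future.complete(message);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The trace ID already present in every `BridgePayload` is what makes this correlation possible; unknown or late trace IDs simply return `false`.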
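III. SSE Streaming
SSE streaming for consultation chats
The streaming module forwards consultation answers as they are generated. Java's HttpClient does not parse SSE itself: with BodyHandlers.ofInputStream() it exposes the text/event-stream response as a raw byte stream that can be relayed as it arrives: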
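public ConsultStreamHandle openConsultStream(Long messageId, String sessionId, Long userId)
        throws IOException, InterruptedException {
    String traceId = UUID.randomUUID().toString().replace("-", "");
    Map<String, Object> payload = buildConsultPayload(
        traceId,
        messageId,
        sessionId,
        userId,
        true, // request a streaming response
        "springboot_stream"
    );
    // Serialize the payload (objectMapper is assumed to be an injected Jackson ObjectMapper)
    String requestBody = objectMapper.writeValueAsString(payload);
    HttpRequest request = HttpRequest.newBuilder(URI.create(fastApiBaseUrl + "/consult"))
        .header("Content-Type", "application/json")
        .header("Accept", "text/event-stream")
        .header("Accept-Encoding", "identity") // uncompressed body, since the bytes are relayed verbatim
        .timeout(Duration.ofSeconds(consultStreamTimeoutSeconds)) // 1800 s (30 minutes) by default
        .POST(HttpRequest.BodyPublishers.ofString(requestBody))
        .build();
    HttpResponse<InputStream> response = httpClient.send(
        request,
        HttpResponse.BodyHandlers.ofInputStream()
    );
    if (response.statusCode() >= 400) {
        response.body().close(); // release the connection before failing
        throw new IllegalStateException("FastAPI /consult failed with " + response.statusCode());
    }
    return new ConsultStreamHandle(traceId, response.body());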
}
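Since the stream is forwarded as raw bytes, Spring Boot never looks inside the events. If it ever needed to (for example to persist the final answer), a minimal parser over the same `InputStream` could follow the basic text/event-stream rules: `event:` and `data:` fields, a blank line ends an event, multiple `data:` lines are joined with `\n`, and lines starting with `:` are comments. This is a sketch; `SseParser` and `SseEvent` are hypothetical, and `id:`/`retry:` are ignored.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SseParser {
    // Hypothetical event type; "message" is the SSE default when no event: field is sent.
    record SseEvent(String event, String data) {}

    // Reads the whole stream and returns the complete events it contained.
    // The caller remains responsible for closing the stream.
    static List<SseEvent> parse(InputStream in) {
        List<SseEvent> events = new ArrayList<>();
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        String eventName = null;
        StringBuilder data = null;
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    // A blank line dispatches the pending event if it carried any data.
                    if (data != null) {
                        events.add(new SseEvent(eventName == null ? "message" : eventName, data.toString()));
                    }
                    eventName = null;
                    data = null;
                } else if (!line.startsWith(":")) { // lines starting with ':' are comments/keep-alives
                    int colon = line.indexOf(':');
                    String field = colon < 0 ? line : line.substring(0, colon);
                    String value = colon < 0 ? "" : line.substring(colon + 1);
                    if (value.startsWith(" ")) {
                        value = value.substring(1); // one leading space is not part of the value
                    }
                    if (field.equals("event")) {
                        eventName = value;
                    } else if (field.equals("data")) {
                        data = (data == null) ? new StringBuilder(value) : data.append('\n').append(value);
                    }
                    // id: and retry: fields are ignored in this sketch.
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // An event not terminated by a blank line before EOF is discarded, as SSE requires.
        return events;
    }
}
```

For a live stream you would hand each event to a callback as it is dispatched instead of collecting a list.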
Relaying SSE in the controller
The controller relays the FastAPI stream to the client:
@PostMapping(value = "/sessions/{sessionId}/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<StreamingResponseBody> chatStream(...) {
IntegrationHttpService.ConsultStreamHandle streamHandle =
integrationHttpService.openConsultStream(...);
StreamingResponseBody streamingBody = outputStream -> {
// Send the "accepted" event first
writeSseEvent(outputStream, "accepted", acceptedJson);
// Relay FastAPI's SSE stream as it arrives
try (InputStream inputStream = streamHandle.stream()) {
byte[] buffer = new byte[8192];
int length;
while ((length = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, length);
outputStream.flush(); // push each chunk to the client immediately
}
}
};
return ResponseEntity.ok()
.contentType(MediaType.TEXT_EVENT_STREAM)
.header("X-Accel-Buffering", "no") // disable Nginx proxy buffering
.body(streamingBody);
}
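Key points:
- Java 11's HttpClient streams the upstream body through BodyHandlers.ofInputStream(); the SSE frames are relayed as raw bytes, not parsed
- StreamingResponseBody writes the response incrementally, flushing after every read
- The X-Accel-Buffering: no header tells an Nginx reverse proxy not to buffer the response; other proxies need their own settings
- An 8 KB buffer balances throughput against memory use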
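`writeSseEvent` is called in the controller but not shown in the source. One hypothetical implementation, following the text/event-stream wire format (an `event:` line, one `data:` line per line of payload, then a blank line), is sketched below. It writes UTF-8 and flushes so the `accepted` event reaches the client before any model output.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class SseWriter {
    // Writes one SSE frame: "event: <name>\n", then "data: <line>\n" per payload line, then "\n".
    static void writeSseEvent(OutputStream out, String eventName, String data) throws IOException {
        StringBuilder frame = new StringBuilder();
        frame.append("event: ").append(eventName).append('\n');
        // Multi-line payloads need one data: field per line; clients rejoin them with "\n".
        // This sketch splits on "\n" only, so a "\r\n" payload would keep its '\r'.
        for (String line : data.split("\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        frame.append('\n'); // the blank line terminates the event
        out.write(frame.toString().getBytes(StandardCharsets.UTF_8));
        out.flush();
    }
}
```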
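IV. Timeouts and Fault Tolerance
Every configurable timeout is clamped to a lower bound, so a misconfigured value cannot drop below a usable minimum: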
public IntegrationHttpService(...) {
this.consultStreamTimeoutSeconds = Math.max(60, consultStreamTimeoutSeconds);
this.navigationTimeoutSeconds = Math.max(3, navigationTimeoutSeconds);
this.httpClient = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(10)) // 10-second connect timeout
.version(HttpClient.Version.HTTP_1_1)
.build();
}
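private Map<String, Object> postJsonWithTimeout(String url, Object payload, long timeoutSeconds)
        throws IOException, InterruptedException {
    // objectMapper is assumed to be an injected Jackson ObjectMapper
    String requestBody = objectMapper.writeValueAsString(payload);
    HttpRequest request = HttpRequest.newBuilder(URI.create(url))
        .header("Content-Type", "application/json")
        .timeout(Duration.ofSeconds(Math.max(3, timeoutSeconds))) // never below 3 seconds
        .POST(HttpRequest.BodyPublishers.ofString(requestBody))
        .build();
    HttpResponse<String> response = httpClient.send(request, ...);
    if (response.statusCode() >= 400) {
        throw new IllegalStateException(
            "Request to " + url + " failed with " + response.statusCode()
        );
    }
    // Assumed: the JSON body is parsed into a Map (this step is not shown in the source)
    return objectMapper.readValue(response.body(), new TypeReference<Map<String, Object>>() {});
}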
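Timeout settings:
- Streaming consultation requests: 1800 s (30 minutes), never below 60 s
- Navigation recommendation requests: 20 s, never below 3 s
- Ordinary JSON requests: 10 s
- Connection establishment: 10 s
- Lower bounds: the Math.max floors keep a misconfigured value (for example 0, which HttpRequest.Builder.timeout rejects) from breaking requests or causing premature timeouts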
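The floors matter because `HttpRequest.Builder.timeout` throws `IllegalArgumentException` for a zero or negative `Duration`, so a typo such as `timeout-seconds=0` would otherwise fail every request. The check below demonstrates that JDK behaviour and the clamping used above; `TimeoutGuard` and `clampSeconds` are illustrative names.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

class TimeoutGuard {
    // Same idea as Math.max(3, timeoutSeconds) in postJsonWithTimeout: enforce a floor.
    static long clampSeconds(long configured, long minimum) {
        return Math.max(minimum, configured);
    }

    // Returns true if the JDK accepts the given timeout on a request builder.
    static boolean jdkAcceptsTimeout(Duration timeout) {
        try {
            HttpRequest.newBuilder(URI.create("http://127.0.0.1:8000/consult")).timeout(timeout);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```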
V. Trimming the Message History
The history handling keeps a simple sliding window of recent messages:
private List<Map<String, Object>> trimHistory(List<Map<String, Object>> historyMessages) {
if (historyMessages == null || historyMessages.isEmpty()) {
return List.of();
}
// Keep only the 12 most recent messages to reduce the risk of exceeding the token limit
int start = Math.max(0, historyMessages.size() - 12);
return new ArrayList<>(historyMessages.subList(start, historyMessages.size()));
}
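Design considerations:
- Reduces the risk of exceeding the AI model's token limit; the window counts messages, not tokens, so very long messages can still exceed it
- Reduces the size of each request sent over the network
- Keeps the most recent turns, which carry the context most relevant to the current question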
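The rule above is a fixed-size window over the last N messages. A generic version with the limit as a parameter (the source hard-codes 12) is easy to unit-test; `HistoryTrimmer` and `keepLast` are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

class HistoryTrimmer {
    // Keeps the last `limit` items as a new list, so later changes to the source list do not leak in.
    static <T> List<T> keepLast(List<T> items, int limit) {
        if (items == null || items.isEmpty() || limit <= 0) {
            return List.of();
        }
        int start = Math.max(0, items.size() - limit);
        return new ArrayList<>(items.subList(start, items.size()));
    }
}
```

One design point worth checking: a fixed window can begin with an assistant reply whose user question was cut off, so some implementations round the start index to a user turn.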
VI. PDF Report Generation
1. Streaming the report
The PDF report module opens the generated report from FastAPI as a stream:
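public ReportStreamHandle openReportStream(String sessionId, Long userId)
throws IOException, InterruptedException {
String traceId = UUID.randomUUID().toString().replace("-", "");
Map<String, Object> payload = new LinkedHashMap<>();
payload.put("trace_id", traceId);
payload.put("session_id", sessionId);
payload.put("user_id", userId);
payload.put("source", "springboot");
// objectMapper is assumed to be an injected Jackson ObjectMapper
String requestBody = objectMapper.writeValueAsString(payload);
HttpRequest request = HttpRequest.newBuilder(URI.create(fastApiBaseUrl + "/report/file"))
.header("Content-Type", "application/json")
.header("Accept", "application/pdf")
.timeout(Duration.ofSeconds(60))
.POST(HttpRequest.BodyPublishers.ofString(requestBody))
.build();
HttpResponse<InputStream> response = httpClient.send(
request,
HttpResponse.BodyHandlers.ofInputStream()
);
if (response.statusCode() >= 400) {
response.body().close(); // release the connection before failing
throw new IllegalStateException("FastAPI /report/file failed with " + response.statusCode());
}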
String contentType = response.headers()
.firstValue("Content-Type")
.orElse("application/pdf");
String contentDisposition = response.headers()
.firstValue("Content-Disposition")
.orElse("");
return new ReportStreamHandle(
traceId,
response.body(),
contentType,
contentDisposition
);
}
2. Controller handling
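@GetMapping("/sessions/{sessionId}/report")
public ResponseEntity<?> getSessionReport(
    @PathVariable("sessionId") String sessionId,
    @RequestHeader(value = "Authorization", required = false) String authorization
) throws IOException, InterruptedException {
    // Authenticate the caller. tokenParam, userId (the authenticated user's ID) and the
    // 401 response on failure come from code not shown in the source.
    Result<UserInfoEntity> authResult = authenticate(authorization, tokenParam);
    // Make sure the session belongs to this user
    Optional<AgentSessionEntity> ownership = loadOwnedSession(sessionId, userId);
    if (ownership.isEmpty()) {
        // Assumed status code; the source does not show this branch
        return ResponseEntity.status(HttpStatus.NOT_FOUND).build();
    }
    // Check whether the report has been generated yet
    Map<String, Object> reportData = integrationHttpService.fetchConsultReport(
        sessionId, userId
    );
    boolean hasReport = toBoolean(reportData.get("has_report"));
    if (!hasReport) {
        return ResponseEntity.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(success(Map.of("hasReport", false), "report not generated yet"));
    }
    // Download the PDF report; readAllBytes buffers the whole file in memory
    IntegrationHttpService.ReportStreamHandle reportHandle =
        integrationHttpService.openReportStream(sessionId, userId);
    byte[] reportBytes;
    try (InputStream in = reportHandle.stream()) {
        reportBytes = in.readAllBytes();
    }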
return ResponseEntity.ok()
.contentType(MediaType.APPLICATION_PDF)
.header(HttpHeaders.CONTENT_DISPOSITION,
"attachment; filename=\"prediagnosis_" + sessionId + ".pdf\"")
.header("X-Has-Report", "true")
.body(reportBytes);
}
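`sessionId` comes straight from the URL path and is concatenated into the `Content-Disposition` header. If session IDs are not already restricted to a safe alphabet, a conservative filter like the hypothetical helper below keeps quotes and control characters out of the header value. The allowed character set is an assumption; adjust it to the real session-ID format.

```java
class ReportFilenames {
    // Keeps only [A-Za-z0-9_-]; anything else becomes '_' so the quoted filename stays well-formed.
    static String attachmentHeader(String sessionId) {
        String safeId = sessionId == null ? "unknown" : sessionId.replaceAll("[^A-Za-z0-9_-]", "_");
        return "attachment; filename=\"prediagnosis_" + safeId + ".pdf\"";
    }
}
```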
VII. Configuration Management
application.properties
Service addresses and timeouts are set in the configuration file:
# Service addresses
integration.fastapi.base-url=http://127.0.0.1:8000
integration.springboot.base-url=http://127.0.0.1:8080
# Timeouts (seconds)
integration.fastapi.consult.stream-timeout-seconds=1800
integration.fastapi.navigation.timeout-seconds=20
# File upload limits
app.upload.root-dir=uploads
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=12MB
Configuration injection
public IntegrationHttpService(
@Value("${integration.fastapi.base-url:http://127.0.0.1:8000}") String fastApiBaseUrl,
@Value("${integration.springboot.base-url:http://127.0.0.1:8080}") String springbootBaseUrl,
@Value("${integration.fastapi.consult.stream-timeout-seconds:1800}") long consultStreamTimeoutSeconds,
@Value("${integration.fastapi.navigation.timeout-seconds:20}") long navigationTimeoutSeconds
) {
this.fastApiBaseUrl = fastApiBaseUrl;
this.springbootBaseUrl = springbootBaseUrl;
this.consultStreamTimeoutSeconds = Math.max(60, consultStreamTimeoutSeconds);
this.navigationTimeoutSeconds = Math.max(3, navigationTimeoutSeconds);
}
VIII. Bridge Data Structures
BridgePayload
BridgePayload is the core data structure exchanged over the bridge:
public record BridgePayload(
String traceId, // trace ID
String source, // originating service
String message, // message content
String callbackUrl, // callback URL (may be null)
Map<String, Object> metadata // extra context
) {}
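Several handlers above treat `metadata` as optional, so one option is to normalise it once in a compact canonical constructor. The variant below is a sketch, named `SafeBridgePayload` so it is not mistaken for the project's record: a null map becomes empty, and non-null maps are copied defensively. Separately, if FastAPI expects snake_case keys such as `trace_id` (as the report payload uses), Jackson would by default serialize the record's camelCase component names, so `@JsonProperty` annotations or a naming strategy would be needed; the source does not show how this is configured.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

record SafeBridgePayload(
        String traceId,
        String source,
        String message,
        String callbackUrl,
        Map<String, Object> metadata
) {
    SafeBridgePayload {
        Objects.requireNonNull(traceId, "traceId");
        // A LinkedHashMap copy (unlike Map.copyOf) tolerates null values inside metadata.
        metadata = metadata == null
                ? Map.of()
                : Collections.unmodifiableMap(new LinkedHashMap<>(metadata));
    }
}
```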
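Communication flow
Spring Boot                                         FastAPI
     |                                                 |
     |--- triggerFastApi() --------------------------->|
     |    POST /integration/receive-from-springboot    |
     |                                                 |
     |<-- result --------------------------------------|
     |                                                 |
     |<-- callback (optional) -------------------------|
     |    POST callbackUrl                             |
     |    (/integration/callback-from-fastapi)         |
     |                                                 |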
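IX. Summary
The integration architecture offers the following benefits:
1. Separation of concerns: Spring Boot handles business logic, FastAPI focuses on AI inference
2. Flexible communication: synchronous request/response, asynchronous callbacks, and SSE streaming
3. End-to-end tracing: a trace ID is carried through each call chain
4. Fault tolerance: timeout control, exception handling, HTTP status checks
5. Performance: history trimming, buffered stream relaying, and connection reuse through a single shared HttpClient instance
6. PDF reports: consultation reports can be generated and downloaded
7. Flexible configuration: service addresses and timeouts are managed in the configuration file
With this architecture, business logic and AI capabilities are cleanly decoupled, which makes the system easier to extend and maintain.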