I. Architecture Overview

The "问康" (Wenkang) project uses a two-service microservice architecture. This iteration implements the bridge communication between its two services:

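- Spring Boot: the main backend, handling business logic, user authentication, and data persistence
- FastAPI: the AI inference service, providing AI consultation, OCR, and navigation recommendations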

The two services communicate over HTTP, which decouples business logic from AI capabilities.

II. Bridge Communication Patterns

The project defines three bridge communication patterns:

1. Trigger-Callback Pattern

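This iteration adds a trigger-callback mechanism to the integration module. FastAPI can then respond asynchronously through a callback: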

public Map<String, Object> triggerFastApi(BridgeTriggerRequest request) {
    String traceId = UUID.randomUUID().toString().replace("-", "");
    BridgePayload payload = new BridgePayload(
        traceId,
        "springboot",
        request.message(),
        springbootBaseUrl + "/integration/callback-from-fastapi",
        safeMetadata(request.metadata(), "triggered_by", "springboot")
    );
    
    // Send the request to FastAPI
    Map<String, Object> fastApiResponse = postJson(
        fastApiBaseUrl + "/integration/receive-from-springboot",
        payload
    );
    
    return Map.of(
        "status", "success",
        "initiator", "springboot",
        "trace_id", traceId,
        "fastapi_response", fastApiResponse
    );
}

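Design points:
- Trace ID: every request gets a unique traceId, so a call can be traced end to end across both services
- Callback URL: the payload carries a callbackUrl, so FastAPI can respond asynchronously
- Metadata: the metadata field carries additional context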
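The source calls safeMetadata(...) in two forms but does not show its body. The following sketch is a hypothetical reconstruction. It assumes the helper copies the incoming metadata, treats null as empty, and adds one tag such as triggered_by. The class name BridgeHelpers and the newTraceId() helper are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

public class BridgeHelpers {

    // Hypothetical helper: a UUID without dashes (32 hex chars), as built in triggerFastApi()
    static String newTraceId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    // Hypothetical reconstruction of safeMetadata(metadata): never returns null,
    // and copies the caller's map so later changes cannot leak back into it
    static Map<String, Object> safeMetadata(Map<String, Object> metadata) {
        return metadata == null ? new LinkedHashMap<>() : new LinkedHashMap<>(metadata);
    }

    // Hypothetical reconstruction of safeMetadata(metadata, key, value):
    // copies the metadata and adds (or overwrites) one tag, e.g. "triggered_by" -> "springboot"
    static Map<String, Object> safeMetadata(Map<String, Object> metadata, String key, Object value) {
        Map<String, Object> copy = safeMetadata(metadata);
        copy.put(key, value);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> tagged = safeMetadata(null, "triggered_by", "springboot");
        System.out.println(newTraceId().length()); // 32
        System.out.println(tagged);                // {triggered_by=springboot}
    }
}
```

Copying before tagging keeps the request's own metadata map unchanged. This matters if the same map is reused for logging or retries.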

2. Direct Response Pattern

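In the direct response pattern, Spring Boot receives a payload from FastAPI. If that payload carries a callbackUrl, Spring Boot calls it back immediately: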

public Map<String, Object> receiveFromFastApi(BridgePayload payload) {
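    // If the payload carries a callback URL, call it back right away
    Map<String, Object> callbackResponse = null;
    if (StringUtils.hasText(payload.callbackUrl())) {
        BridgePayload callbackPayload = new BridgePayload(
            payload.traceId(),
            "springboot",
            "Spring Boot callback acknowledged trace " + payload.traceId(),
            null,
            safeMetadata(payload.metadata(), "callback_from", "springboot")
        );
        callbackResponse = postJson(payload.callbackUrl(), callbackPayload);
    }

    // Map.of throws NullPointerException on null values, and callbackResponse is null
    // when no callback was made, so build the result with a LinkedHashMap instead
    Map<String, Object> result = new LinkedHashMap<>();
    result.put("status", "received");
    result.put("receiver", "springboot");
    result.put("trace_id", payload.traceId());
    result.put("callback_response", callbackResponse);
    return result;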
}

3. Callback Receive Pattern

The callback receive pattern handles asynchronous callbacks from FastAPI:

public Map<String, Object> receiveCallbackFromFastApi(BridgePayload payload) {
    return Map.of(
        "status", "callback_received",
        "receiver", "springboot",
        "trace_id", payload.traceId(),
        "message", payload.message(),
        "metadata", safeMetadata(payload.metadata())
    );
}

III. SSE Streaming Communication

SSE streaming for consultation chat

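This iteration adds streaming to the SSE module. Java's HttpClient (Java 11+) has no dedicated SSE API. However, HttpResponse.BodyHandlers.ofInputStream() exposes the response body as an InputStream, and that stream can be read while FastAPI is still sending events: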

public ConsultStreamHandle openConsultStream(Long messageId, String sessionId, Long userId)
        throws IOException, InterruptedException {
    String traceId = UUID.randomUUID().toString().replace("-", "");
    Map<String, Object> payload = buildConsultPayload(
        traceId,
        messageId,
        sessionId,
        userId,
        true,  // request a streamed (SSE) response
        "springboot_stream"
    );
    // Serialize the payload to JSON (assumes an injected Jackson ObjectMapper field named objectMapper)
    String requestBody = objectMapper.writeValueAsString(payload);

    HttpRequest request = HttpRequest.newBuilder(URI.create(fastApiBaseUrl + "/consult"))
        .header("Content-Type", "application/json")
        .header("Accept", "text/event-stream")
        .header("Accept-Encoding", "identity")  // ask for an uncompressed stream
        .timeout(Duration.ofSeconds(consultStreamTimeoutSeconds))  // 1800 s (30 min) by default
        .POST(HttpRequest.BodyPublishers.ofString(requestBody))
        .build();

    // With ofInputStream(), send() returns once the headers arrive; the body is read as events stream in
    HttpResponse<InputStream> response = httpClient.send(
        request,
        HttpResponse.BodyHandlers.ofInputStream()
    );

    return new ConsultStreamHandle(traceId, response.body());
}

SSE forwarding in the controller layer

The controller relays the FastAPI stream to the client:

@PostMapping(value = "/sessions/{sessionId}/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<StreamingResponseBody> chatStream(...) {
    IntegrationHttpService.ConsultStreamHandle streamHandle = 
        integrationHttpService.openConsultStream(...);
    
    StreamingResponseBody streamingBody = outputStream -> {
        // Send an "accepted" event first
        writeSseEvent(outputStream, "accepted", acceptedJson);

        // Relay FastAPI's SSE stream to the client as it arrives
        try (InputStream inputStream = streamHandle.stream()) {
            byte[] buffer = new byte[8192];
            int length;
            while ((length = inputStream.read(buffer)) != -1) {
                outputStream.write(buffer, 0, length);
                outputStream.flush();  // flush every chunk so events reach the client immediately
            }
        }
    };

    return ResponseEntity.ok()
        .contentType(MediaType.TEXT_EVENT_STREAM)
        .header("X-Accel-Buffering", "no")  // tell an Nginx reverse proxy not to buffer this response
        .body(streamingBody);
}

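Key technical points:
- Java 11's HttpClient streams the SSE body through BodyHandlers.ofInputStream(), so no SSE-specific client library is needed
- StreamingResponseBody writes the response incrementally instead of building it in memory
- X-Accel-Buffering: no stops an Nginx reverse proxy from buffering the event stream
- An 8 KB buffer balances throughput against memory use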
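The controller uses writeSseEvent(...) to send the accepted event, but the source does not show its body. The following is a minimal sketch of such a helper. It assumes the helper writes a standard SSE frame: an event: line, one data: line per line of payload, and a blank line that ends the event. The class name SseWriter is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class SseWriter {

    // Hypothetical sketch of writeSseEvent(outputStream, event, data):
    // emits one SSE frame and flushes it to the client
    static void writeSseEvent(OutputStream out, String event, String data) throws IOException {
        StringBuilder frame = new StringBuilder();
        frame.append("event: ").append(event).append('\n');
        // A multi-line payload must be split across several data: lines
        for (String line : data.split("\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        frame.append('\n'); // a blank line terminates the event
        out.write(frame.toString().getBytes(StandardCharsets.UTF_8));
        out.flush(); // push the frame out immediately
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeSseEvent(buffer, "accepted", "{\"trace_id\":\"abc\"}");
        System.out.print(buffer.toString(StandardCharsets.UTF_8));
    }
}
```

The FastAPI bytes are relayed verbatim. Only the events that Spring Boot writes itself, such as accepted, need this framing.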

IV. Timeout and Fault-Tolerance Strategy

The timeout handling enforces minimum values. This way, a misconfigured property cannot produce an unusably short timeout:

public IntegrationHttpService(...) {
    this.consultStreamTimeoutSeconds = Math.max(60, consultStreamTimeoutSeconds);
    this.navigationTimeoutSeconds = Math.max(3, navigationTimeoutSeconds);
    this.httpClient = HttpClient.newBuilder()
        .connectTimeout(Duration.ofSeconds(10))  // 10 s connect timeout
        .version(HttpClient.Version.HTTP_1_1)
        .build();
}

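private Map<String, Object> postJsonWithTimeout(String url, Object payload, long timeoutSeconds) {
    try {
        // Serialize the payload (assumes an injected Jackson ObjectMapper field named objectMapper)
        String requestBody = objectMapper.writeValueAsString(payload);
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .header("Content-Type", "application/json")
            .timeout(Duration.ofSeconds(Math.max(3, timeoutSeconds)))  // never below 3 s
            .POST(HttpRequest.BodyPublishers.ofString(requestBody))
            .build();

        HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() >= 400) {
            throw new IllegalStateException(
                "Request to " + url + " failed with " + response.statusCode()
            );
        }
        // Parse the JSON body into a map (com.fasterxml.jackson.core.type.TypeReference)
        return objectMapper.readValue(response.body(), new TypeReference<Map<String, Object>>() {});
    } catch (IOException e) {
        throw new IllegalStateException("Request to " + url + " failed", e);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve the interrupt flag
        throw new IllegalStateException("Request to " + url + " was interrupted", e);
    }
}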

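Timeout settings:
- Streaming consultation requests: 1800 s (30 min)
- Navigation recommendation requests: 20 s
- Plain JSON requests: 10 s
- Connection timeout: 10 s
- Minimum-timeout floors (60 s for streaming, 3 s for navigation and other requests): a zero, negative, or overly small configured value is raised to the floor, so it cannot fail the request or cut a stream short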
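The minimum-timeout floors can be checked in isolation. The following sketch uses a hypothetical clampTimeout helper that applies the same Math.max clamping as the constructor. It also shows why a floor matters: HttpRequest.Builder.timeout throws IllegalArgumentException for a zero or negative Duration.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

public class TimeoutFloors {

    // Hypothetical helper mirroring Math.max(floor, configured) in the constructor
    static long clampTimeout(long configuredSeconds, long floorSeconds) {
        return Math.max(floorSeconds, configuredSeconds);
    }

    public static void main(String[] args) {
        // A misconfigured 0 s stream timeout is raised to the 60 s floor
        System.out.println(clampTimeout(0, 60));    // 60
        System.out.println(clampTimeout(1800, 60)); // 1800

        // Without a floor, a zero timeout is rejected when the request is built
        try {
            HttpRequest.newBuilder(URI.create("http://127.0.0.1:8000/consult"))
                .timeout(Duration.ofSeconds(0));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getClass().getSimpleName());
        }
    }
}
```

The floors only guard the lower bound. A very large configured value is passed through unchanged.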

V. History Trimming

In the history handling module, the conversation history is trimmed with a simple sliding window:

private List<Map<String, Object>> trimHistory(List<Map<String, Object>> historyMessages) {
    if (historyMessages == null || historyMessages.isEmpty()) {
        return List.of();
    }
    // Keep only the 12 most recent messages to stay within the model's token limit
    int start = Math.max(0, historyMessages.size() - 12);
    return new ArrayList<>(historyMessages.subList(start, historyMessages.size()));
}

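Design considerations:
- Stay within the AI model's token limit
- Reduce network payload size
- Keep the most recent turns, which carry the conversational context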
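The trimming rule works for any element type. This sketch assumes the window size is passed in and clamps negative sizes to zero. The parameter maxMessages is hypothetical; the original hard-codes 12.

```java
import java.util.ArrayList;
import java.util.List;

public class HistoryWindow {

    // Generic form of trimHistory(): keep only the last maxMessages entries
    static <T> List<T> keepLast(List<T> history, int maxMessages) {
        if (history == null || history.isEmpty()) {
            return List.of();
        }
        int window = Math.max(0, maxMessages);           // treat negative sizes as 0
        int start = Math.max(0, history.size() - window);
        // subList is a view of the original list, so copy it into an independent list
        return new ArrayList<>(history.subList(start, history.size()));
    }

    public static void main(String[] args) {
        List<Integer> history = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            history.add(i);
        }
        // prints [9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
        System.out.println(keepLast(history, 12));
    }
}
```

Counting messages is only a proxy for a token budget. A few very long messages can still exceed the model's context window, so a token-based limit would be stricter.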

VI. PDF Report Generation

1. Streaming report download

In the PDF report module, the service fetches the report from FastAPI as a stream:

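public ReportStreamHandle openReportStream(String sessionId, Long userId)
        throws IOException, InterruptedException {
    String traceId = UUID.randomUUID().toString().replace("-", "");
    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put("trace_id", traceId);
    payload.put("session_id", sessionId);
    payload.put("user_id", userId);
    payload.put("source", "springboot");
    // Serialize the payload to JSON (assumes an injected Jackson ObjectMapper field named objectMapper)
    String requestBody = objectMapper.writeValueAsString(payload);
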
    HttpRequest request = HttpRequest.newBuilder(URI.create(fastApiBaseUrl + "/report/file"))
        .header("Content-Type", "application/json")
        .header("Accept", "application/pdf")
        .timeout(Duration.ofSeconds(60))
        .POST(HttpRequest.BodyPublishers.ofString(requestBody))
        .build();
    
    HttpResponse<InputStream> response = httpClient.send(
        request, 
        HttpResponse.BodyHandlers.ofInputStream()
    );
    
    String contentType = response.headers()
        .firstValue("Content-Type")
        .orElse("application/pdf");
    String contentDisposition = response.headers()
        .firstValue("Content-Disposition")
        .orElse("");
    
    return new ReportStreamHandle(
        traceId, 
        response.body(), 
        contentType, 
        contentDisposition
    );
}

2. Controller-layer handling

@GetMapping("/sessions/{sessionId}/report")
public ResponseEntity<?> getSessionReport(
    @PathVariable("sessionId") String sessionId,
    @RequestHeader(value = "Authorization", required = false) String authorization
) throws IOException, InterruptedException {
    // Authenticate the caller and load the session they own
    Result<UserInfoEntity> authResult = authenticate(authorization, tokenParam);
    Optional<AgentSessionEntity> ownership = loadOwnedSession(sessionId, userId);
    // ... (excerpt: resolving tokenParam and userId, and rejecting unowned sessions, are omitted)

    // Ask FastAPI whether a report has been generated
    Map<String, Object> reportResponse = integrationHttpService.fetchConsultReport(
        sessionId, userId
    );
    // ... (excerpt: unwrapping reportResponse into reportData is omitted)

    boolean hasReport = toBoolean(reportData.get("has_report"));
    if (!hasReport) {
        return ResponseEntity.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(success(Map.of("hasReport", false), "report not generated yet"));
    }

    // Download the PDF report
    IntegrationHttpService.ReportStreamHandle reportHandle =
        integrationHttpService.openReportStream(sessionId, userId);

    byte[] reportBytes;
    try (InputStream reportStream = reportHandle.stream()) {  // always close the upstream stream
        // readAllBytes() buffers the whole PDF in memory before responding
        reportBytes = reportStream.readAllBytes();
    }

    return ResponseEntity.ok()
        .contentType(MediaType.APPLICATION_PDF)
        .header(HttpHeaders.CONTENT_DISPOSITION,
            "attachment; filename=\"prediagnosis_" + sessionId + ".pdf\"")
        .header("X-Has-Report", "true")
        .body(reportBytes);
}
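openReportStream() returns the upstream Content-Disposition header, but the controller ignores it and always builds prediagnosis_<sessionId>.pdf. If the FastAPI filename should take precedence, a helper like the one below could extract it. The name filenameOrDefault and the simple regex are assumptions, not part of the source. The regex handles filename="..." and unquoted filename=..., but not the RFC 6266 filename* form.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ReportFilename {

    // Matches filename="..." or an unquoted filename=... (but not filename*=...)
    private static final Pattern FILENAME =
        Pattern.compile("filename=\"?([^\";]+)\"?", Pattern.CASE_INSENSITIVE);

    // Hypothetical helper: prefer the upstream filename, else fall back to the default
    static String filenameOrDefault(String contentDisposition, String fallback) {
        if (contentDisposition == null || contentDisposition.isBlank()) {
            return fallback;
        }
        Matcher m = FILENAME.matcher(contentDisposition);
        return m.find() ? m.group(1).trim() : fallback;
    }

    public static void main(String[] args) {
        System.out.println(filenameOrDefault("attachment; filename=\"report_1.pdf\"", "prediagnosis_s1.pdf"));
        System.out.println(filenameOrDefault("", "prediagnosis_s1.pdf"));
    }
}
```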

VII. Configuration Management

application.properties settings

Service addresses and timeouts are managed in the configuration file:

# Service addresses
integration.fastapi.base-url=http://127.0.0.1:8000
integration.springboot.base-url=http://127.0.0.1:8080

# Timeouts
integration.fastapi.consult.stream-timeout-seconds=1800
integration.fastapi.navigation.timeout-seconds=20

# File uploads
app.upload.root-dir=uploads
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=12MB

Configuration injection

public IntegrationHttpService(
    @Value("${integration.fastapi.base-url:http://127.0.0.1:8000}") String fastApiBaseUrl,
    @Value("${integration.springboot.base-url:http://127.0.0.1:8080}") String springbootBaseUrl,
    @Value("${integration.fastapi.consult.stream-timeout-seconds:1800}") long consultStreamTimeoutSeconds,
    @Value("${integration.fastapi.navigation.timeout-seconds:20}") long navigationTimeoutSeconds
) {
    this.fastApiBaseUrl = fastApiBaseUrl;
    this.springbootBaseUrl = springbootBaseUrl;
    this.consultStreamTimeoutSeconds = Math.max(60, consultStreamTimeoutSeconds);
    this.navigationTimeoutSeconds = Math.max(3, navigationTimeoutSeconds);
}

VIII. Bridge Data Structures

The BridgePayload data structure

BridgePayload is the core data structure for bridge communication:

public record BridgePayload(
    String traceId,                // trace ID
    String source,                 // originating service
    String message,                // message content
    String callbackUrl,            // callback URL (null when no callback is wanted)
    Map<String, Object> metadata   // metadata
) {}
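The record above accepts whatever the caller passes, including a null metadata map. A hypothetical hardened variant could normalize its inputs in a compact constructor, as sketched below. Requiring a non-null traceId and copying metadata defensively are assumptions; the source does not confirm either behavior.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class BridgePayloadSketch {

    // Hypothetical hardened variant of BridgePayload
    public record BridgePayload(
        String traceId,
        String source,
        String message,
        String callbackUrl,
        Map<String, Object> metadata
    ) {
        public BridgePayload {
            // Every bridge message is expected to carry a trace ID
            Objects.requireNonNull(traceId, "traceId");
            // Null-tolerant, read-only copy of the metadata
            if (metadata == null) {
                metadata = Map.of();
            } else {
                metadata = Collections.unmodifiableMap(new LinkedHashMap<>(metadata));
            }
        }
    }

    public static void main(String[] args) {
        BridgePayload p = new BridgePayload("abc", "springboot", "hi", null, null);
        System.out.println(p.metadata().isEmpty()); // true
    }
}
```

The null-tolerant copy is deliberate. Map.copyOf would throw on null values, and metadata maps built from JSON can contain them.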

Communication flow

Spring Boot                    FastAPI
    |                             |
    |--- triggerFastApi() ------->|
    |   POST /integration/        |
    |   receive-from-springboot   |
    |                             |
    |<--- response ---------------|
    |                             |
    |<--- callback (optional) ----|
    |   POST /integration/        |
    |   callback-from-fastapi     |
    |                             |

IX. Summary

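The project's microservice integration architecture offers the following advantages:

1. Separation of concerns: Spring Boot handles business logic, FastAPI focuses on AI inference
2. Flexible communication: synchronous, asynchronous (callback), and streaming modes
3. End-to-end tracing: a trace ID runs through the whole call chain
4. Fault tolerance: timeouts, exception handling, and status-code checks
5. Performance: history trimming, buffer management, and connection reuse through a shared HttpClient
6. PDF reports: consultation reports can be generated and downloaded
7. Flexible configuration: timeouts, addresses, and other parameters are set in the configuration file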

This architecture cleanly decouples business logic from AI capabilities, which makes the system easier to extend and maintain.