本文从工程实践角度,分享一个基于 Spring Boot 的多阶段 AI 评测流水线系统的架构设计,涵盖流水线编排、设计模式运用、并发控制、多模型集成等核心技术方案。


1. 业务背景与系统定位

在 AI 应用落地中,常遇到这样的场景:对某个目标对象进行多维度、多视角的智能分析,最终产出一份结构化的评测报告。这种需求往往不是一次 LLM 调用就能完成的,而是需要拆解为多个子任务,按序执行,中间包含数据采集、AI 生成、量化评分、报告汇总等环节。

本系统正是为此类场景设计的通用技术框架,核心特征:

  • 多阶段流水线:将评测流程拆解为 4 个阶段,顺序执行
  • 多 AI 模型集成:支持接入多个 LLM 平台,对比评估结果
  • 异步解耦:阶段间通过消息队列异步通信,支持外部 Worker 消费
  • 断点恢复:任务执行到任何阶段均可重跑和恢复
  • 动态可配置:AI 平台、队列、参数均支持运行时动态调整

2. 总体架构

2.1 模块划分

├── citymap-aigeo-parent           # 父模块:依赖版本管理
│   ├── cm-m-aigeo                 # 主服务模块:业务逻辑、API、调度
│   └── cm-m-aigeo-pojo           # POJO 模块:BO/DTO 定义

采用 Maven 多模块 架构,POJO 模块独立发布,供其他微服务通过依赖共享数据模型。

2.2 技术栈

层次 选型 用途
基础框架 Spring Boot 3.3 / Spring Cloud 2023 微服务基础设施
配置中心 Apollo 2.2 动态配置管理
服务发现 Eureka 服务注册与发现
数据库 MySQL + SQL Server (多数据源) 持久化存储
ORM MyBatis 3 + 乐观锁 数据访问层
缓存 Redis + Caffeine 多级缓存加速
消息队列 RabbitMQ 任务异步分发
AI 调用 OpenAI 兼容协议 + 百度千帆 API 大模型接入
模板引擎 FreeMarker 动态 Prompt 渲染
API 文档 SpringDoc OpenAPI (Swagger 3) 接口文档自动生成

2.3 架构分层

┌─────────────────────────────────────────────────────┐
│                  API Controllers                     │
│  (REST 接口层:任务管理、查询、重跑、补偿)              │
├─────────────────────────────────────────────────────┤
│                 Service 接口 + 实现                   │
│  (业务编排、外部 Feign 调用、领域逻辑)                  │
├─────────────────────────────────────────────────────┤
│               Pipeline Orchestrator                  │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐        │
│  │ Coordinator │ → │  Monitor   │ → │ Executor  │    │
│  └──────────┘   └──────────┘   └──────────┘        │
├─────────────────────────────────────────────────────┤
│  Stage 1      Stage 2       Stage 3      Stage 4    │
│ Generate  →  Generate  →  Evaluate  →  Generate     │
│  Data        Content       Score        Report       │
├─────────────────────────────────────────────────────┤
│              DAO Layer (MyBatis)                     │
├─────────────────────────────────────────────────────┤
│  DataSource  │   MQ (RabbitMQ)  │  Feign Clients    │
└─────────────────────────────────────────────────────┘

2.4 一次完整请求的时序

下面的时序图展示了从客户端发起任务到流水线全流程执行的核心交互路径:

基础设施 执行器层 服务编排层 API层 Database TaskMonitor RabbitMQ Phase4: 报告产出 Phase3: 量化评分 Phase2: 内容生成 Phase1: 数据生成 StageExecutorRegistry TaskCoordinator GeoTaskService Controller 客户端 Database TaskMonitor RabbitMQ Phase4: 报告产出 Phase3: 量化评分 Phase2: 内容生成 Phase1: 数据生成 StageExecutorRegistry TaskCoordinator GeoTaskService Controller 客户端 ── Phase 1 执行 ── ── Phase 2 执行 ── alt [queue 模式] [mock 模式] ── Monitor 守护线程轮询 ── ── Phase 3 执行 ── ── Phase 4 执行 ── alt [全部答案已完成] [超时 (60 分钟)] loop [每 5 秒 [Phase 2 进行中]] POST /api/execute executeTask(request) 插入任务记录 (RUNNING) 初始化4个阶段状态 (PENDING) executeTask(context) getExecutor(Phase1) GenerateDataExecutor execute(context) 更新阶段状态 = RUNNING 远程数据采集 + LLM 调用 保存生成的结构化数据 更新阶段状态 = FINISHED return true getExecutor(Phase2) GenerateContentExecutor execute(context) 创建 N平台×M 条答案记录 发送 N 条任务/问题消息 CompletableFuture 并行 LLM 调用 return true (异步) 查询 Phase2 运行中的任务 检测答案完成状态 triggerNextStage(Phase3) getExecutor(Phase3) EvaluateScoreExecutor execute(context) 更新阶段状态 = RUNNING M维度×N平台 同步评分 保存各维度评分数据 综合评估 (另一次LLM调用) 更新任务得分 更新阶段状态 = FINISHED return true getExecutor(Phase4) GenerateReportExecutor execute(context) 更新阶段状态 = RUNNING LLM 深度思考生成报告 保存报告 更新任务状态 = FINISHED return true 更新阶段状态 = FAILED 更新任务状态 = ERROR 返回任务ID + 状态 200 OK { taskId, state }

3. 核心流程:四阶段流水线

3.1 阶段定义

[阶段1: 数据生成]  →  [阶段2: 内容生成]  →  [阶段3: 量化评估]  →  [阶段4: 报告产出]
阶段 职责 执行方式 后置动作
Phase 1 - 数据生成 基于输入信息和外部数据,调用 LLM 生成结构化数据 同步 自动触发 Phase 2
Phase 2 - 内容生成 将数据分发到 N 个 AI 平台并行生成内容 异步(MQ/直调) 由 Monitor 守护线程检测完成
Phase 3 - 量化评估 在 M 个维度上对 N 个平台的结果进行评分 同步 自动触发 Phase 4
Phase 4 - 报告产出 整合评分数据,调用 LLM 生成最终报告 同步 任务结束

3.2 阶段间的流转策略

流水线中阶段间的流转并非一刀切的全自动模式,而是根据阶段特性采用不同策略:

Phase 1 ──► Phase 2 ──► Phase 3 ──► Phase 4
   │            │            │            │
   │  自动触发   │  条件触发   │  自动触发   │  终止
   │  (同步完   │  (Monitor  │  (同步完   │
   │   成就触发) │   轮询检测) │   成就触发) │
   ▼            ▼            ▼            ▼
 继续下一阶段  等待所有平台   继续下一阶段  任务完成
              完成回答

Phase 1 → Phase 2:Phase 1 执行成功后,直接在同一个线程中触发 Phase 2。因为 Phase 1 是纯同步操作,完成后可以立即进入下一阶段。

Phase 2 → Phase 3:这是唯一需要条件触发的阶段转换。Phase 2 涉及多个 AI 平台的异步内容生成,何时全部完成是未知的,因此引入守护线程轮询。

Phase 3 → Phase 4:Phase 3 执行成功后,直接触发 Phase 4。Phase 3 虽内部有大量并行 LLM 调用,但整体是同步等待的。

3.3 流水线协调时序

下面的时序图精确刻画了 Coordinator 如何编排 4 个阶段,以及 Monitor 守护线程如何介入异步阶段的条件检测:

MQ Database TaskMonitor Phase4 报告产出 Phase3 量化评分 Phase2 内容生成 Phase1 数据生成 StageRegistry TaskCoordinator MQ Database TaskMonitor Phase4 报告产出 Phase3 量化评分 Phase2 内容生成 Phase1 数据生成 StageRegistry TaskCoordinator ════ 第一阶段:同步执行 ════ ════ 第二阶段:异步分发 ════ alt [生产模式 (queue)] [开发模式 (mock)] 直接返回,不阻塞 ════ 第三阶段:条件触发由 Monitor 完成 ════ ════ 第三阶段:同步评分 ════ ════ 第四阶段:同步执行 ════ alt [全部完成] [超时 (60min)] loop [每 5s [任务在 Phase2 中]] getExecutor(Phase1) GenerateDataExecutor execute(ctx) 阶段状态 = RUNNING 采集数据 + 调用 LLM 阶段状态 = FINISHED true getExecutor(Phase2) GenerateContentExecutor execute(ctx) 创建 N×M 条答案 (PENDING) 发送消息到各平台队列 并行调用 LLM 模拟 true 查询 Phase2 活跃任务 检测答案完成条件 triggerNextStage(Phase3) getExecutor(Phase3) EvaluateScoreExecutor execute(ctx) 阶段状态 = RUNNING M维度 × N平台 LLM评分 各维度分数持久化 综合评估 (额外 LLM) 任务总分 + 等级更新 阶段状态 = FINISHED true getExecutor(Phase4) GenerateReportExecutor execute(ctx) 阶段状态 = RUNNING LLM 深度思考 保存报告 任务状态 = FINISHED true 阶段状态 = FAILED 任务状态 = ERROR

4. 设计模式应用

本系统大量运用经典设计模式,实现高内聚低耦合的架构。

4.1 模板方法模式(Template Method)

核心抽象类AbstractStageExecutor

public abstract class AbstractStageExecutor implements StageExecutor {

    // 模板方法:定义阶段执行的固定骨架
    @Override
    public boolean execute(TaskContext context) {
        // 1. 前置检查(任务状态校验)
        // 2. 状态初始化
        beforeExecute(context);

        // 3. 子类实现的具体业务逻辑
        boolean result = doExecute(context);

        // 4. 后置处理(状态更新、异常处理)
        afterExecute(context, result);

        return result;
    }

    // 子类必须实现:具体业务
    protected abstract boolean doExecute(TaskContext context);

    // 子类可覆盖:是否为异步阶段
    protected boolean isAsync() { return false; }
}

模板方法 定义了阶段执行的固定骨架:前置检查 → 状态初始化 → 子类业务逻辑 → 后置处理。子类只需实现 doExecute() 即可,无需关心状态管理、异常处理等横切关注点。

4.2 策略模式(Strategy)

接口定义StageExecutor

public interface StageExecutor {
    TaskStageEnum getStage();       // 标识所属阶段
    boolean execute(TaskContext);   // 执行阶段任务
    TaskStageEnum getNextStage();   // 返回下一阶段
}

每个阶段都是一个独立的策略实现,通过 getStage() 标识自己归属的阶段。运行时根据阶段枚举查找对应的执行器。

4.3 注册表模式(Registry)

核心类StageExecutorRegistry

@Component
public class StageExecutorRegistry {
    private final Map<TaskStageEnum, StageExecutor> executorMap = new EnumMap<>(TaskStageEnum.class);

    @Autowired
    private List<StageExecutor> executors;

    @PostConstruct
    public void init() {
        // Spring 自动注入所有 StageExecutor 实现类
        for (StageExecutor executor : executors) {
            executorMap.put(executor.getStage(), executor);
        }
    }

    public StageExecutor getExecutor(TaskStageEnum stage) {
        StageExecutor executor = executorMap.get(stage);
        if (executor == null) {
            throw new IllegalStateException("未找到阶段执行器: " + stage);
        }
        return executor;
    }
}

利用 Spring 的自动装配机制,所有实现了 StageExecutor 接口的 Bean 会自动注入到 List 中。@PostConstruct 阶段按阶段枚举建立映射。新增一个阶段只需实现接口并声明为 Bean,无需修改任何注册逻辑——对扩展开放,对修改封闭。

4.4 观察者模式(Observer)— 守护线程监控

核心类TaskMonitor

@Component
@EnableScheduling
public class TaskMonitor {
    // 单线程守护执行器
    private final ExecutorService executorService =
        Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "geo-task-monitor-");
            t.setDaemon(true);
            return t;
        });

    // 每 5 秒轮询一次
    @Scheduled(fixedDelay = 5000)
    public void monitorTasks() {
        // 查询处于 Phase 2 状态的所有任务
        List<AiGeoTask> tasks = dao.selectRunningTasksByStage("generate_answer");

        for (AiGeoTask task : tasks) {
            checkTaskStatus(task);  // 检查完成条件
        }
    }
}

Monitor 作为观察者,定时扫描数据库中的"运行中"任务,检查是否满足继续流转的条件。这是一种基于数据库轮询的观察者模式实现,相比内存中的事件通知,天然支持应用重启后的状态恢复。

Monitor 轮询检测 → 条件满足 → 触发阶段转换的完整时序:

Phase3 EvaluateScoreExecutor TaskCoordinator AllAnswers CompletedCondition Database TaskMonitor @Scheduled 定时器 Phase3 EvaluateScoreExecutor TaskCoordinator AllAnswers CompletedCondition Database TaskMonitor @Scheduled 定时器 每 5 秒触发 检查条件: 1. 无 PENDING 答案 2. 各平台 >= 16 条完成答案 Phase 3 执行... 跳过,等待下次轮询 alt [条件满足 (全部完成)] [条件未满足] alt [已超时] [未超时] loop [每个活跃任务] 等待 5 秒后下一次触发 monitorTasks() selectRunningTasksByStage(Phase2) [task1, task2, ...] selectByTaskIdAndStage(taskId, Phase2) AiGeoTaskStage { status, startTime } isTaskTimeout() → 60min? 阶段状态 = FAILED 任务状态 = ERROR isSatisfied(ctx) answerCheckService .areAllCompleted(taskId) true / false 乐观锁: 阶段状态 = FINISHED triggerNextStage(Phase3) execute(ctx) return return

4.5 条件模式(Condition)

接口定义StageTransitionCondition

public interface StageTransitionCondition {
    TaskStageEnum getCurrentStage();  // 当前阶段
    TaskStageEnum getTargetStage();   // 目标阶段
    boolean isSatisfied(TaskContext); // 条件是否满足
    String getDescription();          // 条件描述
}

将"阶段是否可以转换"这个判断逻辑抽象为独立的条件对象。后续如果增加新的转换条件(如超时强制流转、人工审批通过流转),只需实现此接口即可。

4.6 上下文模式(Context)

核心类TaskContext

@Data
public class TaskContext {
    private String taskId;
    private TaskStageEnum currentStage;
    private StageStatusEnum currentStatus;
    private Map<String, Object> data;    // 跨阶段数据传递
    private String errorMessage;
}

TaskContext 贯穿整个流水线,是各阶段间数据传递的载体。使用 ConcurrentHashMap 保证线程安全,Phase 1 生成的问题 ID 列表、Phase 2 生成的原始内容、Phase 3 计算出的评分等,全部通过 Context 传递。

4.7 组合模式应用汇总

模式                用途                             核心类/接口
──────             ────                             ──────────
模板方法            阶段执行通用骨架                   AbstractStageExecutor
策略               各阶段独立业务逻辑                  StageExecutor
注册表             执行器查找与路由                   StageExecutorRegistry
观察者             异步阶段完成检测                   TaskMonitor
条件               阶段转换条件封装                   StageTransitionCondition
上下文             阶段间数据传递                     TaskContext

5. 并发模型

5.1 线程池配置

@Bean("getAsyncExecutor")
public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(20);          // 核心线程
    executor.setMaxPoolSize(1000);         // 最大线程
    executor.setQueueCapacity(10);         // 阻塞队列
    executor.setThreadNamePrefix("MyAsync-");
    executor.setRejectedExecutionHandler(new CallerRunsPolicy());
    executor.initialize();
    return executor;
}
  • 核心 20 / 最大 1000:应对突发的批量任务提交
  • 队列仅 10:避免任务堆积在内存中,宁可触发拒绝策略走 CallerRuns(慢速反压)
  • CallerRunsPolicy:提交线程自己执行,天然形成反压机制

5.2 各阶段并发模型对比

阶段 内部并发 线程模型
Phase 1 单线程顺序执行 主调度线程
Phase 2 N 平台 × M 问题 异步并行 CompletableFuture 在线程池
Phase 3 M 维度 × N 平台 同步并行 for 循环内的同步调用
Phase 4 单线程顺序执行 主调度线程
Monitor 单线程守护 SingleThreadExecutor + Wait

Phase 2 在 mock 模式下,通过 CompletableFuture 实现多平台异步并行生成答案。实际代码中,simulateThirdPartyAnswer() 为每个 AI 平台创建一个异步任务,每个任务内部循环拉取待处理问题并调用 LLM:

LLM 平台B LLM 平台A Database 线程池 (20~1000) GenerateContent Executor LLM 平台B LLM 平台A Database 线程池 (20~1000) GenerateContent Executor alt [无待处理问题] [有待处理问题] loop [循环拉取 (fetch)] alt [无待处理问题] [有待处理问题] loop [循环拉取 (fetch)] par [平台A 异步] [平台B 异步] CompletableFuture.allOf() 等待所有平台完成 simulateThirdPartyAnswer() processPlatform(A) fetchNextPendingQuestion(A) { questionId, question } 退出循环 callWithLog(question, model-A) 答案内容 save(questionId, answer, FINISHED) 完成 processPlatform(B) fetchNextPendingQuestion(B) { questionId, question } 退出循环 callWithLog(question, model-A) 答案内容 save(questionId, answer, FINISHED) 完成 全部并行任务已结束

注意:在 queue 模式下,simulateThirdPartyAnswer 不会执行,取而代之的是 sendQuestionsToMq — 将消息投递到 RabbitMQ 后立即返回,答案生成由外部 Worker 异步完成。Monitor 守护线程负责轮询检测完成状态并根据条件触发下一阶段。

5.3 乐观锁控制

在阶段状态更新这种读多写少、冲突概率低的场景下,选用乐观锁而非悲观锁:

-- mapper XML 示例
UPDATE ai_geo_task_stage
SET stage_status = #{newStatus},
    version = version + 1
WHERE task_id = #{taskId}
  AND stage_name = #{stageName}
  AND version = #{oldVersion}   -- 乐观锁条件

Java 层的重试机制:

public class OptimisticLockUtil {
    private static final int MAX_RETRIES = 3;
    private static final long RETRY_INTERVAL_MS = 500;

    public static void updateStageStatusWithRetry(...) {
        for (int i = 0; i < MAX_RETRIES; i++) {
            // 1. 查询当前版本
            // 2. 执行 update(条件中包含 version = oldVersion)
            // 3. 如果影响行数 > 0,成功返回
            // 4. 否则 sleep 500ms 后重试
        }
        throw new RuntimeException("乐观锁更新失败,已达最大重试次数");
    }
}

6. 多 AI 平台集成设计

6.1 平台抽象

通过配置驱动的平台定义,支持运行时动态增减:

ai.geo.platforms.enabled: 1,2                    # 启用哪些平台
ai.geo.platforms.names[1]: "平台A"
ai.geo.platforms.names[2]: "平台B"
ai.geo.platforms.providers[1]: "provider-a"
ai.geo.platforms.providers[2]: "provider-b"
ai.geo.platforms.queues[1]: "queue.a.task"
ai.geo.platforms.queues[2]: "queue.b.task"

6.2 每个阶段的模型策略差异化

阶段 模型 温度 端点 深度思考
Phase 1 model-A 0.1 千帆
Phase 2 model-A 0.1 千帆
Phase 3 单维 model-A 0.1 千帆
Phase 3 综合 model-B 0.7 千帆
Phase 4 model-A 0.7 千帆 启用

每个阶段可以根据任务特性选择最合适的模型和参数,通过配置文件动态调整。

6.3 LLM 调用的可观测性

所有 LLM 调用都经过 callWithLog 方法,自动记录以下信息到数据库:

请求内容(Prompt)    → 便于问题复盘
响应内容(Response)  → 便于结果分析
模型名称              → 统计各模型使用量
Temperature / TopP   → 参数追踪
Token 消耗            → 成本核算
响应耗时              → 性能监控
HTTP 状态码           → 异常排查

7. 消息队列分发模式

7.1 双模式设计

ai.geo.answer.mode: queue    # queue | mock(直调模式)
ai.geo.mq.send.mode: task    # task | question(消息粒度)

ai.geo.answer.mode:决定 Phase 2 的内容生成方式

  • mock:服务端直接调用 LLM 生成,适合测试环境
  • queue:发送 MQ 消息,由外部 Worker 消费后回调,适合生产环境

ai.geo.mq.send.mode:控制 MQ 消息的粒度

  • task:每个平台只发一条任务消息,Worker 自主拉取全部问题
  • question:每个平台的每个问题都发一条独立消息,粒度更细

7.2 动态队列注册

利用 Spring 的 DefaultListableBeanFactory 在运行时动态注册 Queue Bean:

@PostConstruct
public void initDynamicQueues() {
    List<Integer> platforms = platformConfig.getEnabledPlatforms();
    for (Integer platform : platforms) {
        String queueName = platformConfig.getQueueByPlatform(platform);
        // 等价于 @Bean 声明,但运行时决定注册哪些队列
        BeanDefinitionBuilder builder = BeanDefinitionBuilder
            .rootBeanDefinition(Queue.class);
        builder.addConstructorArgValue(queueName);   // 队列名
        builder.addConstructorArgValue(true);        // 持久化
        builder.addConstructorArgValue(false);       // 非排他
        builder.addConstructorArgValue(false);       // 非自动删除
        beanFactory.registerBeanDefinition(beanName, builder.getBeanDefinition());
    }
}

这样新增一个 AI 平台只需修改配置,无需修改代码。

7.3 外部 Worker 的回调流程

在 queue 模式下,应用将消息投递到 RabbitMQ,外部 Worker 消费消息、调用 LLM、回调 API 保存结果:

TaskMonitor Database Callback API POST /geo/save 大模型 API External Worker (Python/Go/Java) RabbitMQ Broker Application (Phase2) TaskMonitor Database Callback API POST /geo/save 大模型 API External Worker (Python/Go/Java) RabbitMQ Broker Application (Phase2) 持久化存储消息 alt [无待处理问题] [有待处理问题] loop [循环拉取问题] alt [任务级消息 (task mode)] [问题级消息 (question mode)] ── 服务端轮询检测 ── alt [全部 FINISHED] loop [每 5 秒] 投递任务/问题消息 消费消息 (BasicConsume) fetch(taskId, provider) 查询下一个待处理问题 questionId + questionContent { questionId, question } 消费完成,退出循环 调用 LLM 生成答案 答案内容 save(taskId, questionId, answer, status) 更新答案状态 = FINISHED 200 OK 直接调用 LLM 答案内容 save(taskId, questionId, answer, status) 更新答案状态 = FINISHED 200 OK Ack 确认 检测是否全部答案完成? 触发 Phase3 评分阶段

8. 断点恢复与任务重跑

8.1 应用重启自动恢复

应用重启时,自动检测所有处于 SUSPENDING(挂起)状态的任务,根据当前阶段执行恢复逻辑:

// 启动时扫描挂起的任务
List<AiGeoTask> suspendedTasks = dao.selectTasksByState("SUSPENDING");

for (AiGeoTask task : suspendedTasks) {
    // 1. 设置状态为 RUNNING
    dao.updateState(taskId, "RUNNING");
    // 2. 根据当前阶段执行恢复策略
    switch (task.getCurrentStage()) {
        case Phase1: // 清空已生成数据,重新执行
        case Phase2: // 重置未完成项为 PENDING,继续执行
        case Phase3: // 清空评分数据,重新评分
        case Phase4: // 清空报告,重新生成
    }
}

8.2 各阶段的恢复策略

阶段 恢复策略
Phase 1 - RUNNING 检查是否已有产出 → 有则跳到 Phase 2,无则重新执行
Phase 2 - RUNNING 重置所有非 FINISHED 的项目为 PENDING,重新分发
Phase 3 - RUNNING 清空评分数据,重新评分
Phase 4 - RUNNING 清空报告,重新生成

8.3 局部重跑

对于已完成的失败任务,支持从任意阶段重新执行:

重新从 Phase 2 开始执行 →
  Phase 1: 已 FINISHED    → 保留
  Phase 2: 清空下游,重置  → 重新执行
  Phase 3: 清空            → 等待 Phase 2 完成后执行
  Phase 4: 清空            → 等待 Phase 3 完成后执行

重跑时智能清理下游数据

目标阶段            清理的数据
────────           ──────────
Phase 1         问题 + 内容 + 评分 + 报告
Phase 2         未完成内容重置 + 评分 + 报告
Phase 3         评分 + 报告
Phase 4         报告

8.4 断点恢复完整时序

下面的时序图展示了应用重启后,从数据库检测挂起任务 → 按阶段恢复 → 继续执行的全过程:

Phase4 Phase3 Phase2 Phase1 TaskCoordinator Database GeoTaskService Application 启动 Phase4 Phase3 Phase2 Phase1 TaskCoordinator Database GeoTaskService Application 启动 alt [已有数据] [无数据] 重置所有非 FINISHED 答案为 PENDING Monitor 接管后续检测 清空已有评分数据 清空已有报告 alt [Phase 1 - RUNNING] [Phase 2 - RUNNING] [Phase 3 - RUNNING] [Phase 4 - RUNNING] loop [每个挂起的任务] ── 正常业务流程继续 ── 按各阶段恢复策略继续执行... @PostConstruct recoverTask() selectTasksByState(SUSPENDING) [taskA, taskB, ...] updateState(RUNNING) selectByTaskIdAndStage(taskId, currentStage) { stageName, stageStatus } countByTaskId(question) triggerNextStage(Phase2) triggerNextStage(Phase1) 重新执行 resetNonFinishedAnswers() triggerNextStage(Phase2) 重新分发消息 deleteByTaskId(score) triggerNextStage(Phase3) 重新评分 deleteByTaskId(report) triggerNextStage(Phase4) 重新生成报告 恢复完成 execute(ctx)

9. LLM 响应的鲁棒性处理

9.1 三层评分解析降级

对于 LLM 返回的不稳定格式,采用三级解析策略:

BigDecimal parseDimensionScore(String response) {
    // 第一层:JSON 解析(标准格式)
    // LLM 返回:{"final_score": 4.5}
    JsonNode root = JacksonUtils.jsonToNode(jsonStr);
    if (root != null) {
        JsonNode scoreNode = findNode(root, "final_score");
        if (scoreNode != null && scoreNode.isNumber()) {
            return new BigDecimal(scoreNode.asText());
        }
    }

    // 第二层:正则提取(非标准 JSON 但包含关键字段)
    Pattern p = Pattern.compile("\"final_score\"\\s*:\\s*(\\d+\\.?\\d*)");
    Matcher m = p.matcher(jsonStr);
    if (m.find()) {
        return new BigDecimal(m.group(1));
    }

    // 第三层:纯数字提取(最宽松的兜底策略)
    Pattern simple = Pattern.compile("\\d+\\.?\\d*");
    Matcher sm = simple.matcher(response.replaceAll("\\s+", ""));
    if (sm.find()) {
        return new BigDecimal(sm.group());
    }

    // 全部失败,返回 0
    return BigDecimal.ZERO;
}

9.2 报告生成启用深度思考

Phase 4 的报告生成阶段,启用模型的深度思考能力,以获得更高质量的分析报告:

// 深度思考参数
// {"type": "disabled"} - 禁用
// {"type": "enabled"}  - 启用
if (enableThinking) {
    builder.thinking(Thinking.enabled());
} else {
    builder.thinking(Thinking.disabled());
}

10. 量化评分模型

10.1 多维评分体系

系统设计了 5 个独立的评分维度,每个维度由独立的 LLM 调用得出分数:

维度 评分内容 关联对象
维度 A 对结果全面性的评估 LLM 生成的全部内容
维度 B 对结果准确性的评估 原始数据 vs 生成内容对比
维度 C 对结果丰富程度的评估 内容覆盖范围
维度 D 对结果倾向性的评估 内容的情感色彩
维度 E 对结果优先级的评估 内容的价值排序

10.2 评分计算流程

评分阶段是整个流水线中计算密度最高的环节:M 个维度 × N 个平台 = N × M 次 LLM 调用,外加一次独立的综合评估调用。

LLM 综合评估 (model-B, temp=0.7) LLM 单维评分 (model-A, temp=0.1) Database EvaluateScore Executor LLM 综合评估 (model-B, temp=0.7) LLM 单维评分 (model-A, temp=0.1) Database EvaluateScore Executor ── 维度 × 平台 双重循环 ── loop [平台 1..N] 维度A_分数 = AVG(平台1~N) loop [维度 A] loop [平台 1..N] 维度B_分数 = AVG(平台1~N) loop [维度 B] 同上模式... loop [维度 C ~ E] ── 五维度均分计算 ── 总分 = AVG(维度A, B, C, D, E) 独立 LLM 调用分析完整评分数据 查询任务的所有问题和答案 questions[20] + answers[N×20] 构建 Prompt → 调用评分 {"final_score": 4.5} saveScore(维度A, 平台i, 4.5) 构建 Prompt → 调用评分 {"final_score": 3.8} saveScore(维度B, 平台i, 3.8) 综合评估 (5维度原始数据) { overallScore, overallRating, keyIssue } 更新任务: 总分 + 评级 + 关键问题 阶段状态 = FINISHED

11. 数据库设计要点

11.1 核心表关系

ai_geo_task               # 任务主表
    ├── ai_geo_task_stage       # 阶段状态表(1:N)
    ├── ai_geo_task_question    # 问题表(1:N)
    │   └── ai_geo_answer       # 答案表(1:N × 平台数)
    ├── ai_geo_evaluation_score # 评分表(1:N × 维度 × 平台)
    ├── ai_geo_report           # 报告表(1:1)
    └── ai_geo_task_log         # LLM 调用日志表(1:N)

11.2 乐观锁版本号

任务阶段表通过 version 字段实现乐观并发控制:

CREATE TABLE ai_geo_task_stage (
    task_id VARCHAR(32) NOT NULL,
    stage_name VARCHAR(64) NOT NULL,
    stage_status VARCHAR(16),
    version INT DEFAULT 0,        -- 乐观锁版本号
    start_time DATETIME,
    end_time DATETIME,
    PRIMARY KEY (task_id, stage_name)
);

11.3 多租户设计

通过 system 字段实现数据隔离,所有表都携带此标识:

-- 几乎所有表都有 system 字段
system VARCHAR(32) DEFAULT 'default_system'

CRUD 操作强制要求传入 system 参数,天然支持多租户场景。


12. 配置体系

12.1 三环境配置

环境 Profile 用途
本地开发 local_dev 直连 LLM,无 MQ 依赖
测试 server_test MQ + 完整平台集成
生产 server_prod Apollo 接管,生产级配置

12.2 配置分层

# 基础配置(application.properties):开发友好
chatbot.timeout=120000
ai.geo.answer.mode=queue

# 环境重写(distribute/):测试/生产差异化覆盖
# server_test 通过 Maven AntRun 插件在 compile 阶段覆盖
# server_prod 通过 Apollo 配置中心动态管理

12.3 数据库配置的 JSON 方案

多数据源配置存储在独立的 JSON 配置文件中:

{
  "dataSourceCitymap": {
    "url": "jdbc:mysql://...",
    "driverClassName": "com.mysql.cj.jdbc.Driver",
    "username": "...",
    "password": "..."
  },
  "dataSourceOther": {
    ...
  }
}

通过 DbcfgFactoryBean 加载为 Java 对象,动态创建 HikariDataSource


13. 部署架构

13.1 CI/CD 流水线

Git Push → Jenkins Pipeline → Maven Build → Docker Build → Push Registry → Deploy
  • JDK 17 编译
  • 共享库流水线jenkins-shared-libraries@plugin/maven-docker
  • 双环境部署:Test 和 Prod 使用不同的 Docker 镜像 Tag

13.2 监控集成

  • Pinpoint APM:分布式链路追踪,监控 LLM 调用耗时
  • Log4j2 → Kafka:日志通过 Kafka 汇聚到 ELK
  • Health Endpoint/manage/health 用于 K8s 存活探针

14. 总结与设计理念回顾

设计原则落地

原则 体现
单一职责 每个 Executor 只负责一个阶段
开闭原则 新增阶段或平台只需新增实现,无需修改现有代码
依赖倒置 高层调度依赖 StageExecutor 接口,而非具体实现
组合优于继承 Registry + 策略注入替代了复杂的继承体系

核心设计权衡

  1. 数据库轮询 vs 内存事件:选择基于 DB 的轮询(Monitor),虽然牺牲了实时性(5s 延迟),但换来了重启后状态不丢失的可靠性。

  2. 乐观锁 vs 悲观锁:阶段状态变更冲突概率低,乐观锁避免了数据库连接长时间占用,version 字段还能提供数据一致性校验。

  3. MQ 分发 vs 直调:MQ 模式引入了消息丢失、重复消费等复杂度,但实现了外部 Worker 的解耦,使得 LLM 调用可以独立扩缩容。

  4. 同步阶段 vs 异步阶段:不是所有阶段都需要异步——耗时短且稳定的阶段同步执行(Phase 1, 3, 4),只有等待外部 Worker 的阶段才异步(Phase 2),用最小的复杂度换来最大的吞吐。


本文档基于生产级 Spring Boot 微服务项目的真实架构提炼而成,剥离了特定业务领域细节,保留通用的技术设计思路。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐