三方消息对接为什么总翻车?一套 RocketMQ + Redis 幂等 的工业级解法(含架构图+伪代码)

🔥 适合收藏:三方平台对接、商品/订单同步、消息中台治理
✅ 你将收获:一套可直接落地的“拉取-发送-消费-重试-幂等-补偿”完整方案


目录


1. 真实痛点:为什么“跑通了”还会出事故?

三方对接系统最常见的线上事故,不是“接口不通”,而是:

  • 下游偶发超时,整条链路阻塞;
  • 同一条消息重复消费,业务重复执行;
  • 部分成功部分失败,导致状态混乱;
  • Token 过期,任务随机失败;
  • 三方消息删除太早,回溯困难。

本质上是缺少一套“可靠消息 + 幂等治理”的系统设计。


2. 一张图看全链路架构

Scheduler/Task 定时任务

ChannelAdapter 拉取三方消息

分页结果 SearchResponse

Producer 分片发送 MQ

RocketMQ Broker

Consumer 并发监听

Transfer 统一入口

Redis 幂等过滤

模型转换 + 业务处理

处理成功?

写入幂等版本时间

RECONSUME_LATER


3. 从发送到消费:代码级实现方案

3.1 发送端:分页拉取 + 双重分片 + 同步发送

设计策略

  • 每页拉取结果立即发 MQ(避免大事务堆积);
  • 先按条数切分,再按字节大小切分(防超大消息);
  • 同步发送,失败立即可感知。

伪代码

void executeTask() {
    Request req = new Request(page=1, pageSize=100);

    while (true) {
        SearchResponse resp = channel.search(req);
        assert resp.success : "channel call failed";

        // 双重分片:数量 + 大小
        List<List<Item>> chunks = splitByCountAndSize(resp.items, 50, 3 * MB);
        for (List<Item> chunk : chunks) {
            mqProducer.send(
                topic = "TOPIC_ITEM",
                tag = "CHANNEL_X",
                body = wrap(chunk, accountInfo)
            );
        }

        if (!resp.hasNext) break;

        req.page++;
        req.extendParam = resp.extendParam; // 透传上下文(例如待删消息ID)
    }
}

3.2 消费端:失败即重试消费

设计策略

  • 并发消费提高吞吐;
  • 单条失败返回 RECONSUME_LATER
  • Broker 重投递同一条消息,不依赖生产端重发。

伪代码

ConsumeStatus consume(List<Message> msgs) {
    for (Message msg : msgs) {
        if (!process(msg)) {
            return RECONSUME_LATER;
        }
    }
    return CONSUME_SUCCESS;
}

4. RocketMQ 在这套方案里的关键作用

4.1 解耦

把“拉取三方”与“下游处理”拆开,避免互相阻塞。

4.2 弹性缓冲

下游短时抖动时,消息先堆在 Broker,系统仍能继续拉取并入队。

4.3 自动重试

消费失败后由 Broker 重投递,业务侧只需正确返回状态。

4.4 至少一次投递语义

这也是为什么必须做消费端幂等,否则重复消费必出问题。


5. Redis 幂等到底怎么做才靠谱?

5.1 推荐键模型(脱敏)

idempotent:{tenant}:{account}:{topic}:{bizKey} -> "{bizKey}:{versionTs}"
TTL = 24h
  • bizKey:业务唯一键(如 SKU、订单号)
  • versionTs:业务变更时间(推荐用三方业务时间)

5.2 判定规则

  • Redis 无记录:处理
  • 有记录且 currentTs <= redisTs:跳过
  • 有记录且 currentTs > redisTs:处理(新版本)

5.3 伪代码

Map<String, Long> current = buildBizKeyVersionMap(items);
List<String> skipKeys = idempotentStore.calcSkip(current);

List<Item> toProcess = items.stream()
    .filter(i -> !skipKeys.contains(i.bizKey()))
    .toList();

if (toProcess.isEmpty()) return true;

boolean ok = doBusiness(toProcess);

if (ok) {
    idempotentStore.saveProcessed(versionMapOf(toProcess), Duration.ofHours(24));
}
return ok;

6. 三方 Token 过期的工程化兜底

线上常见错误码:-100(示例,表示 token 过期)。
建议统一做“刷新 + 重试 1 次”。

伪代码

Response callWithRetry(Supplier<Response> call) {
    Response r1 = call.get();
    if (!isTokenExpired(r1)) return r1;

    if (!tokenService.refresh()) return r1;
    return call.get(); // only one retry
}

建议只重试一次,避免无限循环放大故障。


7. 最容易踩坑的 7 个点

  1. 把字符串 msgId 当 Long 处理
    导致删除 ID 为空,平台消息一直删不掉。

  2. 只在任务开始检查 token
    运行中仍可能过期,必须支持接口级自动刷新重试。

  3. 幂等只按 bizKey,不按版本时间
    会吞掉真实增量更新。

  4. versionTs 用系统 now() 而非业务时间
    在部分场景会弱化幂等语义,建议优先使用三方业务变更时间。

  5. 误以为消费失败会触发“生产端重发”
    实际是 Broker 重投递同一条消息。

  6. 删除三方消息时机过早
    建议延迟删除并与成功发送链路关联。

  7. 没有 DLQ 回放机制
    超过最大重试后没有补偿能力,线上恢复成本高。


8. 可直接复用的伪代码模板

8.1 幂等存储接口

public interface IdempotentStore {
    List<String> calcSkip(Map<String, Long> keyToVersion);
    void saveProcessed(Map<String, Long> keyToVersion, Duration ttl);
}

8.2 消费处理器接口

public interface MessageProcessor<T> {
    String bizKey(T item);
    Long versionTs(T item);  // 优先使用三方业务时间
    boolean handle(List<T> items);
}

8.3 通用处理骨架

boolean process(List<T> items) {
    Map<String, Long> current = items.stream()
        .collect(toMap(this::bizKey, this::versionTs, Math::max));

    List<String> skip = idempotentStore.calcSkip(current);
    List<T> todo = items.stream()
        .filter(i -> !skip.contains(bizKey(i)))
        .toList();

    if (todo.isEmpty()) return true;

    boolean ok = handle(todo);
    if (ok) {
        Map<String, Long> done = todo.stream()
            .collect(toMap(this::bizKey, this::versionTs, Math::max));
        idempotentStore.saveProcessed(done, Duration.ofHours(24));
    }
    return ok;
}

9. FAQ(高频问题)

Q1:三方消息删了,MQ 消费失败还能重试吗?

能。只要消息已入 MQ,重试依赖 Broker,不依赖再去三方拉取。

Q2:为什么有时会看到重复消费?

RocketMQ 是至少一次投递,重复是正常现象。消费端必须幂等。

Q3:幂等 TTL 设多久合适?

看业务重放窗口。常见 24h/48h;若跨天补偿多,建议更长并配合回放策略。

Q4:部分成功部分失败怎么办?

建议“成功项写幂等,失败项走重试”,并确保状态可追踪。


10. 结语

很多系统的问题不在“业务逻辑”,而在“链路治理能力”。
一套可长期稳定的三方同步系统,核心是这 5 个词:

解耦、重试、幂等、补偿、可观测。

如果你正在做三方对接,这套 RocketMQ + Redis 幂等方案,基本可以作为你的默认基线。


Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐