1.为什么需要使用消息队列?

  • 化同步为异步;提升用户体验,比如创建虚拟机场景,如果创建虚拟机的请求要等到虚拟机创建完成才返回,那分钟级的时延会用户体验很差,更好的解决方案是立刻返回:创建指令下发成功的提示,通过轮询或者回调来通知用户最终创建结果
  • 解耦;从时间层面来讲,可以避免某个低效的操作成为整个系统的瓶颈——低性能的操作可以通过单独抽取成一个微服务+加机器来处理;从空间层面来讲,如果业务比较复杂,可以将业务流程拆成多个微服务,微服务间通过事件而不是API进行通讯,这样各个微服务的横向拓展就会容易很多,因为其只需要更新MQ中的微服务配置,而不是对所有相关微服务都修改地址配置,配置修改代价从 O(N) -> O(1)
  • 削峰填谷,避免系统被瞬时大流量请求打挂,给系统一个可以按需处理“缓冲区”;比如抢票场景

2.如何提高 MQ 可靠性

2.1 消息生产阶段

关键节点:意图落盘 -> publisher扫描意图表 -> 消息推送

意图落盘

为什么要引入"意图"这个概念,而不是直接推送消息?
如果是个人小项目或者对消息可靠性要求没那么高的场景,其实直接推送消息就行了;
但如果业务对消息可靠性要求很高,不能接受业务意义上的消息遗漏呢,比如

一个订单系统通过消息队列通知下游积分系统“增加积分”,要求尽最大可能不丢,并达到业务可接受的一致性保证

没仔细研究过的我:这题我会!为了防止各种神秘小故障导致 在订单状态被修改成已下单后,消息没推送成功前进程就挂掉了,所以应该使用事务来保证数据库中的订单状态和实际上的积分情况一致。应该使用事务在推送之前修改一下订单表为已下单,再推送加积分消息,比如:

def create_order_with_message():
    try:
        # 1. 开启事务
        db_conn.begin()
        
        # 2. 插入订单记录,状态为"下单完成"
        order_sql = """
        INSERT INTO orders (user_id, amount, status, created_at) 
        VALUES (%s, %s, %s, NOW())
        """
        cursor.execute(order_sql, (1001, 199.99, '下单完成'))
        order_id = cursor.lastrowid
        
        # 3. 推送积分消息到MQ
        message = f'{{"order_id": {order_id}, "points": 200}}'
        channel.basic_publish(
            exchange='',
            routing_key='order_points',
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # 持久化消息
            )
        )
        logger.info(f"订单 {order_id} 积分消息已推送")
        
        # 4. 提交事务
        db_conn.commit()
        logger.info(f"订单 {order_id} 创建成功,事务已提交")
        
    except Exception as e:
        # 发生异常,回滚事务
        db_conn.rollback()
        logger.error(f"订单创建失败,事务已回滚: {e}")
        raise

细究的话可以直接把这段代码发给AI,问:

忽略代码中的语法错误,以伪代码的视角来看,这段有什么问题

通过事务来保证 “订单状态改变为已下单”和"发送积分"这两件事的一致性的思路本身是没错的,但两个最核心的问题在于:

问题1.MQ没有回滚能力,db的事务管理不了MQ的发送,也就是说如果捕获到了异常,仅有DB相关的操作可以正常回滚

问题2.推送消息到MQ作为一个相对重载的操作,会大幅延长事务的持续时间,这种行为有个贴切的俗语——“占着茅坑不拉屎”

此时,“意图”这个概念就呼之欲出了——反正由于消息队列的存在,积分增加和用户下单两个操作必然会存在短暂的不一致性(即存在时间差),我们索性在下单完成时只在数据库中把“我要加积分”这个操作的“意图"存下来。

后续的发消息操作可以单独用一个定时任务来做,专门负责发送消息的这个对象通常叫做publisher

publisher扫描意图表

常见的仅有单个publisher的意图表(通常又叫outbox表)设计:

CREATE TABLE outbox_messages (
    -- Outbox 消息自己的主键,用来唯一标识一条待发布消息
    id INTEGER PRIMARY KEY AUTOINCREMENT,

    -- 这条消息属于哪一类业务对象
    -- 例如:sms_message、order、payment
    aggregate_type TEXT NOT NULL,

    -- 这条消息对应的具体业务对象 ID
    -- 例如:短信任务 ID、订单 ID
    aggregate_id TEXT NOT NULL,

    -- 事件类型,表示这条消息描述了什么业务事件
    -- 例如:SMS_TASK_CREATED、ORDER_PAID、STOCK_DECREASED
    event_type TEXT NOT NULL,

    -- 要发送到消息队列的消息内容,通常存 JSON 字符串
    -- 例如:{"sms_id": 1, "phone": "13800000000", "content": "购买成功"}
    payload TEXT NOT NULL,

    -- 当前发布状态
    -- PENDING:等待发布
    -- PUBLISHED:已经发布成功
    -- FAILED:发布失败,但可能还能重试
    -- DEAD:可选,到达最大重试次数,发布失败
    status TEXT NOT NULL DEFAULT 'PENDING',

    -- 已经重试发布的次数
    -- 每次发布失败后 +1
    retry_count INTEGER NOT NULL DEFAULT 0,

    -- 最大重试次数
    -- retry_count 达到这个值后,后台扫描程序可以不再自动发布它
    max_retries INTEGER NOT NULL DEFAULT 3,

    -- 最近一次发布失败的错误信息
    -- 用来排查 RabbitMQ 连接失败、网络超时等问题
    last_error TEXT NULL,

    -- 这条 outbox 消息创建的时间
    -- 通常和业务数据在同一个事务里一起写入
    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,

    -- 成功发布到消息队列的时间
    -- 还没发布成功时为 NULL
    published_at DATETIME NULL
);

消息推送

常见的publisher推送一条消息到队列中的流程大概是这样:

  • 查询outbox表,找出id为outbox_id的记录
  • 校验状态,如果已经发送完成或者到达最大重试次数,就直接 return
  • 数据格式转化,将数据库中的json格式的消息转化为消息队列格式的消息
  • 将消息推送到消息队列
  • 若成功,修改outbox表id为outbox_id的记录状态为已经推送
  • 若失败,修改outbox表的记录的重试次数,如果重试次数已经到达上限,则将状态修改为发送失败

假设消息队列为rabbitmq,参考代码

def publish_outbox_message(
    connection: Any,
    outbox_id: int,
    publisher: SmsTaskPublisher,
    *,
    max_retries: int = DEFAULT_OUTBOX_MAX_RETRIES,
) -> SmsTask:
    outbox = execute(
        connection,
        f"""
        SELECT id, payload, status, retry_count
        FROM outbox_messages
        WHERE id = {placeholder(connection)}
        """,
        (outbox_id,),
    ).fetchone()
    if outbox is None:
        raise SmsQueueError(f"Outbox 消息不存在:{outbox_id}")

    task = sms_task_from_outbox_payload(outbox["payload"])
    if outbox["status"] == OUTBOX_STATUS_PUBLISHED:
        return task

    if outbox["retry_count"] >= max_retries:
        with transaction(connection):
            execute(
                connection,
                f"""
                UPDATE outbox_messages
                SET status = {placeholder(connection)}
                WHERE id = {placeholder(connection)}
                """,
                (OUTBOX_STATUS_FAILED, outbox_id),
            )
        return task

    try:
        publisher(task)
    except Exception as error:
        new_retry_count = outbox["retry_count"] + 1
        with transaction(connection):
            execute(
                connection,
                f"""
                UPDATE outbox_messages
                SET status = {placeholder(connection)},
                    retry_count = {placeholder(connection)},
                    last_error = {placeholder(connection)}
                WHERE id = {placeholder(connection)}
                """,
                (OUTBOX_STATUS_FAILED, new_retry_count, str(error), outbox_id),
            )
        logger.exception("Outbox 消息发布失败:outbox_id=%s", outbox_id)
        raise SmsQueueError(f"Outbox 消息发布失败:{error}") from error

    with transaction(connection):
        execute(
            connection,
            f"""
            UPDATE outbox_messages
            SET status = {placeholder(connection)},
                published_at = CURRENT_TIMESTAMP,
                last_error = NULL
            WHERE id = {placeholder(connection)}
            """,
            (OUTBOX_STATUS_PUBLISHED, outbox_id),
        )

    return task

上面的代码其实还有几个潜在问题:
1.最重要的一个,默认publisher(task) 未报错即代表消息已经推送到了消息队列中,但是实际上神秘小故障有可能导致实际上消息队列并没有成功收到(具体故障可以问AI),因此我们需要使用消息队列自带的Publisher Confirm机制,来确保消息队列确实收到了消息,再修改DB状态。

2.如果发送消息成功,但是由于神秘小故障,导致进程在修改outbox表的状态前就挂掉了,会导致下次publisher再次扫描该表,导致同一个消息被多次生产,这个问题作者目前没找到比较轻量的解决方案,所以我们只能退而求其次,在生产端尽最大可能保证消息不漏、并尽可能减少重复(但是实际上很难做到),对于重复消息,在消费端构造幂等性

3.如果有多个publisher同时执行消息生产工作,那么有可能会出现同一个消息被并发投递两次的情况,虽然消费端构造幂等性能在一定程度上缓解这个问题,但是实践中我们可以通过给outbox表添加 locked_at 和 locked_by 两个字段,将原本的 查询outbox表中id为xx 这个操作纯查询操作 改为 将outbox表中id为xx 的 状态改为 publishing(发送中) ,同时将 locked_at 和 locked_by 字段分别改为 锁超时时间 和 含有UUID(防止锁被误删)的字符串,这样就实现了一个带超时时间的乐观锁(状态字段替代常见的版本号字段作为判断是否被锁的标记)。 操作完消息队列再将消息状态改回已经发布成功并释放锁。对于上了锁但是进程中途挂掉的场景,我们可以通过定时任务,通过判断发送任务是否超时来进行解锁。

2.2 消息队列阶段

核心:从消息落盘主从模式两个维度考虑

可靠性等级 确认条件 可靠性特点 Kafka 对应配置/形态 RabbitMQ 对应配置/形态
最高 多节点持久化/多数派持久化后 confirm 同时抗单节点故障和重启/断电,可靠性最高 Kafka 常规不强调“每条消息物理 fsync 后确认”,更推荐用副本可靠性逼近:replication.factor >= 3min.insync.replicas >= 2,producer acks=all/-1enable.idempotence=trueunclean.leader.election.enable=false。如果强制追求落盘,可调 log.flush.*,但性能代价大,非主流推荐。 Quorum Queuex-queue-type=quorum,queue durable=true,消息 delivery_mode=2/persistent,开启 publisher confirm。语义接近 Raft 多数派复制/提交后确认。
很高 多节点入内存/页缓存后 confirm 抗单节点崩溃/故障转移,但不抗集群整体掉电 Kafka 主流高可靠形态更接近这一档replication.factor >= 3min.insync.replicas >= 2,producer acks=all/-1enable.idempotence=trueunclean.leader.election.enable=false。leader 等 ISR 副本写入本地日志/页缓存后确认,默认不保证每条消息都 fsync 到物理盘。 Classic Mirrored Queue 镜像队列:durable queue + persistent message + publisher confirm 时,会等待同步镜像确认;但经典镜像队列已废弃/不推荐。若是非持久化消息,则更明确属于“多节点入内存后确认”。
主节点落盘后 confirm 主节点重启/断电后消息不会丢,但是主备倒换的过程中有可能导致丢消息 Kafka 不太典型。近似是:replication.factor=1,producer acks=1,再通过 log.flush.interval.messages=1 或较小 log.flush.interval.ms 逼近单副本落盘确认;但这不是 Kafka 推荐用法。 单节点 durable classic queue:queue durable=true,消息 delivery_mode=2/persistent,开启 publisher confirm。能抗原主重启,但不抗切到未同步副本。
主节点入内存/页缓存后 confirm 只要主节点进程挂/机器断电就可能丢 acks=1 + replication.factor=1,或者虽然有副本但 producer 用 acks=1,只等 leader 本地接收,不等 follower。acks=0 比这一档更低。 单节点 transient classic queue:queue durable=false 或消息 delivery_mode=1,开启 publisher confirm。若不开 publisher confirm,则可靠性更低。

多节点持久化/多数派持久化后 confirm 的可靠性最高,但如果要求每条消息都同步等待多个节点完成强持久化,会显著增加写入延迟和 IO 压力,容易让 MQ 成为系统瓶颈。因此实践中很多 MQ 会通过副本复制、页缓存、批量刷盘、WAL/group commit、ISR 或多数派提交等机制,在可靠性、吞吐和延迟之间折中,而不是简单采用“每条消息逐条多节点落盘后再确认”的严格模式。

个人认为多节点入内存后 confirm的可靠性是要略高于主节点落盘后 confirm的,在云上环境由于集群节点一般分布于不同的AZ且存在UPS,除非遇到大规模的自然灾害,否则很难出现集群整体无预兆的掉电

2.3 消息消费阶段

消费阶段最重要的任务其实是 保证幂等,有些业务操作是天然幂等的,这种情况下就不用再消费端通过各种手段保证幂等了。

对于非天然幂等的操作,需求决定设计。不妨想想:如果消费者想要保证幂等,ta 需要知道哪些基本信息。

如下两个点应该是比较容易想到的:
1.是否有消费者处理过同一条的消息?
2.我怎么知道当前处理的消息和之前处理的消息是不是同一条。

因此最简单的方案莫过用redis的setnx设置一个短过期时间的key,作为代表消息已经开始消费的记录,将消息的唯一ID(最好是写入在outbox表时就确定唯一的ID)作为key,UUID 作为 value,保证在key过期时间内即使由其他消费者接受到了重复消息,也能通过判断唯一 ID 的 key 是否存在,来判断该消息是否已经被人处理了;并且在业务代码执行完成后,设置一个超时时间较长的、代表消息消费状态为完成的 key,防止后续有重复消息进入。

其实在稍微能容忍消息重复消费的场景,这种方案已经够用了。

这种方案的核心问题是:不能通过事务来保证 redis和后续业务代码对数据库操作的原子性,可能会出现这种情况:

  • 相对能接受的情况:
    在完成业务操作前,如果进程挂掉了的话,那么消息队列会向消费者再推送消息;这种情况下,要等key过期后,业务代码才会被正确执行

  • 几乎确定会被重复消费的情况:
    在完成业务操作后,ack前,如果进程挂掉了的话,那么消息队列会向消费者再推送消息;等到key过期后,执行业务代码就会被重复执行

所以不难看出:Redis 的 PROCESSING TTL 调长调短,只是在“短时间重复消费拦截能力”和“失败后自动恢复速度”之间取舍;它不能解决业务成功但 Redis 状态未同步时的最终重复执行问题。

  • 长 TTL:更能挡住短时间重复投递,但失败恢复更慢。
  • 短 TTL:恢复更快,但更早暴露重复执行窗口。
  • 无论长短:都不能解决业务成功但 Redis 状态未同步时的最终重复执行问题。

因此,如果是对消息重复消费容忍度很低的场景,最通用的方法还是通过inbox表来解决问题

类比redis的设计,inbox表我们至少应该有
数据库主键,消息ID,被占用消费者ID,锁过期时间 这几个字段

CREATE TABLE inbox_messages (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,

  -- 对应 Redis key:消息或业务的唯一幂等标识
  message_id VARCHAR(128) NOT NULL,

  -- 对应 status:PROCESSING / PROCESSED
  status VARCHAR(32) NOT NULL,

  -- 对应“谁 SETNX 成功了”:当前占用这条消息的消费者
  locked_by VARCHAR(128) NULL,

  -- 对应 Redis PROCESSING TTL:锁什么时候过期
  lock_expires_at DATETIME NULL,

  UNIQUE KEY uk_inbox_message_id (message_id),
  KEY idx_inbox_status_lock_expires_at (status, lock_expires_at)
);

加上一些审计字段后变成(由mq驱动重试,而不是由inbox表驱动重试,记录inbox表+执行业务代码完成后再ACK):

CREATE TABLE inbox_messages (
  id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT '数据库自增主键,无业务含义',

  message_id VARCHAR(128) NOT NULL COMMENT '消息唯一标识/业务幂等键;用于识别同一条消息,防止重复消费',
  status VARCHAR(32) NOT NULL COMMENT '消息处理状态:PROCESSING=处理中,PROCESSED=已处理;也可扩展 FAILED/DEAD 等状态',

  locked_by VARCHAR(128) NULL COMMENT '当前占用该消息的消费者标识,用于排查是哪一个消费者正在处理',
  claim_token VARCHAR(128) NULL COMMENT '本次认领消息生成的随机令牌;完成处理时需带 token 更新,防止旧消费者超时后误提交',
  lock_expires_at DATETIME NULL COMMENT '处理锁过期时间;超过该时间后,其他消费者可尝试重新认领',

  attempt_count INT NOT NULL DEFAULT 0 COMMENT '本消费者实际尝试处理该消息的次数;主要用于观测和毒消息保护,不负责调度重试',
  last_error TEXT NULL COMMENT '最近一次处理失败的错误信息,用于排障;实际重试节奏由 MQ 控制',

  created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间,即第一次收到该 message_id 的时间',
  updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录最后更新时间,即最近一次状态、锁或错误信息变化时间',
  processed_at DATETIME NULL COMMENT '消息成功处理完成时间;仅在 PROCESSED 状态下通常有值',

  UNIQUE KEY uk_inbox_message_id (message_id),
  KEY idx_inbox_status_lock_expires (status, lock_expires_at)
) COMMENT='Inbox 消息消费幂等表;在 MQ 驱动重试模式下,用于记录消费状态、处理锁、尝试次数和基础排障信息';

mq驱动的消费流程大概是这样:

  1. 收到 MQ 消息
  2. 根据 message_id 尝试写入/认领 inbox_messages
  3. 如果已经是 PROCESSED:说明重复投递,直接 ACK
  4. 如果成功认领:执行业务逻辑
  5. 在同一个数据库事务里:提交业务变更 + 标记 Inbox 为 PROCESSED
  6. commit 成功后,再 ACK MQ
  7. 如果业务失败或 commit 失败:不 ACK / NACK,让 MQ 后续重投

要注意这里我们达成的幂等,只是对于“拥有相同message_id的重复消息“的幂等,如果同一个业务操作,导致了在生产端产生了多条message_id不同的消息,那么在inbox表是不具备判断其是否为”同一条“消息的能力,这种情况下,还是要结合业务,比如业务代码的流水表里通过构造唯一键来达成幂等。

也就是说:Inbox 解决的是“同一条消息重复来”的问题;业务唯一键解决的是“同一件业务事情被包装成多条消息来”的问题

2.4 死信队列

死信队列(DLQ,Dead Letter Queue)可以理解为异常消息的隔离区。它不是 MQ 可靠性的核心设计,而是消费端异常处理的兜底手段:当一条消息反复消费失败,达到最大重试次数后,就不应该继续无限重试,而应该被转入死信队列,或者在 inbox 表中被标记为 DEAD。

这样做的目的,是避免“毒消息”一直卡住正常消费链路。所谓毒消息,就是那些由于消息格式错误、业务数据异常、代码缺陷等原因,短时间内无论重试多少次都无法被正常处理的消息。

一般流程可以简化为:

  1. 消费者处理消息失败;
  2. 记录失败原因,并增加重试次数;
  3. 未达到最大重试次数时,由 MQ 或延迟重试机制继续投递;
  4. 达到最大重试次数后,将消息转入 DLQ,或将 inbox 状态标记为 DEAD;
  5. 触发告警,等待人工排查、修复数据、重新投递或执行补偿逻辑。

需要注意的是,DLQ 本身并不负责“修好”消息,它只是把异常消息从主消费链路中隔离出来。真正的后续处理,仍然依赖监控告警、错误原因记录、人工排查和补偿机制。

因此,在高可靠消费设计中,可以把死信队列看作最后一道保护:正常情况下依靠 inbox + 业务幂等保证重复消费安全;异常情况下依靠最大重试次数 + DLQ / DEAD 状态,防止毒消息无限重试并影响正常消息处理。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐