1.为什么需要使用消息队列?

  • 化同步为异步;提升用户体验,比如创建虚拟机场景,如果创建虚拟机的请求要等到虚拟机创建完成才返回,那分钟级的时延会用户体验很差,更好的解决方案是立刻返回:创建指令下发成功的提示,通过轮询或者回调来通知用户最终创建结果
  • 解耦;从时间层面来讲,可以避免某个低效的操作成为整个系统的瓶颈——低性能的操作可以通过单独抽取成一个微服务+加机器来处理;从空间层面来讲,如果业务比较复杂,可以将业务流程拆成多个微服务,微服务间通过事件而不是API进行通讯,这样各个微服务的横向拓展就会容易很多,因为其只需要更新MQ中的微服务配置,而不是对所有相关微服务都修改地址配置,配置修改代价从O(N) - > O(1)
  • 削峰填谷,避免系统被瞬时大流量请求打挂,给系统一个可以按需处理“缓冲区”;比如抢票场景

2.如何保证消息不丢

2.1 消息生产阶段

关键节点:意图落盘 -> publisher扫描意图表 -> 消息推送

意图落盘

为什么要引入"意图"这个概念,而不是直接推送消息?
如果是个人小项目或者对消息可靠性要求没那么高的场景,其实直接推送消息就行了;
但如果业务要求不能有任何消息遗漏呢,比如

一个订单系统通过消息队列通知下游积分系统“增加积分”,要求消息绝对不能丢

没仔细研究过的我:这题我会!为了防止各种神秘小故障导致 在订单状态被修改成已下单后,消息没推送成功前进程就挂掉了,所以应该使用事务来保证数据库中的订单状态和实际上的积分情况一致。应该使用事务在推送之前修改一下订单表为已下单,再推送加积分消息,比如:

def create_order_with_message():
    try:
        # 1. 开启事务
        db_conn.begin()
        
        # 2. 插入订单记录,状态为"下单完成"
        order_sql = """
        INSERT INTO orders (user_id, amount, status, created_at) 
        VALUES (%s, %s, %s, NOW())
        """
        cursor.execute(order_sql, (1001, 199.99, '下单完成'))
        order_id = cursor.lastrowid
        
        # 3. 推送积分消息到MQ
        message = f'{{"order_id": {order_id}, "points": 200}}'
        channel.basic_publish(
            exchange='',
            routing_key='order_points',
            body=message,
            properties=pika.BasicProperties(
                delivery_mode=2,  # 持久化消息
            )
        )
        logger.info(f"订单 {order_id} 积分消息已推送")
        
        # 4. 提交事务
        db_conn.commit()
        logger.info(f"订单 {order_id} 创建成功,事务已提交")
        
    except Exception as e:
        # 发生异常,回滚事务
        db_conn.rollback()
        logger.error(f"订单创建失败,事务已回滚: {e}")
        raise

细究的话可以直接把这段代码发给AI,问:

忽略代码中的语法错误,以伪代码的视角来看,这段有什么问题

通过事务来保证 “订单状态改变为已下单”和"发送积分"这两件事的一致性的思路本身是没错的,但两个最核心的问题在与:

问题1.MQ没有回滚能力,db的事务管理不了MQ的发送,也就是说如果捕获到了异常,仅有DB相关的操作可以正常回滚

问题2.推送消息到MQ作为一个相对重载的操作,会大幅延长事务的持续时间,这种行为有个贴切的俗语——“占着茅坑不拉屎”

此时,“意图”这个概念就呼之欲出了——反正由于消息队列的存在,积分增加和用户下单两个操作必然会存在短暂的不一致性(即存在时间差),我们索性在下单完成时只在数据库中把“我要加积分”这个操作的“意图"存下来。

后续的发消息操作可以单独用一个定时任务来做,专门负责发送消息的这个对象通常叫做publisher

publisher扫描意图表

常见的仅有单个publisher的意图表(通常又叫outbox表)设计:

CREATE TABLE outbox_messages (
    -- Outbox 消息自己的主键,用来唯一标识一条待发布消息
    id INTEGER PRIMARY KEY AUTOINCREMENT,

    -- 这条消息属于哪一类业务对象
    -- 例如:sms_message、order、payment
    aggregate_type TEXT NOT NULL,

    -- 这条消息对应的具体业务对象 ID
    -- 例如:短信任务 ID、订单 ID
    aggregate_id TEXT NOT NULL,

    -- 事件类型,表示这条消息描述了什么业务事件
    -- 例如:SMS_TASK_CREATED、ORDER_PAID、STOCK_DECREASED
    event_type TEXT NOT NULL,

    -- 要发送到消息队列的消息内容,通常存 JSON 字符串
    -- 例如:{"sms_id": 1, "phone": "13800000000", "content": "购买成功"}
    payload TEXT NOT NULL,

    -- 当前发布状态
    -- PENDING:等待发布
    -- PUBLISHED:已经发布成功
    -- FAILED:发布失败,但可能还能重试
    -- DEAD:可选,到达最大重试次数,发布失败
    status TEXT NOT NULL DEFAULT 'PENDING',

    -- 已经重试发布的次数
    -- 每次发布失败后 +1
    retry_count INTEGER NOT NULL DEFAULT 0,

    -- 最大重试次数
    -- retry_count 达到这个值后,后台扫描程序可以不再自动发布它
    max_retries INTEGER NOT NULL DEFAULT 3,

    -- 最近一次发布失败的错误信息
    -- 用来排查 RabbitMQ 连接失败、网络超时等问题
    last_error TEXT NULL,

    -- 这条 outbox 消息创建的时间
    -- 通常和业务数据在同一个事务里一起写入
    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,

    -- 成功发布到消息队列的时间
    -- 还没发布成功时为 NULL
    published_at DATETIME NULL
);

消息推送

常见的publisher推送一条消息到队列中的流程大概是这样:

  • 查询outbox表,找出id为outbox_id的记录
  • 校验状态,如果已经发送完成或者到达最大重视次数,就直接return
  • 数据格式转化,将数据库中的json格式的消息转化为消息队列格式的消息
  • 将消息推送到消息队列
  • 若成功,修改outbox表id为outbox_id的记录状态为已经推送
  • 若失败,修改outbox表的记录的重试次数,如果重试次数已经到达上限,则将状态修改为发送失败

假设消息队列为rabbitmq,参考代码

def publish_outbox_message(
    connection: Any,
    outbox_id: int,
    publisher: SmsTaskPublisher,
    *,
    max_retries: int = DEFAULT_OUTBOX_MAX_RETRIES,
) -> SmsTask:
    outbox = execute(
        connection,
        f"""
        SELECT id, payload, status, retry_count
        FROM outbox_messages
        WHERE id = {placeholder(connection)}
        """,
        (outbox_id,),
    ).fetchone()
    if outbox is None:
        raise SmsQueueError(f"Outbox 消息不存在:{outbox_id}")

    task = sms_task_from_outbox_payload(outbox["payload"])
    if outbox["status"] == OUTBOX_STATUS_PUBLISHED:
        return task

    if outbox["retry_count"] >= max_retries:
        with transaction(connection):
            execute(
                connection,
                f"""
                UPDATE outbox_messages
                SET status = {placeholder(connection)}
                WHERE id = {placeholder(connection)}
                """,
                (OUTBOX_STATUS_FAILED, outbox_id),
            )
        return task

    try:
        publisher(task)
    except Exception as error:
        new_retry_count = outbox["retry_count"] + 1
        with transaction(connection):
            execute(
                connection,
                f"""
                UPDATE outbox_messages
                SET status = {placeholder(connection)},
                    retry_count = {placeholder(connection)},
                    last_error = {placeholder(connection)}
                WHERE id = {placeholder(connection)}
                """,
                (OUTBOX_STATUS_FAILED, new_retry_count, str(error), outbox_id),
            )
        logger.exception("Outbox 消息发布失败:outbox_id=%s", outbox_id)
        raise SmsQueueError(f"Outbox 消息发布失败:{error}") from error

    with transaction(connection):
        execute(
            connection,
            f"""
            UPDATE outbox_messages
            SET status = {placeholder(connection)},
                published_at = CURRENT_TIMESTAMP,
                last_error = NULL
            WHERE id = {placeholder(connection)}
            """,
            (OUTBOX_STATUS_PUBLISHED, outbox_id),
        )

    return task

上面的代码已经其实还有几个潜在问题:
1.最重要的一个,默认publisher(task) 未报错即代表消息已经推送到了消息队列中,但是实际上神秘小故障有可能导致实际上消息队列并没有成功收到(具体故障可以问AI),因此我们需要使用消息队列自带的Publisher Confirm机制,来确保消息队列确实收到了消息,再修改DB状态。

2.如果发送消息成功,但是由于神秘小故障,导致进程在修改outbox表的状态前就挂掉了,会导致下次publisher再次扫描该表,导致同一个消息被多次生产,这个问题作者目前没找到比较轻量的解决方案,所以我们只能退而求其次,在生产端保证消息绝对不漏+尽可能保证消息不重复(但是实际上很难做到),对于重复消息,在消费端构造幂等性

3.如果有多个publisher同时执行消息生产工作,那么有可能会出现同一个消息被并发投递两次的情况,虽然消费端构造幂等性能一定程度上缓解这个问题,但是实践中我们可以通过给outbox表添加 locked_at 和 locaked_by 两个字段,将原本的 查询outbox表中id为xx 这个操作纯查询操作 改为 将outbox表中id为xx 的 状态改为 publishing(发送中) ,同时将 locked_at 和 locked_by 字段分别改为 锁超时时间 和 含有UUID(防止锁被误删)的字符串,这样就实现了一个带超时时间的乐观锁(状态字段替代常见的版本号字段作为判断是否被锁的标记)。 操作完消息队列再将消息状态改回已经发布成功并释放锁。对于上了锁但是进程中途挂掉的场景,我们可以通过定时任务,通过判断发送任务是否超时来进行解锁。

2.2 消息队列阶段

核心:从消息落盘主从模式两个维度考虑

可靠性等级 确认条件 可靠性特点 Kafka 对应配置/形态 RabbitMQ 对应配置/形态
最高 多节点持久化/多数派持久化后 confirm 同时抗单节点故障和重启/断电,可靠性最高 Kafka 常规不强调“每条消息物理 fsync 后确认”,更推荐用副本可靠性逼近:replication.factor >= 3min.insync.replicas >= 2,producer acks=all/-1enable.idempotence=trueunclean.leader.election.enable=false。如果强制追求落盘,可调 log.flush.*,但性能代价大,非主流推荐。 Quorum Queuex-queue-type=quorum,queue durable=true,消息 delivery_mode=2/persistent,开启 publisher confirm。语义接近 Raft 多数派复制/提交后确认。
很高 多节点入内存/页缓存后 confirm 抗单节点崩溃/故障转移,但不抗集群整体掉电 Kafka 主流高可靠形态更接近这一档replication.factor >= 3min.insync.replicas >= 2,producer acks=all/-1enable.idempotence=trueunclean.leader.election.enable=false。leader 等 ISR 副本写入本地日志/页缓存后确认,默认不保证每条消息都 fsync 到物理盘。 Classic Mirrored Queue 镜像队列:durable queue + persistent message + publisher confirm 时,会等待同步镜像确认;但经典镜像队列已废弃/不推荐。若是非持久化消息,则更明确属于“多节点入内存后确认”。
主节点落盘后 confirm 主节点重启/断电后消息不会丢,但是主备倒换的过程中有可能导致丢消息 Kafka 不太典型。近似是:replication.factor=1,producer acks=1,再通过 log.flush.interval.messages=1 或较小 log.flush.interval.ms 逼近单副本落盘确认;但这不是 Kafka 推荐用法。 单节点 durable classic queue:queue durable=true,消息 delivery_mode=2/persistent,开启 publisher confirm。能抗原主重启,但不抗切到未同步副本。
主节点入内存/页缓存后 confirm 只要主节点进程挂/机器断电就可能丢 acks=1 + replication.factor=1,或者虽然有副本但 producer 用 acks=1,只等 leader 本地接收,不等 follower。acks=0 比这一档更低。 单节点 transient classic queue:queue durable=false 或消息 delivery_mode=1,开启 publisher confirm。若不开 publisher confirm,则可靠性更低。

多节点持久化/多数派持久化后 confirm 的可靠性最高,但如果要求每条消息都同步等待多个节点完成强持久化,会显著增加写入延迟和 IO 压力,容易让 MQ 成为系统瓶颈。因此实践中很多 MQ 会通过副本复制、页缓存、批量刷盘、WAL/group commit、ISR 或多数派提交等机制,在可靠性、吞吐和延迟之间折中,而不是简单采用“每条消息逐条多节点落盘后再确认”的严格模式。

个人认为多节点入内存后 confirm的可靠性是要略高于主节点落盘后 confirm的,在云上环境由于集群节点一般分布于不同的AZ且存在UPS,除非遇到大规模的自然灾害,否则很难出现集群整体无预兆的掉电

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐