一、RabbitMQ 核心知识点

1.1 什么是 MQ

MQ(Message Queue,消息队列),消息中间件,是存储消息的容器,用于分布式系统之间进行通信,实现系统解耦、异步通信、流量削峰等核心能力。

1.2 主流 MQ 对比选型

特性 RabbitMQ ActiveMQ RocketMQ Kafka
公司 / 社区 Rabbit Apache 阿里 Apache
开发语言 Erlang Java Java Scala&Java
协议 AMQP OpenWire、AUTO、Stomp、MQTT 自定义 自定义
单机吞吐量 万级 万级 (最差) 十万级 十万级
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
特性 并发能力强,延时低 老牌产品,文档多 功能完备,扩展性佳 主打大数据日志采集,仅支持核心 MQ 功能

选型建议

  • 中小型公司:首选 RabbitMQ(高并发、社区活跃、功能完备);
  • 大型公司:根据场景选择 RocketMQ(业务消息)或 Kafka(日志采集)。

1.3 RabbitMQ 基础

RabbitMQ 是采用 Erlang 语言开发实现 AMQP(高级消息队列)协议的消息中间件。

RabbitMQ的实现类:
            connection:连接,京港澳高速路
            channel:信道,行车道之一,封装了大部分的API
            queue:存储消息的容器
            exchange:分发消息

AMQP 协议组成

AMQP 协议定义了消息传递的规范,核心包含:

  • 生产者:发送消息的应用程序;
  • 交换机(Exchange):路由消息到队列;
  • 队列(Queue):存储消息的缓冲区;
  • 消费者:接收并处理消息的应用程序;
  • 绑定(Binding):交换机与队列的关联规则。

RabbitMQ的启动器
        spring-boot-starter-amqp

1.4 为什么使用MQ(RabbitMQ核心价值)

(1)解耦

传统模式:系统 A 直接调用系统 B/C 代码,新增系统 D 需修改 A 的代码;

MQ 模式:系统 A 将消息写入 MQ,其他系统从 MQ 订阅消息,A 无需修改代码。

(2)异步

传统模式:非核心业务同步执行,响应慢;

MQ 模式:非核心业务通过 MQ 异步执行,提升响应速度。

A服务把消息发送到MQ,B、C服务接受消息后以异步方式运行

(3)削峰

传统模式:高并发请求直接冲击数据库,导致连接异常;

MQ 模式:请求先写入 MQ,系统按数据库处理能力从 MQ 慢慢拉取消息,平稳消费。

1.5 RabbitMQ 安装(CentOS)

1.5.1 克隆 CentOS 虚拟机(可选)

通过 VMware 克隆 CentOS 虚拟机,作为 RabbitMQ 部署节点。

1.5.2 安装 Erlang(RabbitMQ 依赖)

上传安装包

bash

# 依次执行
rpm -ivh esl-erlang-17.3-1.x86_64.rpm --force --nodeps
rpm -ivh esl-erlang_17.3-1~centos~6_amd64.rpm --force --nodeps
rpm -ivh esl-erlang-compat-R14B-1.el6.noarch.rpm --force --nodeps
1.5.3 安装 RabbitMQ

bash

​
# 安装包
rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm

# 启动/停止/重启/状态
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
service rabbitmq-server status

# 开机自启
chkconfig rabbitmq-server on

# 防火墙开放15672端口(管理界面)
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save

# 开启web管理工具
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart

# 创建管理员账号
rabbitmqctl  add_user admin 1111
rabbitmqctl  set_user_tags admin  administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

# 查看用户
rabbitmqctl list_users

​
1.5.4 访问管理界面

浏览器访问:http://服务器IP:15672,使用 admin/1111 登录。

1.6 RabbitMQ 工程搭建

1.6.1 创建工程

新建 SpringBoot 工程springboot_rabbitmq

1.6.2 pom.xml 依赖

xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
    </parent>
    <groupId>com.powershop</groupId>
    <artifactId>springboot_rabbitmq</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
</project>
1.6.3 连接工具类

java

package com.powershop.util;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {
    /**
     * 建立与RabbitMQ的连接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("192.168.233.132");
        //端口
        factory.setPort(5672);
        //设置账号信息
        factory.setUsername("admin");
        factory.setPassword("1111");
        factory.setVirtualHost("/");
        // 获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

二、RabbitMQ 五种消息模型

RabbitMQ 提供 5 种核心消息模型(排除 RPC),其中 3/4/5 属于订阅模型,仅路由方式不同。

1 Simple - 简单模型

流程图

核心角色

  • P(生产者):发送消息;
  • C(消费者):接收消息;
  • 队列:存储消息的缓冲区。

一对一:一个生产者对一个消费者

(1) 生产者发送消息

java

import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQSender {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、获取连接  Connection:京港澳高速路
        Connection connection = ConnectionUtil.getConnection();
        //2、创建信道 Channel:行车道之一,封装了大部分API
        Channel channel = connection.createChannel();
        //3、创建队列 Queue
        String QUEUE_NAME = "simple_queue";
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4、发送消息
        String msg = "kskbl";
        /**
         * String queue : 队列名
         * boolean autoAck: 是否自动Ack,MQ的消息复制到消费者,消费者接受后,Ack返回到MQ才会删除消息
         * Consumer callback:返回后的业务逻辑
         */
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        System.out.println("sendMSG:" + msg);
        //5、关闭
        //channel.close();
        //connection.close();

    }
}
(2) 消费者接收消息

java

import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、获取连接  Connection:京港澳高速路
        Connection connection = ConnectionUtil.getConnection();
        //2、创建信道 Channel:行车道之一,封装了大部分API
        Channel channel = connection.createChannel();
        //3、创建队列 Queue,若Queue已存在,则不创建
        String QUEUE_NAME = "simple_queue";
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4、接受消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println("consumer:" + msg);
            }
        };
        /**
         * String queue : 队列名
         * boolean autoAck: 是否自动Ack,MQ的消息复制到消费者,消费者接受后,Ack返回到MQ才会删除消息
         * Consumer callback:返回后的业务逻辑
         */
        channel.basicConsume(QUEUE_NAME,true,consumer);
        //5、关闭
        channel.close();
        connection.close();
    }
}

说明:消费者启动后会持续监听队列,有新消息立即消费,消费后消息从队列移除。

(3) 手动ACK
  • 自动ACK:消息一旦被接收,消费者自动发送ACK

  • 手动ACK:消息接收后,不会发送ACK,需要手动调用

自动ACK问题:如果消费者领取消息后,还没执行操作就程序报错?

消息消费失败,但是消息依然被消费,RabbitMQ无从得知,这样消息就丢失了!

生产者不做任何修改,直接运行,消息发送成功:

消费者,添加异常,运行消费者,程序抛出异常。但是消息依然被消费:

我们之前的测试都是默认自动ACK的,如果要手动ACK,需要改动我们的代码:

channel.basicConsume(, false, );                     //默认自动ACK,false取消默认

channel.basicAck(envelope.getDeliveryTag(), false);     // true改为false,开启手动ACK

import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、获取连接  Connection:京港澳高速路
        Connection connection = ConnectionUtil.getConnection();
        //2、创建信道 Channel:行车道之一,封装了大部分API
        Channel channel = connection.createChannel();
        //3、创建队列 Queue,若Queue已存在,则不创建
        String QUEUE_NAME = "simple_queue";
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //4、接受消息
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    String msg = new String(body);
                    System.out.println("consumer:" + msg);
                    //模拟业务错误
                    //int a = 6/0;
                    //手动 ACK
                    /**
                     * long deliveryTag  在消息队列中的位置下标
                     * boolean multiple   是否批量处理多个消息,如果为true,比下标小的一块儿处理
                     */
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        };
        /**
         * String queue : 队列名
         * boolean autoAck: 是否 自动Ack,MQ的消息复制到消费者,消费者接受后,Ack返回到MQ才会删除消息
         * Consumer callback:返回后的业务逻辑
         */
        channel.basicConsume(QUEUE_NAME,false,consumer);
        //5、关闭
        //channel.close();
        //connection.close();
    }
}

生产者不变,再次运行:

2 Work-消息模型

能者多劳型,特点:多个消费者

(1) 生产者与案例1相似
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQSender {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        String QUEUE_NAME = "simple_queue";

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        for (int i = 0; i < 50; i++) {
            String msg = "kskbl" + i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
            System.out.println("sendMSG:" + i + " "+ msg);
        }
        //channel.close();
        //connection.close();
    }
}
(2) 消费者1

模拟消费者1快,消费者2比较慢的情况

import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String QUEUE_NAME = "simple_queue";
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    String msg = new String(body);
                    System.out.println("consumer1:" + msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
        //channel.close();
        //connection.close();
    }
}
(3) 消费者2
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String QUEUE_NAME = "simple_queue";
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    String msg = new String(body);
                    System.out.println("consumer2:" + msg);
                    //模拟业务耗时
                    Thread.sleep(1000);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
        //channel.close();
        //connection.close();
    }
}

两个消费者一同启动,然后消费50条消息

(4) Simple简单模型 问题:

两个消费者各自消费了25条消息,但是消费者1明显快于消费者2,任务的总耗时较长(如果将消息都给消费者1,总耗时大大缩短)

(5) Work 消息模型:

消费者接受消息前添加:
            channel.basicQos(1);    //一个消息只能被一个消费者获取,每个消费者同时只能处理一条消息

测试:


问题:
            如何防止消息堆积?       多个consumer+能者多劳

3 Fanout消息模型

Fanout,也称为广播。特点:加入了交换机Exchange

  • 每个消费者有自己的queue(队列)

  • 每个队列都要绑定到Exchange(交换机)

  • 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

(1) 生产者

创建交换机Exchange,不再声明Queue

import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.amqp.core.ExchangeTypes;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQSender {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        String EXCHANGE_NAME = "FANOUT_EXCHANGE";
        //创建交换机Exchange:分发消息
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
        String msg = "kskbl";
        channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
        System.out.println("sendMSG:"+ msg);
        //channel.close();
        //connection.close();
    }
}
(2) 消费者1

队列Queue需要和交换机绑定

import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQConsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String QUEUE_NAME = "fanout_queue_1";
        String EXCHANGE_NAME = "FANOUT_EXCHANGE";
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    String msg = new String(body);
                    System.out.println("consumer1:" + msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);

                    channel.basicQos(1);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        };

        channel.basicConsume(QUEUE_NAME,false,consumer);
        //channel.close();
        //connection.close();
    }
}
(3) 消费者2
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class MQConsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String QUEUE_NAME = "fanout_queue_2";
        String EXCHANGE_NAME = "FANOUT_EXCHANGE";
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //绑定队列交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    String msg = new String(body);
                    System.out.println("consumer2:" + msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    channel.basicQos(1);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
        //channel.close();
        //connection.close();
    }
}

 注意:Exchange只负责分发消息,若没有Queue绑定到Exchange上则消息会被丢弃

效果:

4 Direct消息模型

定向消息匹配

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为 error 的消息

C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

(1) 生产者
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.amqp.core.ExchangeTypes;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DirectSender {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        String EXCHANGE_NAME = "DIRECT_EXCHANGE";
        // 声明exchange,指定类型为direct
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);
        String msg = "KsKbl-D";
        // 发送消息,并且指定routing key 为:error
        channel.basicPublish(EXCHANGE_NAME,"info",null,msg.getBytes());
        System.out.println("sendMSG:"+ msg);

        channel.close();
        connection.close();
    }
}
(2) 消费者1
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DirectConsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String QUEUE_NAME = "direct_queue_1";
        String EXCHANGE_NAME = "DIRECT_EXCHANGE";
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 绑定队列到交换机,同时指定需要订阅的routing key。
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    String msg = new String(body);
                    System.out.println("consumer1:" + msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
        //channel.close();
        //connection.close();
    }
}
(3) 消费者2
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DirectConsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String QUEUE_NAME = "direct_queue_2";
        String EXCHANGE_NAME = "DIRECT_EXCHANGE";
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 绑定队列到交换机,同时指定需要订阅的routing key。
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    String msg = new String(body);
                    System.out.println("consumer2:" + msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);

                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
        //channel.close();
        //connection.close();
    }
}

效果:

消费者1只能接受error消息,消费者2能接受error、info、warning消息

5 Topic消息模型

也叫话题消息模型,模糊匹配

Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

通配符规则:
            * :匹配1个单词
            # :匹配n个单词

举例:

audit.# :能够匹配audit.irs.corporate 或者 audit.irs
audit.* :只能匹配audit.irs
(1) 生产者
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.amqp.core.ExchangeTypes;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicSender {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        String EXCHANGE_NAME = "TOPIC_EXCHANGE";
        // 声明exchange,指定类型为TOPIC
        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);
        String msg = "KsKbl-";
        // 发送消息,并且指定routing key
        channel.basicPublish(EXCHANGE_NAME,"item.insert",null,msg.getBytes());
        System.out.println("sendMSG:"+ msg);

        channel.close();
        connection.close();
    }
}
(2) 消费者1
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicConsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String QUEUE_NAME = "topic_queue_1";
        String EXCHANGE_NAME = "TOPIC_EXCHANGE";
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 绑定队列到交换机,同时指定需要订阅的routing key。
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"user.*");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    String msg = new String(body);
                    System.out.println("consumer1:" + msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
        //channel.close();
        //connection.close();
    }
}
(3) 消费者2
public class TopicConsumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String QUEUE_NAME = "topic_queue_2";
        String EXCHANGE_NAME = "TOPIC_EXCHANGE";
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        // 绑定队列到交换机,同时指定需要订阅的routing key。
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.*");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    String msg = new String(body);
                    System.out.println("consumer2:" + msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
        //channel.close();
        //connection.close();
    }
}

效果:

消费者1接受 user.* 消息;消费者2接受 item.* 消息

三、持久化

3.1 为什么要持久化

如何避免消息丢失,保证消费者成功消费?(理论上)

  1.  消费者的手动ACK机制,可以防止业务处理失败。
  2.  但是,如果在消费者消费之前,MQ就宕机了,消息就没了,需要持久化

3.2 持久化步骤

队列Queue、Exchange、消息 都需要持久化

1、Exchange 持久化
         channel.exchangeDeclare(, , true);
2、Queue 持久化
         channel.queueDeclare(, true, , , );
3、消息 持久化
         channel.basicPublish(, , MessageProperties.PERSISTENT_TEXT_PLAIN,);

(1) 生产者
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.springframework.amqp.core.ExchangeTypes;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DurableSender {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String EXCHANGE_NAME = "DURABLE_EXCHANGE";
        
        for (int i = 0; i < 50; i++) {
            channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC,true);  //true则开启Exchange持久化
            String msg = "kskbl-" + i;
            channel.basicPublish(EXCHANGE_NAME,"item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());    //开启消息持久化
            System.out.println("sendMSG:"+ msg);
        }
        channel.close();
        connection.close();
    }
}
(2) 消费者
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DurableConsumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String QUEUE_NAME = "durable_queue_1";
        String EXCHANGE_NAME = "DURABLE_EXCHANGE";
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);  //true则开启Queue队列持久化
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.*");

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    String msg = new String(body);
                    System.out.println("consumer1:" + msg);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,false,consumer);
        //channel.close();
        //connection.close();
    }
}

效果:

D 表示已经持久化

四、SpringBoot整合RabbitMQ

4.1  依赖和配置

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
</dependency>

application.yml

spring:
          rabbitmq:
            host: 192.168.61.137
            port: 5672
            username: admin
            password: 1111
            virtual-host: /

4.2 消费/监听者

@Component
        public class Recver {

            /**
             * 接收消息的三个要素:
             *  1、exchange
             *  2、queue
             *  3、routingKey
             */
            @RabbitListener(bindings = {@QueueBinding(
                    value = @Queue(value = "springboot_queue"),
                    exchange = @Exchange(value = "springboot_exchange", type = ExchangeTypes.TOPIC),
                    key = {"springboot.*"}
            )})
            public void listenerMsg(String msg) {
                System.out.println("Recver:" + msg);
            }
        }
  • @Componet:类上的注解,注册到Spring容器

  • @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:

    • bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:

      • value:这个消费者关联的队列。值是@Queue,代表一个队列

      • exchange:队列所绑定的交换机,值是@Exchange类型

      • key:队列和交换机绑定的RoutingKey

类似listen这样的方法在一个类中可以写多个,就代表多个消费者。

4.3 发送者

import com.powershop.Application;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {Application.class})
public class sender {
    @Autowired
    private AmqpTemplate amqpTemplate;
    @Test
    public void testSendMsg() {
        String msg = "世界 MC!";
        amqpTemplate.convertAndSend("springboot_exchange","springboot.test",msg);
        System.out.println("Sender:" + msg);
        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

4.4 启动器

package com.powershop;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

4.5 设置手动ack

application.yml

#设置三种订阅模式手动ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#设置work消息类型手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual

注意:监听方法添加Channel channel, Message message两个参数

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Recv {
    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue(value = "springboot_queue"),
            exchange = @Exchange(value = "springboot_exchange", type = ExchangeTypes.TOPIC),
            key = {"springboot.*"}
    )})
    public void MQListener(String msg, Channel channel, Message message){
        /**
         * 监听者接受消息三要素:
         * 1、Exchange
         * 2、Queue
         * 3、Routing Key
         */
        try {
            //int a = 6/0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("接收到消息:" + msg);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

运行效果:

五、索引库同步

需求:后台新增商品后,需要将商品数据实时同步到 Elasticsearch 索引库,否则新增的商品ES查询不到,下面对比三种实现方案:

5.1 方案对比

方案一:同服务同步(强耦合)
  • 逻辑:在power_shop_item新增商品的业务中,直接添加同步索引库的代码。
  • 缺点:业务耦合度高,商品维护和索引维护绑定在一起,扩展性差。
方案二:服务间调用(紧耦合)
  • 逻辑:power_shop_item新增商品后,直接调用power_shop_search的同步接口。
  • 缺点:系统耦合性强,新增其他同步服务时,需修改power_shop_item代码。
方案三:RabbitMQ 异步同步(解耦)
  • 逻辑:power_shop_item新增商品后,发送消息到 RabbitMQ;power_shop_search监听消息,异步完成索引同步。
  • 优点:完全解耦、异步执行、不阻塞主业务、扩展性强。

5.2 索引库同步实现(RabbitMQ 版)

5.2.1 生产者:power_shop_item(发送消息)
(1)pom.xml 引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)application.yml 配置 RabbitMQ

yaml

spring:
  rabbitmq:
    host: 192.168.204.134
    username: admin
    password: 1111
    virtual-host: /
(3)Service 新增商品并发送消息

java

​@Service
public class ItemServiceImpl extends ServiceImpl<ItemMapper, Item> implements ItemService {
    @Autowired
    private AmqpTemplate amqpTemplate;
    ... 
    @Override
    public void insertTbItem(TbItem tbItem, String desc, String itemParams) {
    // 1. 生成商品ID,补齐商品基础数据
    Long itemId = IDUtils.genItemId();
    Date date = new Date();
    tbItem.setId(itemId);
    tbItem.setStatus((byte)1);
    tbItem.setUpdated(date);
    tbItem.setCreated(date);
    tbItemMapper.insertSelective(tbItem);

    // 2. 保存商品描述
    TbItemDesc tbItemDesc = new TbItemDesc();
    tbItemDesc.setItemId(itemId);
    tbItemDesc.setItemDesc(desc);
    tbItemDesc.setCreated(date);
    tbItemDesc.setUpdated(date);
    tbItemDescMapper.insertSelective(tbItemDesc);

    // 3. 保存商品规格参数
    TbItemParamItem tbItemParamItem = new TbItemParamItem();
    tbItemParamItem.setItemId(itemId);
    tbItemParamItem.setParamData(itemParams);
    tbItemParamItem.setUpdated(date);
    tbItemParamItem.setCreated(date);
    tbItemParamItemMapper.insertSelective(tbItemParamItem);

    // 4. 发送商品新增消息到RabbitMQ(交换机:item_exchage,路由键:item.add)
    amqpTemplate.convertAndSend("item_exchage","item.add", itemId);
    }

    @Override
    public Map selectSearchItemByItemId(Long itemId) {
        return itemMapper.selectSearchItemByItemId(itemId);
    }
}
(4)ItemController
    ...
    @RequestMapping("/selectSearchItemByItemId")
    public Map selectSearchItemByItemId(Long itemId){
        return itemService.selectSearchItemByItemId(itemId);
    };
(5)ItemMaper 按 ID 查询商品完整信息
...
    <select id="selectSearchItemByItemId" resultType="java.util.Map">
        SELECT
               t1.id, t1.title item_title,
               t1.sell_point item_sell_point,
               t1.price item_price,
               t1.image item_image,
               t2.name item_category_name,
               t3.item_desc item_desc
        FROM tb_item t1, tb_item_cat t2, tb_item_desc t3
        WHERE t1.cid=t2.id AND t1.id=t3.item_id AND t1.id = #{itemId}
    </select>
5.2.2 消费者:power_shop_search(监听/接受消息同步索引)
(1)pom.xml 引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)application.yml 配置 RabbitMQ

yaml

spring:
  rabbitmq:
    host: 192.168.204.134
    username: admin
    password: 1111
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual #不自动ack
      direct:
        acknowledge-mode: manual
(3)Listener 监听 RabbitMQ 消息

java

package com.powershop.mq;
import com.powershop.service.SearchItemService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SearchMQRecv {
    @Autowired
    private SearchItemService searchItemService;

    //接受消息三要素  exchange   queue   routingKey
    @RabbitListener(bindings ={ @QueueBinding(
            value = @Queue(value = "item_queue"),
            exchange = @Exchange(value = "item_exchange",type = ExchangeTypes.TOPIC),
            key = {"item.add"}
    )})
    public void ListenerMsg(Long itemId, Channel channel , Message message){
        try {
            searchItemService.addItem(itemId);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
(4)Service 同步商品到 ES 索引库
  • SearchItemService
public interface SearchItemService {
    ...
    void addItem(Long itemId) throws IOException;
}
  • SearchItemServiceImpl
@Service
public class SearchItemServiceImpl implements SearchItemService {
    ...
 @Override
    public void addItem(Long itemId) throws IOException {
        //1、查询MySQL
        Map map = itemServiceFeign.selectSearchItemByItemId(itemId);
        //2、同步到索引库
        IndexRequest request = new IndexRequest();
        request.index("power_shop");
        request.type("item");
        request.source(JsonUtils.objectToJson(map),XContentType.JSON);
        restHighLevelClient.index(request);
    }
}

5.3 功能测试

  1. 后台(power_shop_item)新增一条商品;
  2. 查看 RabbitMQ 控制台,确认消息发送成功;
  3. 查看 ES 索引库,确认新商品数据已同步;
  4. 前台搜索新商品,可正常查询到结果。

六、RabbitMQ 可靠消息最终一致性介绍

6.1 如何保证消费者把消息成功消费掉?

也是为什么需要可靠消息最终一致性?

分布式系统中,服务间异步调用(基于 MQ)时,分布式事务是核心问题:

  • 需求:多个服务的业务逻辑,要么全部成功,要么全部失败;
  • 痛点:网络不稳定、服务宕机、消息丢失 / 重复,导致数据不一致;
  • 解决方案:可靠消息最终一致性(主流、易实现、适合异步场景)。

6.2 什么是可靠消息最终一致性

定义:事务发起方(生产者)执行本地事务成功后,发送消息到 MQ;事务参与方(消费者)一定能接收并成功处理消息,最终所有服务数据达到一致。

核心流程

  1. 生产者:本地事务执行成功 → 发送消息到 MQ;
  2. MQ:保证消息持久化、不丢失;
  3. 消费者:接收消息 → 处理业务 → 手动确认;
  4. 最终:生产者本地事务、消费者业务,全部成功

6.3 需解决的 3 大核心问题(条件)

问题 1:上游服务把消息成功发送
  • 风险:本地事务成功,消息发送失败(网络中断、MQ 宕机),导致数据不一致;
  • 目标:本地事务和消息发送,要么都成功,要么都失败
问题 2:下游服务把消息成功消费
  • 风险:MQ 投递消息后,消费者宕机,消息丢失;
  • 目标:消费者必须成功接收并处理消息,MQ 才删除消息
问题 3:下游服务对消息做幂等
  • 风险:消费者处理成功,但 ACK 超时,MQ 重复投递消息,导致业务重复执行;
  • 目标:消息无论投递多少次,业务只执行一次(幂等性)。

6.4 三大问题解决方案

方案 1:本地消息表(解决上游服务发送)
  • 核心:生产者新增本地消息表,执行本地事务时,同时写入消息记录(状态:发送中);
  • 定时任务:定时(Quartz)扫描本地消息表,将未发送成功的消息重新发送
  • 回调更新:收到 MQ 确认后,ConfirmCallback响应返回,更新消息状态为 “发送成功”。

方案 2:消息持久化 + 手动 ACK(解决消费者可靠接收)
  • 消息持久化:MQ 交换机、队列、消息均开启持久化,宕机后消息不丢失;
  • 手动 ACK:消费者处理完业务后,手动发送 ACK,MQ 收到 ACK 后才删除消息;处理失败则不 ACK,MQ 重新投递。

方案 3:消息去重表(解决幂等)
  • 核心:消费者新增消息去重表,记录已处理的消息唯一标识(如事务号);
  • 处理逻辑:接收消息后,先查询去重表,已存在则直接 ACK,不处理;不存在则处理业务,再记录去重表


七、RabbitMQ 实现可靠消息最终一致性(代码实战)

7.1 需求说明

  • 生产者(power_shop_item):保存商品、写入本地消息表、定时重发消息、更新消息状态;
  • 消费者(power_shop_search):监听消息、同步索引库、消息幂等校验、手动 ACK。

7.2 数据库准备

(1)本地消息表(local_message)

mysql

DROP TABLE IF EXISTS `local_message`;
CREATE TABLE `local_message` (
  `tx_no` varchar(255) NOT NULL COMMENT '事务号(唯一)',
  `item_id` bigint DEFAULT NULL COMMENT '商品ID',
  `state` int(11) DEFAULT NULL COMMENT '状态:0=发送中,1=发送成功',
  PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
(2)消息去重表(msg_distinct)

mysql

DROP TABLE IF EXISTS `msg_distinct`;
CREATE TABLE `msg_distinct` (
  `item_id` bigint NOT NULL,
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`item_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

表实体均用Mybatis-Plus生成放到 common_pojo 模块中,本地消息表控制层等用在item模块,消息去重表控制层等在search模块

import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.OutputFile;
import com.baomidou.mybatisplus.generator.config.rules.DbColumnType;
import com.baomidou.mybatisplus.generator.engine.FreemarkerTemplateEngine;
import org.apache.ibatis.type.JdbcType;

import java.nio.file.Paths;
import java.util.Collections;

public class CodeGenerator {
    public static void main(String[] args) {
        FastAutoGenerator.create("jdbc:mysql://localhost:3306/power_shop?characterEncoding=UTF-8", "root", "1111")
                .globalConfig(builder -> builder
                        .author("kskbl")
                        .outputDir(Paths.get(System.getProperty("user.dir")) + "/src/main/java")
                        .commentDate("yyyy-MM-dd")
                )
                .packageConfig(builder -> builder
                        .parent("com.powershop")
                        .entity("pojo")
                        .mapper("mapper")
                        .service("service")
                        .serviceImpl("service.impl")
                        .xml("mapper")
                        //mapper映射文件的位置
                        .pathInfo(Collections.singletonMap(OutputFile.xml, Paths.get(System.getProperty("user.dir")) + "/src/main/resources/mapper")) // 设置路径配置信息
                )
                .strategyConfig(builder -> builder
                        .addInclude("msg_distinct") // 设置需要生成的表名
                        .addTablePrefix("tb_") // 设置过滤表前缀
                        .serviceBuilder().formatServiceFileName("%sService") //接口名去掉“I”
                        .entityBuilder()
                ).dataSourceConfig(builder ->
                        builder.typeConvertHandler((globalConfig, typeRegistry, metaInfo) -> {
                            // 日期类型
                            if (JdbcType.DATE == metaInfo.getJdbcType()) {
                                return DbColumnType.DATE;
                            }if (JdbcType.TIMESTAMP == metaInfo.getJdbcType()) {
                                return DbColumnType.DATE;
                            }
                            return typeRegistry.getColumnType(metaInfo);
                        })
                )
                .templateEngine(new FreemarkerTemplateEngine())
                .execute();
    }
}

7.3 生产者:power_shop_item 改造

7.3.1 pom.xml 新增定时任务依赖

xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
7.3.2 application.yml 开启 MQ 确认回调

yaml

spring:
  rabbitmq:
    host: 192.168.204.134
    username: admin
    password: 1111
    virtual-host: /
    publisher-returns: true # 开启消息退回回调
    publisher-confirm-type: correlated # 开启消息确认回调
7.3.3 MQ 发送工具类(MQSender)

java

package com.powershop.mq;
import com.powershop.mapper.LocalMessageMapper;
import com.powershop.pojo.LocalMessage;
import com.powershop.utils.JsonUtils;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class ItemMQSender implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback{
    @Autowired
    private LocalMessageMapper localMessageMapper;
    @Autowired
    private AmqpTemplate amqpTemplate;

    // 发送消息到MQ
    public void sendMsg(LocalMessage localMessage) {
        RabbitTemplate rabbitTemplate = (RabbitTemplate) this.amqpTemplate;
        rabbitTemplate.setConfirmCallback(this); // 确认回调
        rabbitTemplate.setReturnCallback(this); // 失败回退回调

        // 绑定事务号,用于回调更新状态
        CorrelationData correlationData = new CorrelationData(localMessage.getTxNo());
        // 发送消息(交换机:index_exchange,路由键:item.add)
        rabbitTemplate.convertAndSend("index_exchange","item.add",
                JsonUtils.objectToJson(localMessage),correlationData);
    }

    // 消息发送失败回调
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                 String exchange, String routingKey) {
        System.out.println("消息发送失败:" + new String(message.getBody())
                + ",交换机:" + exchange + ",路由键:" + routingKey);
    }

    // 消息发送成功回调
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            // 发送成功,更新本地消息状态为1
            String txNo = correlationData.getId();
            LocalMessage localMessage = new LocalMessage();
            localMessage.setTxNo(txNo);
            localMessage.setState(1);
            localMessageMapper.updateByPrimaryKeySelective(localMessage);
        }
    }
}
7.3.4 Service 新增商品并写入本地消息表

java

@Service
@Transactional
public class TbItemServiceImpl implements TbItemService{
    @Autowired
    private TbItemMapper tbItemMapper;
    @Autowired
    private TbItemDescMapper tbItemDescMapper;
    @Autowired
    private TbItemParamItemMapper tbItemParamItemMapper;
    @Autowired
    private LocalMessageMapper localMessageMapper;

    @Override
    public void insertTbItem(TbItem tbItem, String desc, String itemParams) {
        // 1. 生成商品ID,保存商品、描述、规格(省略代码,同5.2.1)
        Long itemId = IDUtils.genItemId();
        // ... 省略商品基础数据保存代码 ...

        // 2. 写入本地消息表(状态:0=发送中)
        LocalMessage localMessage = new LocalMessage();
        localMessage.setTxNo(UUID.randomUUID().toString()); // 生成唯一事务号
        localMessage.setItemId(itemId);
        localMessage.setState(0);
        localMessageMapper.insertSelective(localMessage);
    }
}
7.3.5 定时任务:扫描本地消息表重发
  • 定时任务类

java

package com.powershop.job;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.powershop.mapper.LocalMessageMapper;
import com.powershop.mq.ItemMQSender;
import com.powershop.pojo.LocalMessage;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class ItemQuartz {
    @Autowired
    private LocalMessageMapper localMessageMapper;
    @Autowired
    private ItemMQSender itemMQSender;
    public void scanLocalMessage(){
        //1、扫描状态是0的消息
        QueryWrapper<LocalMessage> wrapper = new QueryWrapper<>();
        wrapper.eq("state",0);
        List<LocalMessage> localMessageList = localMessageMapper.selectList(wrapper);
        for (LocalMessage localMessage : localMessageList) {
            //2、发送消息
            itemMQSender.sendMsg(localMessage);
        }
    }
}
  • 定时任务配置

java

package com.powershop.config;
import com.powershop.quartz.ItemQuartz;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

@Configuration
public class QuartzConfig {
    // 1. 定义任务:执行scanLocalMessage方法
    @Bean
    public MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean(ItemQuartz itemQuartz) {
        MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
        jobDetail.setTargetObject(itemQuartz);
        jobDetail.setTargetMethod("scanLocalMessage");
        return jobDetail;
    }

    // 2. 定义触发规则:每秒执行一次
    @Bean
    public CronTriggerFactoryBean cronTriggerFactoryBean(MethodInvokingJobDetailFactoryBean jobDetail) {
        CronTriggerFactoryBean trigger = new CronTriggerFactoryBean();
        trigger.setCronExpression("*/1 * * * * ?"); // 每秒
        trigger.setJobDetail(jobDetail.getObject());
        return trigger;
    }

    // 3. 注册调度器
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(CronTriggerFactoryBean trigger) {
        SchedulerFactoryBean scheduler = new SchedulerFactoryBean();
        scheduler.setTriggers(trigger.getObject());
        return scheduler;
    }
}

7.4 消费者:power_shop_search 改造

7.4.1 application.yml 开启手动 ACK

pom.xml 

<dependency>
       <groupId>mysql</groupId>
       <artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
       <groupId>com.alibaba</groupId>
       <artifactId>druid</artifactId>
</dependency>
<dependency>
       <groupId>com.prowershop</groupId>
       <artifactId>common_pojo</artifactId>
       <version>1.0-SNAPSHOT</version>
</dependency>

yaml

spring:
  rabbitmq:
    host: 192.168.204.134
    username: admin
    password: 1111
    virtual-host: /
    listener:
      direct:
        acknowledge-mode: manual # 手动确认
      simple:
        acknowledge-mode: manual # 手动确认
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/power_shop?characterEncoding=UTF-8
    username: root
    password: 1111
    type: com.alibaba.druid.pool.DruidDataSource

启动器添加

@MapperScan("com.powershop.mapper")
7.4.2 Service 新增消息去重逻辑
  • 去重 Service

SearchItemServiceImpl

@Service
public class SearchItemServiceImpl implements SearchItemService {
    ...
    @Autowired
    private MsgDistinctMapper msgDistinctMapper;
    ...

    @Override
    public void addItem(Long itemId) throws IOException {
        //消息查询
        MsgDistinct msgDistinct = msgDistinctMapper.selectById(itemId);
        if (msgDistinct == null) {
            //1、查询MySQL
            Map map = itemServiceFeign.selectSearchItemByItemId(itemId);
            //2、同步到索引库
            IndexRequest request = new IndexRequest();
            request.index("power_shop");
            request.type("item");
            request.source(JsonUtils.objectToJson(map), XContentType.JSON);
            restHighLevelClient.index(request);
            //消息记录
            msgDistinct = new MsgDistinct();
            msgDistinct.setItemId(itemId);
            msgDistinct.setCreateTime(new Date());
            msgDistinctMapper.insert(msgDistinct);
        }else {
            System.out.println("=======幂等生效:事务"+msgDistinct.getItemId()
                    +" 已成功执行===========");
        }
    }
}
7.4.3 Listener 监听消息 + 幂等校验 + 手动 ACK

java

package com.powershop.mq;
import com.powershop.service.SearchItemService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SearchMQRecv {
    @Autowired
    private SearchItemService searchItemService;

    //接受消息三要素  exchange   queue   routingKey
    @RabbitListener(bindings ={ @QueueBinding(
            value = @Queue(value = "item_queue"),
            exchange = @Exchange(value = "item_exchange",type = ExchangeTypes.TOPIC),
            key = {"item.add"}
    )})
    public void ListenerMsg(Long itemId, Channel channel , Message message){
        try {
            searchItemService.addItem(itemId);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

7.5 测试验证

  1. 消息重发测试:新增商品,MySQL将本地消息表(local_message)状态改为 0;定时任务自动重发,状态更新为 1,索引库同步成功。
  2. 幂等测试:关闭手动 ACK,重复投递消息;开启 ACK 后,仅第一次执行同步,后续直接 ACK,无重复数据。
  3. 异常测试:消费者处理时宕机,重启后 MQ 重新投递,正常处理,数据最终一致。
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐