RabbitMQ 核心详解
一、RabbitMQ 核心知识点
1.1 什么是 MQ
MQ(Message Queue,消息队列),消息中间件,是存储消息的容器,用于分布式系统之间进行通信,实现系统解耦、异步通信、流量削峰等核心能力。

1.2 主流 MQ 对比选型
| 特性 | RabbitMQ | ActiveMQ | RocketMQ | Kafka |
|---|---|---|---|---|
| 公司 / 社区 | Rabbit | Apache | 阿里 | Apache |
| 开发语言 | Erlang | Java | Java | Scala&Java |
| 协议 | AMQP | OpenWire、AUTO、Stomp、MQTT | 自定义 | 自定义 |
| 单机吞吐量 | 万级 | 万级 (最差) | 十万级 | 十万级 |
| 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
| 特性 | 并发能力强,延时低 | 老牌产品,文档多 | 功能完备,扩展性佳 | 主打大数据日志采集,仅支持核心 MQ 功能 |
选型建议:
- 中小型公司:首选 RabbitMQ(高并发、社区活跃、功能完备);
- 大型公司:根据场景选择 RocketMQ(业务消息)或 Kafka(日志采集)。
1.3 RabbitMQ 基础
RabbitMQ 是采用 Erlang 语言开发实现 AMQP(高级消息队列)协议的消息中间件。
RabbitMQ的实现类:
connection:连接,京港澳高速路
channel:信道,行车道之一,封装了大部分的API
queue:存储消息的容器
exchange:分发消息
AMQP 协议组成

AMQP 协议定义了消息传递的规范,核心包含:
- 生产者:发送消息的应用程序;
- 交换机(Exchange):路由消息到队列;
- 队列(Queue):存储消息的缓冲区;
- 消费者:接收并处理消息的应用程序;
- 绑定(Binding):交换机与队列的关联规则。
RabbitMQ的启动器
spring-boot-starter-amqp
1.4 为什么使用MQ(RabbitMQ核心价值)
(1)解耦
传统模式:系统 A 直接调用系统 B/C 代码,新增系统 D 需修改 A 的代码;

MQ 模式:系统 A 将消息写入 MQ,其他系统从 MQ 订阅消息,A 无需修改代码。

(2)异步
传统模式:非核心业务同步执行,响应慢;

MQ 模式:非核心业务通过 MQ 异步执行,提升响应速度。

A服务把消息发送到MQ,B、C服务接受消息后以异步方式运行
(3)削峰
传统模式:高并发请求直接冲击数据库,导致连接异常;

MQ 模式:请求先写入 MQ,系统按数据库处理能力从 MQ 慢慢拉取消息,平稳消费。

1.5 RabbitMQ 安装(CentOS)
1.5.1 克隆 CentOS 虚拟机(可选)
通过 VMware 克隆 CentOS 虚拟机,作为 RabbitMQ 部署节点。
1.5.2 安装 Erlang(RabbitMQ 依赖)
上传安装包


bash
# 依次执行
rpm -ivh esl-erlang-17.3-1.x86_64.rpm --force --nodeps
rpm -ivh esl-erlang_17.3-1~centos~6_amd64.rpm --force --nodeps
rpm -ivh esl-erlang-compat-R14B-1.el6.noarch.rpm --force --nodeps
1.5.3 安装 RabbitMQ
bash
# 安装包
rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm
# 启动/停止/重启/状态
service rabbitmq-server start
service rabbitmq-server stop
service rabbitmq-server restart
service rabbitmq-server status
# 开机自启
chkconfig rabbitmq-server on
# 防火墙开放15672端口(管理界面)
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT
/etc/rc.d/init.d/iptables save
# 开启web管理工具
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
# 创建管理员账号
rabbitmqctl add_user admin 1111
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 查看用户
rabbitmqctl list_users
1.5.4 访问管理界面
浏览器访问:http://服务器IP:15672,使用 admin/1111 登录。

1.6 RabbitMQ 工程搭建
1.6.1 创建工程
新建 SpringBoot 工程springboot_rabbitmq。
1.6.2 pom.xml 依赖
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.2.RELEASE</version>
</parent>
<groupId>com.powershop</groupId>
<artifactId>springboot_rabbitmq</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>
1.6.3 连接工具类
java
package com.powershop.util;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("192.168.233.132");
//端口
factory.setPort(5672);
//设置账号信息
factory.setUsername("admin");
factory.setPassword("1111");
factory.setVirtualHost("/");
// 获取连接
Connection connection = factory.newConnection();
return connection;
}
}
二、RabbitMQ 五种消息模型
RabbitMQ 提供 5 种核心消息模型(排除 RPC),其中 3/4/5 属于订阅模型,仅路由方式不同。
1 Simple - 简单模型
核心角色:
- P(生产者):发送消息;
- C(消费者):接收消息;
- 队列:存储消息的缓冲区。
一对一:一个生产者对一个消费者
(1) 生产者发送消息
java
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQSender {
public static void main(String[] args) throws IOException, TimeoutException {
//1、获取连接 Connection:京港澳高速路
Connection connection = ConnectionUtil.getConnection();
//2、创建信道 Channel:行车道之一,封装了大部分API
Channel channel = connection.createChannel();
//3、创建队列 Queue
String QUEUE_NAME = "simple_queue";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//4、发送消息
String msg = "kskbl";
/**
* String queue : 队列名
* boolean autoAck: 是否自动Ack,MQ的消息复制到消费者,消费者接受后,Ack返回到MQ才会删除消息
* Consumer callback:返回后的业务逻辑
*/
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("sendMSG:" + msg);
//5、关闭
//channel.close();
//connection.close();
}
}
(2) 消费者接收消息
java
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、获取连接 Connection:京港澳高速路
Connection connection = ConnectionUtil.getConnection();
//2、创建信道 Channel:行车道之一,封装了大部分API
Channel channel = connection.createChannel();
//3、创建队列 Queue,若Queue已存在,则不创建
String QUEUE_NAME = "simple_queue";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//4、接受消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("consumer:" + msg);
}
};
/**
* String queue : 队列名
* boolean autoAck: 是否自动Ack,MQ的消息复制到消费者,消费者接受后,Ack返回到MQ才会删除消息
* Consumer callback:返回后的业务逻辑
*/
channel.basicConsume(QUEUE_NAME,true,consumer);
//5、关闭
channel.close();
connection.close();
}
}
说明:消费者启动后会持续监听队列,有新消息立即消费,消费后消息从队列移除。
(3) 手动ACK
-
自动ACK:消息一旦被接收,消费者自动发送ACK
-
手动ACK:消息接收后,不会发送ACK,需要手动调用
自动ACK问题:如果消费者领取消息后,还没执行操作就程序报错?
消息消费失败,但是消息依然被消费,RabbitMQ无从得知,这样消息就丢失了!
生产者不做任何修改,直接运行,消息发送成功:

消费者,添加异常,运行消费者,程序抛出异常。但是消息依然被消费:

我们之前的测试都是默认自动ACK的,如果要手动ACK,需要改动我们的代码:
channel.basicConsume(, false, ); //默认自动ACK,false取消默认
channel.basicAck(envelope.getDeliveryTag(), false); // true改为false,开启手动ACK
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1、获取连接 Connection:京港澳高速路
Connection connection = ConnectionUtil.getConnection();
//2、创建信道 Channel:行车道之一,封装了大部分API
Channel channel = connection.createChannel();
//3、创建队列 Queue,若Queue已存在,则不创建
String QUEUE_NAME = "simple_queue";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//4、接受消息
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String msg = new String(body);
System.out.println("consumer:" + msg);
//模拟业务错误
//int a = 6/0;
//手动 ACK
/**
* long deliveryTag 在消息队列中的位置下标
* boolean multiple 是否批量处理多个消息,如果为true,比下标小的一块儿处理
*/
channel.basicAck(envelope.getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
}
}
};
/**
* String queue : 队列名
* boolean autoAck: 是否 自动Ack,MQ的消息复制到消费者,消费者接受后,Ack返回到MQ才会删除消息
* Consumer callback:返回后的业务逻辑
*/
channel.basicConsume(QUEUE_NAME,false,consumer);
//5、关闭
//channel.close();
//connection.close();
}
}
生产者不变,再次运行:

2 Work-消息模型
能者多劳型,特点:多个消费者

(1) 生产者与案例1相似
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQSender {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "simple_queue";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
for (int i = 0; i < 50; i++) {
String msg = "kskbl" + i;
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("sendMSG:" + i + " "+ msg);
}
//channel.close();
//connection.close();
}
}
(2) 消费者1
模拟消费者1快,消费者2比较慢的情况
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "simple_queue";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String msg = new String(body);
System.out.println("consumer1:" + msg);
channel.basicAck(envelope.getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
//channel.close();
//connection.close();
}
}
(3) 消费者2
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "simple_queue";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String msg = new String(body);
System.out.println("consumer2:" + msg);
//模拟业务耗时
Thread.sleep(1000);
channel.basicAck(envelope.getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
//channel.close();
//connection.close();
}
}
两个消费者一同启动,然后消费50条消息
(4) Simple简单模型 问题:
两个消费者各自消费了25条消息,但是消费者1明显快于消费者2,任务的总耗时较长(如果将消息都给消费者1,总耗时大大缩短)
(5) Work 消息模型:

消费者接受消息前添加:
channel.basicQos(1); //一个消息只能被一个消费者获取,每个消费者同时只能处理一条消息
测试:

问题:
如何防止消息堆积? 多个consumer+能者多劳
3 Fanout消息模型
Fanout,也称为广播。特点:加入了交换机Exchange

-
每个消费者有自己的queue(队列)
-
每个队列都要绑定到Exchange(交换机)
-
生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
(1) 生产者
创建交换机Exchange,不再声明Queue
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.amqp.core.ExchangeTypes;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQSender {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String EXCHANGE_NAME = "FANOUT_EXCHANGE";
//创建交换机Exchange:分发消息
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
String msg = "kskbl";
channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("sendMSG:"+ msg);
//channel.close();
//connection.close();
}
}
(2) 消费者1
队列Queue需要和交换机绑定
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "fanout_queue_1";
String EXCHANGE_NAME = "FANOUT_EXCHANGE";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String msg = new String(body);
System.out.println("consumer1:" + msg);
channel.basicAck(envelope.getDeliveryTag(),false);
channel.basicQos(1);
}catch (Exception e){
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
//channel.close();
//connection.close();
}
}
(3) 消费者2
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MQConsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "fanout_queue_2";
String EXCHANGE_NAME = "FANOUT_EXCHANGE";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定队列交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String msg = new String(body);
System.out.println("consumer2:" + msg);
channel.basicAck(envelope.getDeliveryTag(),false);
channel.basicQos(1);
}catch (Exception e){
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
//channel.close();
//connection.close();
}
}
注意:Exchange只负责分发消息,若没有Queue绑定到Exchange上则消息会被丢弃
效果:


4 Direct消息模型

定向消息匹配
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
(1) 生产者
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.amqp.core.ExchangeTypes;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DirectSender {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String EXCHANGE_NAME = "DIRECT_EXCHANGE";
// 声明exchange,指定类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);
String msg = "KsKbl-D";
// 发送消息,并且指定routing key 为:error
channel.basicPublish(EXCHANGE_NAME,"info",null,msg.getBytes());
System.out.println("sendMSG:"+ msg);
channel.close();
connection.close();
}
}
(2) 消费者1
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DirectConsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "direct_queue_1";
String EXCHANGE_NAME = "DIRECT_EXCHANGE";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机,同时指定需要订阅的routing key。
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String msg = new String(body);
System.out.println("consumer1:" + msg);
channel.basicAck(envelope.getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
//channel.close();
//connection.close();
}
}
(3) 消费者2
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DirectConsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "direct_queue_2";
String EXCHANGE_NAME = "DIRECT_EXCHANGE";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机,同时指定需要订阅的routing key。
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String msg = new String(body);
System.out.println("consumer2:" + msg);
channel.basicAck(envelope.getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
//channel.close();
//connection.close();
}
}
效果:

消费者1只能接受error消息,消费者2能接受error、info、warning消息
5 Topic消息模型
也叫话题消息模型,模糊匹配
Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
* :匹配1个单词
# :匹配n个单词
举例:
audit.# :能够匹配audit.irs.corporate 或者 audit.irs audit.* :只能匹配audit.irs
(1) 生产者
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.springframework.amqp.core.ExchangeTypes;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicSender {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String EXCHANGE_NAME = "TOPIC_EXCHANGE";
// 声明exchange,指定类型为TOPIC
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);
String msg = "KsKbl-";
// 发送消息,并且指定routing key
channel.basicPublish(EXCHANGE_NAME,"item.insert",null,msg.getBytes());
System.out.println("sendMSG:"+ msg);
channel.close();
connection.close();
}
}
(2) 消费者1
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TopicConsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "topic_queue_1";
String EXCHANGE_NAME = "TOPIC_EXCHANGE";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机,同时指定需要订阅的routing key。
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"user.*");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String msg = new String(body);
System.out.println("consumer1:" + msg);
channel.basicAck(envelope.getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
//channel.close();
//connection.close();
}
}
(3) 消费者2
public class TopicConsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "topic_queue_2";
String EXCHANGE_NAME = "TOPIC_EXCHANGE";
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 绑定队列到交换机,同时指定需要订阅的routing key。
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.*");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String msg = new String(body);
System.out.println("consumer2:" + msg);
channel.basicAck(envelope.getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
//channel.close();
//connection.close();
}
}

效果:
消费者1接受 user.* 消息;消费者2接受 item.* 消息
三、持久化
3.1 为什么要持久化
如何避免消息丢失,保证消费者成功消费?(理论上)
- 消费者的手动ACK机制,可以防止业务处理失败。
- 但是,如果在消费者消费之前,MQ就宕机了,消息就没了,需要持久化。
3.2 持久化步骤
队列Queue、Exchange、消息 都需要持久化
1、Exchange 持久化
channel.exchangeDeclare(, , true);
2、Queue 持久化
channel.queueDeclare(, true, , , );
3、消息 持久化
channel.basicPublish(, , MessageProperties.PERSISTENT_TEXT_PLAIN,);
(1) 生产者
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import org.springframework.amqp.core.ExchangeTypes;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DurableSender {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String EXCHANGE_NAME = "DURABLE_EXCHANGE";
for (int i = 0; i < 50; i++) {
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC,true); //true则开启Exchange持久化
String msg = "kskbl-" + i;
channel.basicPublish(EXCHANGE_NAME,"item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes()); //开启消息持久化
System.out.println("sendMSG:"+ msg);
}
channel.close();
connection.close();
}
}
(2) 消费者
import com.powershop.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DurableConsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "durable_queue_1";
String EXCHANGE_NAME = "DURABLE_EXCHANGE";
channel.queueDeclare(QUEUE_NAME,true,false,false,null); //true则开启Queue队列持久化
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"item.*");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
String msg = new String(body);
System.out.println("consumer1:" + msg);
channel.basicAck(envelope.getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,false,consumer);
//channel.close();
//connection.close();
}
}
效果:


D 表示已经持久化
四、SpringBoot整合RabbitMQ
4.1 依赖和配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
application.yml
spring:
rabbitmq:
host: 192.168.61.137
port: 5672
username: admin
password: 1111
virtual-host: /
4.2 消费/监听者
@Component
public class Recver {
/**
* 接收消息的三个要素:
* 1、exchange
* 2、queue
* 3、routingKey
*/
@RabbitListener(bindings = {@QueueBinding(
value = @Queue(value = "springboot_queue"),
exchange = @Exchange(value = "springboot_exchange", type = ExchangeTypes.TOPIC),
key = {"springboot.*"}
)})
public void listenerMsg(String msg) {
System.out.println("Recver:" + msg);
}
}
-
@Componet:类上的注解,注册到Spring容器 -
@RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:-
bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:-
value:这个消费者关联的队列。值是@Queue,代表一个队列 -
exchange:队列所绑定的交换机,值是@Exchange类型 -
key:队列和交换机绑定的RoutingKey
-
-
类似listen这样的方法在一个类中可以写多个,就代表多个消费者。
4.3 发送者
import com.powershop.Application;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = {Application.class})
public class sender {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testSendMsg() {
String msg = "世界 MC!";
amqpTemplate.convertAndSend("springboot_exchange","springboot.test",msg);
System.out.println("Sender:" + msg);
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4.4 启动器
package com.powershop;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
4.5 设置手动ack
application.yml
#设置三种订阅模式手动ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
#设置work消息类型手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
注意:监听方法添加Channel channel, Message message两个参数
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Recv {
@RabbitListener(bindings = {@QueueBinding(
value = @Queue(value = "springboot_queue"),
exchange = @Exchange(value = "springboot_exchange", type = ExchangeTypes.TOPIC),
key = {"springboot.*"}
)})
public void MQListener(String msg, Channel channel, Message message){
/**
* 监听者接受消息三要素:
* 1、Exchange
* 2、Queue
* 3、Routing Key
*/
try {
//int a = 6/0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("接收到消息:" + msg);
}catch (Exception e){
e.printStackTrace();
}
}
}
运行效果:

五、索引库同步
需求:后台新增商品后,需要将商品数据实时同步到 Elasticsearch 索引库,否则新增的商品ES查询不到,下面对比三种实现方案:
5.1 方案对比
方案一:同服务同步(强耦合)
- 逻辑:在
power_shop_item新增商品的业务中,直接添加同步索引库的代码。 - 缺点:业务耦合度高,商品维护和索引维护绑定在一起,扩展性差。
方案二:服务间调用(紧耦合)
- 逻辑:
power_shop_item新增商品后,直接调用power_shop_search的同步接口。 - 缺点:系统耦合性强,新增其他同步服务时,需修改
power_shop_item代码。
方案三:RabbitMQ 异步同步(解耦)
- 逻辑:
power_shop_item新增商品后,发送消息到 RabbitMQ;power_shop_search监听消息,异步完成索引同步。 - 优点:完全解耦、异步执行、不阻塞主业务、扩展性强。
5.2 索引库同步实现(RabbitMQ 版)
5.2.1 生产者:power_shop_item(发送消息)
(1)pom.xml 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)application.yml 配置 RabbitMQ
yaml
spring:
rabbitmq:
host: 192.168.204.134
username: admin
password: 1111
virtual-host: /
(3)Service 新增商品并发送消息
java
@Service
public class ItemServiceImpl extends ServiceImpl<ItemMapper, Item> implements ItemService {
@Autowired
private AmqpTemplate amqpTemplate;
...
@Override
public void insertTbItem(TbItem tbItem, String desc, String itemParams) {
// 1. 生成商品ID,补齐商品基础数据
Long itemId = IDUtils.genItemId();
Date date = new Date();
tbItem.setId(itemId);
tbItem.setStatus((byte)1);
tbItem.setUpdated(date);
tbItem.setCreated(date);
tbItemMapper.insertSelective(tbItem);
// 2. 保存商品描述
TbItemDesc tbItemDesc = new TbItemDesc();
tbItemDesc.setItemId(itemId);
tbItemDesc.setItemDesc(desc);
tbItemDesc.setCreated(date);
tbItemDesc.setUpdated(date);
tbItemDescMapper.insertSelective(tbItemDesc);
// 3. 保存商品规格参数
TbItemParamItem tbItemParamItem = new TbItemParamItem();
tbItemParamItem.setItemId(itemId);
tbItemParamItem.setParamData(itemParams);
tbItemParamItem.setUpdated(date);
tbItemParamItem.setCreated(date);
tbItemParamItemMapper.insertSelective(tbItemParamItem);
// 4. 发送商品新增消息到RabbitMQ(交换机:item_exchage,路由键:item.add)
amqpTemplate.convertAndSend("item_exchage","item.add", itemId);
}
@Override
public Map selectSearchItemByItemId(Long itemId) {
return itemMapper.selectSearchItemByItemId(itemId);
}
}
(4)ItemController
...
@RequestMapping("/selectSearchItemByItemId")
public Map selectSearchItemByItemId(Long itemId){
return itemService.selectSearchItemByItemId(itemId);
};
(5)ItemMaper 按 ID 查询商品完整信息
...
<select id="selectSearchItemByItemId" resultType="java.util.Map">
SELECT
t1.id, t1.title item_title,
t1.sell_point item_sell_point,
t1.price item_price,
t1.image item_image,
t2.name item_category_name,
t3.item_desc item_desc
FROM tb_item t1, tb_item_cat t2, tb_item_desc t3
WHERE t1.cid=t2.id AND t1.id=t3.item_id AND t1.id = #{itemId}
</select>
5.2.2 消费者:power_shop_search(监听/接受消息同步索引)
(1)pom.xml 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)application.yml 配置 RabbitMQ
yaml
spring:
rabbitmq:
host: 192.168.204.134
username: admin
password: 1111
virtual-host: /
listener:
simple:
acknowledge-mode: manual #不自动ack
direct:
acknowledge-mode: manual
(3)Listener 监听 RabbitMQ 消息
java
package com.powershop.mq;
import com.powershop.service.SearchItemService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SearchMQRecv {
@Autowired
private SearchItemService searchItemService;
//接受消息三要素 exchange queue routingKey
@RabbitListener(bindings ={ @QueueBinding(
value = @Queue(value = "item_queue"),
exchange = @Exchange(value = "item_exchange",type = ExchangeTypes.TOPIC),
key = {"item.add"}
)})
public void ListenerMsg(Long itemId, Channel channel , Message message){
try {
searchItemService.addItem(itemId);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
}
}
}
(4)Service 同步商品到 ES 索引库
- SearchItemService
public interface SearchItemService {
...
void addItem(Long itemId) throws IOException;
}
- SearchItemServiceImpl
@Service
public class SearchItemServiceImpl implements SearchItemService {
...
@Override
public void addItem(Long itemId) throws IOException {
//1、查询MySQL
Map map = itemServiceFeign.selectSearchItemByItemId(itemId);
//2、同步到索引库
IndexRequest request = new IndexRequest();
request.index("power_shop");
request.type("item");
request.source(JsonUtils.objectToJson(map),XContentType.JSON);
restHighLevelClient.index(request);
}
}
5.3 功能测试
- 后台(power_shop_item)新增一条商品;
- 查看 RabbitMQ 控制台,确认消息发送成功;
- 查看 ES 索引库,确认新商品数据已同步;
- 前台搜索新商品,可正常查询到结果。
六、RabbitMQ 可靠消息最终一致性介绍
6.1 如何保证消费者把消息成功消费掉?
也是为什么需要可靠消息最终一致性?
分布式系统中,服务间异步调用(基于 MQ)时,分布式事务是核心问题:
- 需求:多个服务的业务逻辑,要么全部成功,要么全部失败;
- 痛点:网络不稳定、服务宕机、消息丢失 / 重复,导致数据不一致;
- 解决方案:可靠消息最终一致性(主流、易实现、适合异步场景)。
6.2 什么是可靠消息最终一致性
定义:事务发起方(生产者)执行本地事务成功后,发送消息到 MQ;事务参与方(消费者)一定能接收并成功处理消息,最终所有服务数据达到一致。

核心流程:
- 生产者:本地事务执行成功 → 发送消息到 MQ;
- MQ:保证消息持久化、不丢失;
- 消费者:接收消息 → 处理业务 → 手动确认;
- 最终:生产者本地事务、消费者业务,全部成功。
6.3 需解决的 3 大核心问题(条件)
问题 1:上游服务把消息成功发送
- 风险:本地事务成功,消息发送失败(网络中断、MQ 宕机),导致数据不一致;
- 目标:本地事务和消息发送,要么都成功,要么都失败。
问题 2:下游服务把消息成功消费
- 风险:MQ 投递消息后,消费者宕机,消息丢失;
- 目标:消费者必须成功接收并处理消息,MQ 才删除消息。
问题 3:下游服务对消息做幂等
- 风险:消费者处理成功,但 ACK 超时,MQ 重复投递消息,导致业务重复执行;
- 目标:消息无论投递多少次,业务只执行一次(幂等性)。
6.4 三大问题解决方案
方案 1:本地消息表(解决上游服务发送)
- 核心:生产者新增本地消息表,执行本地事务时,同时写入消息记录(状态:发送中);
- 定时任务:定时(Quartz)扫描本地消息表,将未发送成功的消息重新发送;
- 回调更新:收到 MQ 确认后,ConfirmCallback响应返回,更新消息状态为 “发送成功”。

方案 2:消息持久化 + 手动 ACK(解决消费者可靠接收)
- 消息持久化:MQ 交换机、队列、消息均开启持久化,宕机后消息不丢失;
- 手动 ACK:消费者处理完业务后,手动发送 ACK,MQ 收到 ACK 后才删除消息;处理失败则不 ACK,MQ 重新投递。

方案 3:消息去重表(解决幂等)
- 核心:消费者新增消息去重表,记录已处理的消息唯一标识(如事务号);
- 处理逻辑:接收消息后,先查询去重表,已存在则直接 ACK,不处理;不存在则处理业务,再记录去重表。

七、RabbitMQ 实现可靠消息最终一致性(代码实战)
7.1 需求说明
- 生产者(power_shop_item):保存商品、写入本地消息表、定时重发消息、更新消息状态;
- 消费者(power_shop_search):监听消息、同步索引库、消息幂等校验、手动 ACK。
7.2 数据库准备
(1)本地消息表(local_message)
mysql
DROP TABLE IF EXISTS `local_message`;
CREATE TABLE `local_message` (
`tx_no` varchar(255) NOT NULL COMMENT '事务号(唯一)',
`item_id` bigint DEFAULT NULL COMMENT '商品ID',
`state` int(11) DEFAULT NULL COMMENT '状态:0=发送中,1=发送成功',
PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
(2)消息去重表(msg_distinct)
mysql
DROP TABLE IF EXISTS `msg_distinct`;
CREATE TABLE `msg_distinct` (
`item_id` bigint NOT NULL,
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`item_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
表实体均用Mybatis-Plus生成放到 common_pojo 模块中,本地消息表控制层等用在item模块,消息去重表控制层等在search模块
import com.baomidou.mybatisplus.generator.FastAutoGenerator;
import com.baomidou.mybatisplus.generator.config.OutputFile;
import com.baomidou.mybatisplus.generator.config.rules.DbColumnType;
import com.baomidou.mybatisplus.generator.engine.FreemarkerTemplateEngine;
import org.apache.ibatis.type.JdbcType;
import java.nio.file.Paths;
import java.util.Collections;
public class CodeGenerator {
public static void main(String[] args) {
FastAutoGenerator.create("jdbc:mysql://localhost:3306/power_shop?characterEncoding=UTF-8", "root", "1111")
.globalConfig(builder -> builder
.author("kskbl")
.outputDir(Paths.get(System.getProperty("user.dir")) + "/src/main/java")
.commentDate("yyyy-MM-dd")
)
.packageConfig(builder -> builder
.parent("com.powershop")
.entity("pojo")
.mapper("mapper")
.service("service")
.serviceImpl("service.impl")
.xml("mapper")
//mapper映射文件的位置
.pathInfo(Collections.singletonMap(OutputFile.xml, Paths.get(System.getProperty("user.dir")) + "/src/main/resources/mapper")) // 设置路径配置信息
)
.strategyConfig(builder -> builder
.addInclude("msg_distinct") // 设置需要生成的表名
.addTablePrefix("tb_") // 设置过滤表前缀
.serviceBuilder().formatServiceFileName("%sService") //接口名去掉“I”
.entityBuilder()
).dataSourceConfig(builder ->
builder.typeConvertHandler((globalConfig, typeRegistry, metaInfo) -> {
// 日期类型
if (JdbcType.DATE == metaInfo.getJdbcType()) {
return DbColumnType.DATE;
}if (JdbcType.TIMESTAMP == metaInfo.getJdbcType()) {
return DbColumnType.DATE;
}
return typeRegistry.getColumnType(metaInfo);
})
)
.templateEngine(new FreemarkerTemplateEngine())
.execute();
}
}
7.3 生产者:power_shop_item 改造
7.3.1 pom.xml 新增定时任务依赖
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
7.3.2 application.yml 开启 MQ 确认回调
yaml
spring:
rabbitmq:
host: 192.168.204.134
username: admin
password: 1111
virtual-host: /
publisher-returns: true # 开启消息退回回调
publisher-confirm-type: correlated # 开启消息确认回调
7.3.3 MQ 发送工具类(MQSender)
java
package com.powershop.mq;
import com.powershop.mapper.LocalMessageMapper;
import com.powershop.pojo.LocalMessage;
import com.powershop.utils.JsonUtils;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
public class ItemMQSender implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback{
@Autowired
private LocalMessageMapper localMessageMapper;
@Autowired
private AmqpTemplate amqpTemplate;
// 发送消息到MQ
public void sendMsg(LocalMessage localMessage) {
RabbitTemplate rabbitTemplate = (RabbitTemplate) this.amqpTemplate;
rabbitTemplate.setConfirmCallback(this); // 确认回调
rabbitTemplate.setReturnCallback(this); // 失败回退回调
// 绑定事务号,用于回调更新状态
CorrelationData correlationData = new CorrelationData(localMessage.getTxNo());
// 发送消息(交换机:index_exchange,路由键:item.add)
rabbitTemplate.convertAndSend("index_exchange","item.add",
JsonUtils.objectToJson(localMessage),correlationData);
}
// 消息发送失败回调
@Override
public void returnedMessage(Message message, int replyCode, String replyText,
String exchange, String routingKey) {
System.out.println("消息发送失败:" + new String(message.getBody())
+ ",交换机:" + exchange + ",路由键:" + routingKey);
}
// 消息发送成功回调
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
// 发送成功,更新本地消息状态为1
String txNo = correlationData.getId();
LocalMessage localMessage = new LocalMessage();
localMessage.setTxNo(txNo);
localMessage.setState(1);
localMessageMapper.updateByPrimaryKeySelective(localMessage);
}
}
}
7.3.4 Service 新增商品并写入本地消息表
java
@Service
@Transactional
public class TbItemServiceImpl implements TbItemService{
@Autowired
private TbItemMapper tbItemMapper;
@Autowired
private TbItemDescMapper tbItemDescMapper;
@Autowired
private TbItemParamItemMapper tbItemParamItemMapper;
@Autowired
private LocalMessageMapper localMessageMapper;
@Override
public void insertTbItem(TbItem tbItem, String desc, String itemParams) {
// 1. 生成商品ID,保存商品、描述、规格(省略代码,同5.2.1)
Long itemId = IDUtils.genItemId();
// ... 省略商品基础数据保存代码 ...
// 2. 写入本地消息表(状态:0=发送中)
LocalMessage localMessage = new LocalMessage();
localMessage.setTxNo(UUID.randomUUID().toString()); // 生成唯一事务号
localMessage.setItemId(itemId);
localMessage.setState(0);
localMessageMapper.insertSelective(localMessage);
}
}
7.3.5 定时任务:扫描本地消息表重发
- 定时任务类
java
package com.powershop.job;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.powershop.mapper.LocalMessageMapper;
import com.powershop.mq.ItemMQSender;
import com.powershop.pojo.LocalMessage;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class ItemQuartz {
@Autowired
private LocalMessageMapper localMessageMapper;
@Autowired
private ItemMQSender itemMQSender;
public void scanLocalMessage(){
//1、扫描状态是0的消息
QueryWrapper<LocalMessage> wrapper = new QueryWrapper<>();
wrapper.eq("state",0);
List<LocalMessage> localMessageList = localMessageMapper.selectList(wrapper);
for (LocalMessage localMessage : localMessageList) {
//2、发送消息
itemMQSender.sendMsg(localMessage);
}
}
}
- 定时任务配置
java
package com.powershop.config;
import com.powershop.quartz.ItemQuartz;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
@Configuration
public class QuartzConfig {
// 1. 定义任务:执行scanLocalMessage方法
@Bean
public MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean(ItemQuartz itemQuartz) {
MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
jobDetail.setTargetObject(itemQuartz);
jobDetail.setTargetMethod("scanLocalMessage");
return jobDetail;
}
// 2. 定义触发规则:每秒执行一次
@Bean
public CronTriggerFactoryBean cronTriggerFactoryBean(MethodInvokingJobDetailFactoryBean jobDetail) {
CronTriggerFactoryBean trigger = new CronTriggerFactoryBean();
trigger.setCronExpression("*/1 * * * * ?"); // 每秒
trigger.setJobDetail(jobDetail.getObject());
return trigger;
}
// 3. 注册调度器
@Bean
public SchedulerFactoryBean schedulerFactoryBean(CronTriggerFactoryBean trigger) {
SchedulerFactoryBean scheduler = new SchedulerFactoryBean();
scheduler.setTriggers(trigger.getObject());
return scheduler;
}
}
7.4 消费者:power_shop_search 改造
7.4.1 application.yml 开启手动 ACK
pom.xml
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</dependency>
<dependency>
<groupId>com.prowershop</groupId>
<artifactId>common_pojo</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
yaml
spring:
rabbitmq:
host: 192.168.204.134
username: admin
password: 1111
virtual-host: /
listener:
direct:
acknowledge-mode: manual # 手动确认
simple:
acknowledge-mode: manual # 手动确认
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/power_shop?characterEncoding=UTF-8
username: root
password: 1111
type: com.alibaba.druid.pool.DruidDataSource
启动器添加
@MapperScan("com.powershop.mapper")
7.4.2 Service 新增消息去重逻辑
- 去重 Service
SearchItemServiceImpl
@Service
public class SearchItemServiceImpl implements SearchItemService {
...
@Autowired
private MsgDistinctMapper msgDistinctMapper;
...
@Override
public void addItem(Long itemId) throws IOException {
//消息查询
MsgDistinct msgDistinct = msgDistinctMapper.selectById(itemId);
if (msgDistinct == null) {
//1、查询MySQL
Map map = itemServiceFeign.selectSearchItemByItemId(itemId);
//2、同步到索引库
IndexRequest request = new IndexRequest();
request.index("power_shop");
request.type("item");
request.source(JsonUtils.objectToJson(map), XContentType.JSON);
restHighLevelClient.index(request);
//消息记录
msgDistinct = new MsgDistinct();
msgDistinct.setItemId(itemId);
msgDistinct.setCreateTime(new Date());
msgDistinctMapper.insert(msgDistinct);
}else {
System.out.println("=======幂等生效:事务"+msgDistinct.getItemId()
+" 已成功执行===========");
}
}
}
7.4.3 Listener 监听消息 + 幂等校验 + 手动 ACK
java
package com.powershop.mq;
import com.powershop.service.SearchItemService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class SearchMQRecv {
@Autowired
private SearchItemService searchItemService;
//接受消息三要素 exchange queue routingKey
@RabbitListener(bindings ={ @QueueBinding(
value = @Queue(value = "item_queue"),
exchange = @Exchange(value = "item_exchange",type = ExchangeTypes.TOPIC),
key = {"item.add"}
)})
public void ListenerMsg(Long itemId, Channel channel , Message message){
try {
searchItemService.addItem(itemId);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
e.printStackTrace();
}
}
}
7.5 测试验证
- 消息重发测试:新增商品,MySQL将本地消息表(local_message)状态改为 0;定时任务自动重发,状态更新为 1,索引库同步成功。
- 幂等测试:关闭手动 ACK,重复投递消息;开启 ACK 后,仅第一次执行同步,后续直接 ACK,无重复数据。
- 异常测试:消费者处理时宕机,重启后 MQ 重新投递,正常处理,数据最终一致。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)