一. 交换机

1.1 Exchanges

1.1.1 Exchanges概念

​ RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中。

​ 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

在这里插入图片描述

1.1.2 Exchanges的类型

Exchanges总共有以下类型

  1. 直接(direct)
  2. 主题(topic)
  3. 标题(header)
  4. 扇出(fanout)

1.1.3 无名exchange

​ 无名类型的exchange为默认类型,通常使用空字符串进行标识

​ 在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的 原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。

之前我们使用发送消息的代码为

//其中第一个参数就是exchange交换机的无名类型,也就是默认类型
channel.basicPublish("",queueName,null,message.getBytes(StandardCharsets.UTF_8));

第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实 是由 routingKey(bindingkey)绑定 key 指定的

1.2 临时队列

​ 每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称 的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连 接,队列将被自动删除。

​ 临时队列和非临时队列主要看队列是否持久化

​ 创建临时队列的方式是我们在创建信道的时候不给队列设置名称即可

String queueName = channel.queueDeclare().getQueue();

​ 这个时候我们就创建了一个随机字符串的队列名称

1.3 绑定(bindings)

​ 什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队 列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定

在这里插入图片描述

1.4 Fanout

1.4.1 Fanout介绍

​ Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的 所有队列中。系统中默认有些exchange 类型

就类似于村里面的大喇叭一样,只要发出了,所有人都可以听得见

在这里插入图片描述

1.4.2 Fanout代码实现

消费者01

package com.rabbitmq.eason.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.eason.utils.RabbitMQUtils;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

/**
 * 扇出类型的消费者1
 *
 * @author HuangSiYuan
 * @since 2021-07-09
 */
public class Consumer01 {

    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        //声明一个临时队列
        /*
        *
        * 生成一个临时的队列、队列的名称是随机的
        * 当消费者断开与队列的连接的时候, 队列就自动删除了
        * */
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机
        /*
        * 参数一: 队列名称
        * 参数二: 交换机名称
        * 参数三: 路由key
        * */
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        System.out.println("C1等待接收消息");
        //接收消息

        DeliverCallback deliverCallback = (consumerTag,message)-> {
            System.out.println("C1成功接收到消息: " + new String(message.getBody()));
            /*
            * 消息手动应答
            * */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("C1接收失败的消息的队列标记值: " + consumerTag);
        };

        //消费者取消消息时触发
        /*
        *
        * 参数一: 队列名称
        * 参数二: 是否自动应答
        * 参数三: 成功确认消息回调函数
        * 参数四: 失败消息回调函数
        * */
        boolean autoAck = false;
        channel.basicConsume(queueName,autoAck,deliverCallback,cancelCallback);

    }

}

消费者02

package com.rabbitmq.eason.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.eason.utils.RabbitMQUtils;

import java.util.Arrays;

/**
 * 扇出类型的消费者2
 *
 * @author HuangSiYuan
 * @since 2021-07-09
 */
public class Consumer02 {

    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //声明一个临时队列
        /*
        *
        * 生成一个临时的队列、队列的名称是随机的
        * 当消费者断开与队列的连接的时候, 队列就自动删除了
        * */
        String queueName = channel.queueDeclare().getQueue();

        //绑定交换机
        /*
        * 参数一: 队列名称
        * 参数二: 交换机名称
        * 参数三: 路由key
        * */
        channel.queueBind(queueName,EXCHANGE_NAME,"");

        System.out.println("C2等待接收消息");
        //接收消息

        DeliverCallback deliverCallback = (consumerTag,message)-> {
            System.out.println("C2成功接收到消息: " + new String(message.getBody()));
            /*
            * 消息手动应答
            * */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("C2接收失败的消息的队列标记值: " + consumerTag);
        };

        //消费者取消消息时触发
        /*
        *
        * 参数一: 队列名称
        * 参数二: 是否自动应答
        * 参数三: 成功确认消息回调函数
        * 参数四: 失败消息回调函数
        * */
        boolean autoAck = false;
        channel.basicConsume(queueName,autoAck,deliverCallback,cancelCallback);

    }

}

生产者实现

package com.rabbitmq.eason.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.eason.utils.RabbitMQUtils;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
 * Fanout交换机的生产者
 * 发消息给交换机
 *
 * @author HuangSiYuan
 * @since 2021-07-09
 */
public class Produce {

    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()) {
            String message = scanner.next();
            //发送消息
            /*
            * 参数一: 交换机名称
            * 参数二: 交换机路由key值
            * 参数三: 额外参数
            * 参数四: 消息
            * */
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
        }

    }

}

在这里插入图片描述

1.5 Direct

对于开始Direct之前我们先回顾一下bindings

绑定是交换机和队列之间的桥梁关系。也可以这么理解: 队列只对它绑定的交换机的消息感兴趣绑定用参数:routingKey 来表示也可称该参数为 binding key, 创建绑定我们用代码

channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");

绑定之后的意义由其交换类型决定。

1.5.1 Direct exchange 介绍

​ 我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志 消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的 广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去

在这里插入图片描述

例如上图中我们的交换机类型时direct类型,交换机中绑定了两个队列,一个是Q1队列一个是Q2队列,Q1绑定的routingKey键位orange,Q2绑定的routingKey键为black和green

在这种绑定的情况下,我们发送消息的时候我们都会绑定一个路由键,如果某一条消息绑定的路由键为orange,那么交换机就会将这条信息传送给Q1队列,如果绑定的路由键为black或者green,那么此时交换机就会将这些消息传送给Q2队列进行处理

1.5.2 多重绑定

在这里插入图片描述

当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情 况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多

1.5.3 代码实现

交换机接收01编写

package com.rabbitmq.eason.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.eason.utils.RabbitMQUtils;

/**
 * Direct交换机01
 *
 * @author HuangSiYuan
 * @since 2021-07-09
 */
public class Consumer01 {

    public static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //声明一个队列
        channel.queueDeclare("console",false,false,false,null);

        //绑定交换机
        channel.queueBind("console",EXCHANGE_NAME,"info");
        channel.queueBind("console",EXCHANGE_NAME,"warning");

        DeliverCallback deliverCallback = (consumerTag, message)-> {
            System.out.println("console成功接收到消息: " + new String(message.getBody()));
            /*
             * 消息手动应答
             * */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("C1接收失败的消息的队列标记值: " + consumerTag);
        };

        channel.basicConsume("console",true,deliverCallback,cancelCallback);
    }
}

交换机02编写

package com.rabbitmq.eason.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.eason.utils.RabbitMQUtils;

/**
 * Direct交换机01
 *
 * @author HuangSiYuan
 * @since 2021-07-09
 */
public class Consumer02 {

    public static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //声明一个队列
        channel.queueDeclare("disk",false,false,false,null);

        //绑定交换机
        channel.queueBind("disk",EXCHANGE_NAME,"error");

        DeliverCallback deliverCallback = (consumerTag, message)-> {
            System.out.println("disk队列名成功接收到消息: " + new String(message.getBody()));
            /*
             * 消息手动应答
             * */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("C1接收失败的消息的队列标记值: " + consumerTag);
        };

        channel.basicConsume("disk",true,deliverCallback,cancelCallback);
    }
}

生产者编写

package com.rabbitmq.eason.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.eason.utils.RabbitMQUtils;

import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
 * 指定交换机生产者
 *
 * @author HuangSiYuan
 * @since 2021-07-09
 */
public class Produce {

    public static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();



        //这个类型的交换机只是多了一个用于标识的RoutingKey标识某一个交换机
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()) {
            String message = scanner.next();
            //发送消息
            //此时所有发送都会发送给队列console
            /*channel.basicPublish(EXCHANGE_NAME,"warning",null,message.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes(StandardCharsets.UTF_8));*/
            //此时所有发送都会发送给队列disk
            channel.basicPublish(EXCHANGE_NAME,"error",null,message.getBytes(StandardCharsets.UTF_8));
        }
    }

}

在这里插入图片描述

可以根据RoutingKey的不同传输给不同的队列进行消息的处理

1.6 Topic

​ 当我们使用了Direct交换机能够大大的活跃了我们分配消息给不同队列的消息处理,但是我们还是没有办法完全的完成我们的需求,尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候 就只能使用 topic 类型

1.6.1 Topic主题队列的要求

​ 发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符是大家需要注意的

  • *(星号)可以代替一个单词
  • #(井号)可以替代零个或多个单词

1.6.2 主题队列匹配规则

在这里插入图片描述

如上图所示

Q1队列绑定的是* .orange. * ,表示中间带 orange 带 3 个单词的字符串( * .orange. )

Q2队列绑定的是*. *.rabbit和lazy.#

*最后一个单词是 rabbit 的 3 个单词(*. .rabbit)

第一个单词是 lazy 的多个单词(lazy.#)

测试routingKey匹配的队列
quick.orange.rabbit被队列 Q1Q2 接收到
lazy.orange.elephant被队列 Q1Q2 接收到
quick.orange.fox被队列 Q1 接收到
lazy.brown.fox被队列 Q2 接收到
azy.pink.rabbit虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit是四个单词但匹配 Q2

当队列绑定关系是下列这种情况时需要引起注意

当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了

如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

1.6.3 主题队列代码实现

生产者

package com.rabbitmq.eason.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.eason.utils.RabbitMQUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

/**
 * 主题队列生产者
 *
 * @author HuangSiYuan
 * @since 2021-07-09
 */
public class Produce {

    public static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        Map<String,String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");

        bindingKeyMap.forEach((key1, value) -> {
            try {
                //发送消息
                channel.basicPublish(EXCHANGE_NAME,key1,null,value.getBytes(StandardCharsets.UTF_8));
                System.out.println("生产者发送消息: " + value);
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

    }

}

主题队列交换机01

package com.rabbitmq.eason.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.eason.utils.RabbitMQUtils;

/**
 * 声明主题交换机及相关队列
 *
 * @author HuangSiYuan
 * @since 2021-07-09
 */
public class TopicExchange01 {

    public static final String EXCHANGE_NAME = "topic_exchange";

    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String orangeKey = "*.orange.*";
        String queueName = "q1";
        //声明一个队列
        channel.queueDeclare(queueName,false,false,false,null);
        //绑定队列并且指定routingKey
        channel.queueBind(queueName,EXCHANGE_NAME,orangeKey);

        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            System.out.println("交换机成功收到消息: " + new String(message.getBody()) + ",获取的键为: " + message.getEnvelope().getRoutingKey());
        });

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("交换机失败消息标记: " + consumerTag);
        };

        //接收消息
        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);

    }
}

主题队列交换机02

package com.rabbitmq.eason.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.eason.utils.RabbitMQUtils;

/**
 * 声明主题交换机及相关队列
 *
 * @author HuangSiYuan
 * @since 2021-07-09
 */
public class TopicExchange02 {

    public static final String EXCHANGE_NAME = "topic_exchange";

    //接收消息
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String rabbitKey = "*.*.rabbit";
        String lazyKey = "lazy.#";
        String queueName = "q2";
        //声明一个队列
        channel.queueDeclare(queueName,false,false,false,null);
        //绑定队列并且指定routingKey
        channel.queueBind(queueName,EXCHANGE_NAME,rabbitKey);
        channel.queueBind(queueName,EXCHANGE_NAME,lazyKey);

        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            System.out.println("交换机成功收到消息: " + new String(message.getBody())+ ",绑定的键为: " + message.getEnvelope().getRoutingKey());
        });

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("交换机失败消息标记: " + consumerTag);
        };

        //接收消息
        channel.basicConsume(queueName,true,deliverCallback,cancelCallback);

    }
}

小结

以上就是本次博客的全部内容,主要介绍有RabbitMQ交换机的三种模式,广播模式(Fanout),直接交换模式(Direct),主题模式(Topic)三种,其中主题模式使用占比较大,因为相较于其他两种模式来说主题模式较为灵活,可用性更高
大家在看本篇博客的时候,如果有什么观点有误,或者有些需要改正的地方大家可以多多提出,大家一起学习,共同进步!!!

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐