Featured image of post Java工程师 中间件 RabbitMQ的使用

Java工程师 中间件 RabbitMQ的使用

🌏Java工程师 RabbitMQ的使用 🎯 这篇文章用于记录 RabbitMQ的使用

🎄前置文章

🍭RabbitMQ的安装 RabbitMQ的安装后 再往下走

🎄管控台常用功能说明

🌴查看监听端口

🌴查看交换机

🌴添加用户并配置访问虚拟主机

🌴添加虚拟主机

🎄RabbitMQ入门

参考RabbitMQ官网

🍭介绍

RabbitMQ 是一个消息代理:它接受并转发消息。您可以将其视为邮局:当您将要投递的邮件放入邮箱时,您可以确定邮递员最终会将邮件递送给收件人。在这个类比中,RabbitMQ 是一个邮箱、一个邮局和一个信递员

RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块(消息)

RabbitMQ 和一般消息传递会使用一些术语:

  • 生产者(Producer):生产无非就是发送 发送消息的程序是生产者 后面的图会使用下面的图标表示生产者

  • 队列(Queue):队列是 RabbitMQ 中邮箱的名称 尽管消息流经 RabbitMQ 和应用程序 但它们只能存储在队列中 队列仅受主机内存和磁盘限制的约束,它本质上是一个大型消息缓冲区 许多生产者可以将消息发送到一个队列,并且许多消费者可以尝试从一个队列接收数据 后面的图会使用下面的图标表示队列:

  • 消费者(Consumer):消费与接收具有相似的含义 消费者是一个主要等待接收消息的程序

🍭SpringBoot 集成 RabbitMQ

🎯在pom.xml文件加入响应的依赖

<!-- 引入rabbitmq 需要的AMQP依赖 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

🎯application.yml 配置 rabbitmq

  # 20230908 rabbitmq 配置
  rabbitmq:
    host: 101.42.24.90
    username: admin # 默认的guest用户不支持远程访问 使用新建的admin用户
    password: admin
    #虚拟主机
    virtual-host: /
    #端口
    port: 5672
    listener:
      simple:
        #消费者最小数量 P-QUEUE-C
        concurrency: 10
        #消费者最大数量
        max-concurrency: 10
        #限制消费者,每次只能处理一条消息,处理完才能继续下一条消息
        prefetch: 1
        #启动时是否默认启动容器,默认为 true
        auto-startup: true
        #被拒绝时重新进入队列的(提高消息处理的可靠性)
        default-requeue-rejected: true
    template:
      retry:
        #启用消息重试机制,默认为 false 重试机制是指如果消息处理失败 要不要重新再试一次
        enabled: true
        #initial-interval (初始间隔时间):表示第一次重试的时间间隔,也就是在消息第一次处 理失败后,等待多长时间再尝试重新发送消息。这个参数的默认值是 1 秒
        initial-interval: 1000ms
        #重试最大次数,默认为 3 次
        max-attempts: 3
        #max-interval (最大间隔时间):表示重试过程中的最大时间间隔,也就是每次重试时, 最长等待多长时间再尝试重新发送消息。这个参数的默认值是 10 秒
        max-interval: 10000ms
        #重试的间隔乘数
        #配置 2 的话,第一次等 1s,第二次等 2s,第三次等 4s
        multiplier: 1 # 如果倍数设置为 1 则重试时间永远为 1000ms

上面的配置做好后 接下来就可以来探索RabbitMQ的具体使用了

🍭快速入门 一个生产者 一个消费者

这是RabbitMQ使用的最简单的案例 接下来写代码进行实践

📑在常量接口类JinConstant中添加要使用的常量字符串

这个类是我自己定义的 用来存放整个用用程序中会使用到的各种常量 这里添加一个名为QUEUE_NAME_HELLOWORLD的字符串常量 在后面的HelloWorldConfig类中会使用到

【注意❗】每一个队列都有自己的名字 生产者发送消息的时候需要指定队列的名字 表示要向哪一个队列发送消息

public interface JinConstant {
    // RabbitMQ使用模式01 的队列名 
    // 因为是第一个使用案例的探索 所以起名为helloworld
    // 这里的 QUEUE_NAME_HELLOWORLD 变量名可以根据开发者的习惯任意指定
    public static final String QUEUE_NAME_HELLOWORLD = "helloworld";
}

📑编写队列配置类HelloWorldConfig

/**
@author Liu Xianmeng
@createTime 2023/9/8 10:11
@instruction 学习使用RabbitMQ的简单使用模式 一个生产者 一个消息队列 一个消费者
*/

@SuppressWarnings({"all"})
@Configuration // 将此配置类注入容器
public class HelloWorldConfig {

    @Bean
    public Queue getQueue(){
        /*
          【说明】
          (1)Queue类的包名 import org.springframework.amqp.core.Queue;
          (2)下面使用的Queue的构造函数如下
               public Queue(String name, boolean durable) {
                   this(name, durable, false, false, (Map)null);
               }
               - 第一个参数指定要创建的队列的名字
               - 其中第二个参数 durable 表示队列是否持久化
                 队列默认是存放到内存中的 rabbitmq 重启则丢失 若想重启之后还存在则队列要持久化
                 保存到Erlang自带的Mnesia数据库中 当rabbitmq重启之后会读取该数据库
         */
        return new Queue(QUEUE_NAME, true); // true 表示: 持久化
    }
}

📑编写生产者类

/**
@author Liu Xianmeng
@createTime 2023/6/1 15:52
@instruction MQSender: 消息的发送者/生产者
*/

@SuppressWarnings({"all"})
@Service
@Slf4j
public class MQSender {

    /**
     * 装配RabbitTemplate -> 操作RabbitMQ(类比RedisTemplate)
     */
    @Resource
    private RabbitTemplate rabbitTemplate;

    // 编写发送消息的方法
    public void sendHelloWorld(Object msg) {
        log.info("发送消息-->" + msg);
        // 这里的第一个参数 routingKey = "helloworld" 就是刚才在HelloWorldConfig类中定义的队列名
        // 使用常量类JinConstant的静态变量 以避免直接在代码中编写字符串名而引起不必要的错误
        rabbitTemplate.convertAndSend(JinConstant.QUEUE_NAME_HELLOWORLD, msg);
    }
}

📑编写消费者类

/**
@author Liu Xianmeng
@createTime 2023/6/1 15:51
@instruction MQReceiver: 消息的接收者/消费者
*/

@SuppressWarnings({"all"})
@Service
@Slf4j
public class MQReceiver {
    /*
    	⚡注意这里的 @RabbitListener(queues = JinConstant.QUEUE_NAME_HELLOWORLD)
    	注解@RabbitListener传入队列的名字 表示此MQReceiver监听名为JinConstant.QUEUE_NAME_HELLOWORLD的队列的消息
    */
    @RabbitListener(queues = JinConstant.QUEUE_NAME_HELLOWORLD)
    public void receiveHelloWorld(Object msg) {
        log.info("接收到消息-->" + msg);
    }
}

📑编写Controller类

在方法中调用刚才写过的生产者MQSender组件 当浏览器请求对应的路径 控制台就会打印出生产者和消费者工作的日志信息

/**
@author Liu Xianmeng
@createTime 2023/6/3 16:52
@instruction
*/

@SuppressWarnings({"all"})
@Controller
public class RabbitMQController {

    // 装配MQSender生产者对象
    @Resource
    private MQSender mqSender;

    // 调用消息生产者 发送消息
    @RequestMapping("/mq/helloworld")
    @ResponseBody
    public void mq() {
        mqSender.sendHelloWorld("hello, BigBigMeng~");
    }
}

⚡接下来启动SpringBoot项目 并用postman发送测试请求

用postman发送测试请求

查看控制台信息

✨到此 使用RabbitMQ的第一个小案例演示完毕

🎄Why RabbitMQ?

什么是RabbitMQ?

RabbitMQ是一款开源的,Erlang编写的,消息中间件;最大的特点就是消费并不需要确保提供方存在,实现了服务之间的高度解耦

RabbitMQ(优点)

具体见:MQ的优点

(1)解耦:组件通信解耦

(2)异步:异步数据流处理

(3)削峰:高并发场景流量的削峰填谷

🎄RabbitMQ 中完整的消息传递模型

RabbitMQ 消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。其实 在入门案例中 其实我们并没看到交换机 这是因为使用了默认的交换机。

相反,生产者只能将消息发送到交换机。交换是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,它将消息推送到队列。如下图所示:

交换机必须确切地知道如何处理它收到的消息。是否应该将其附加到特定队列?是否应该将其附加到许多队列中?或者应该将其丢弃。其规则由交换类型定义。

有几种可用的交换类型:directtopicheadersfanout 他们分别对应着RabbitMQ的不同的使用模式,接下来逐个进行讨论和演示

1️⃣Fanout(广播|发布订阅)模式

当我们使用交换机的fanout交换类型的时候,就是在使用RabbitMQ的fanout(广播)模式 使用fanout模式的时候,我们需要将接收消息的队列绑定到散布消息的交换机上,绑定后,相当于队列“订阅”了该交换机的消息,当交换机有新的消息到来,绑定它的队列都能收到到来的消息

接下来进行代码的实践 演示fanout模式的使用

📑修改常量接口类JinConstant,向其中添加要使用的常量字符串

/**
 * fanout模式
 */
public static final String QUEUE_FANOUT_01 = "queue_fanout01"; // 测试fanout模式的队列1
public static final String QUEUE_FANOUT_02 = "queue_fanout02"; // 测试fanout模式的队列2
public static final String fanoutExchange = "fanoutExchange"; // 测试fanout模式的交换机

📑新建FanoutConfig类 配置fanout模式要使用的组件(其实也可以直接在配置类HelloWorldConfig中进行配置,是一样的)

/**
@author Liu Xianmeng
@createTime 2023/6/1 15:45
@instruction 学习使用广播模式
*/

@SuppressWarnings({"all"})
@Configuration
public class FonoutConfig {
    // 配置队列1
    @Bean
    public Queue queue1(){
        // 传入参数 队列的名字
        return new Queue(JinConstant.QUEUE_FANOUT_01);
    }
    // 配置队列2
    @Bean
    public Queue queue2(){
        // 传入参数 队列的名字
        return new Queue(JinConstant.QUEUE_FANOUT_02);
    }
    // 配置交换机
    @Bean
    public FanoutExchange exchange(){
        // 传入参数 交换机的名字
        return new FanoutExchange(JinConstant.fanoutExchange);
    }
    // 配置绑定对象1
    @Bean
    public Binding binging01(){
        // 将 queue1 绑定到名为 fanoutExchange的交换机
        return BindingBuilder.bind(queue1()).to(exchange());
    }
    // 配置绑定对象2
    @Bean
    public Binding bingding02(){
        // 将 queue2 绑定到名为 fanoutExchange的交换机
        return BindingBuilder.bind(queue2()).to(exchange());
    }
}

📑修改类MQSender增加fanout模式的方法

// 使用广播模式
public void sendFanout(Object msg) {
    log.info("发送消息" + msg);
    //交换机JinConstant.fanoutExchange将消息发送到名为""空的队列上 -> 即进行广播消息
    rabbitTemplate.convertAndSend(JinConstant.fanoutExchange, "", msg);
}

📑修改类MQReceiver增加fanout模式的方法

/**
 * 监听队列 queue_fanout01
 */
@RabbitListener(queues = JinConstant.QUEUE_FANOUT_01)
public void receive1(Object msg) {
    log.info("从 queue_fanout01 接收消息-" + msg);
}

/**
 * 监听队列 queue_fanout02
 * @param msg
 */
@RabbitListener(queues = JinConstant.QUEUE_FANOUT_02)
public void receive2(Object msg) {
    log.info("从 queue_fanout02 接收消息-" + msg);
}

📑修改Controller类 增加方法

// fanout模式
@GetMapping("/mq/fanout")
@ResponseBody
public void fanout() {
    mqSender.sendFanout("fanout模式 测试发送此消息~");
}

⚡接下来启动SpringBoot项目 并用postman发送测试请求

【❗注意】由于@GetMapping("/mq/fanout")修饰fanout方法,所以要用get方式发送请求

查看控制台打印的结果

🍭观察RabbitMQ管控台

2️⃣Direct(路由)模式

🍭概述

使用fanout模式,并没有给我们带来太大的灵活性——它只能进行无意识的广播

下面使用direct交换来代替。直接交换背后的路由算法很简单 :消息会进入其绑定键与消息的路由键完全匹配的队列 为了说明这一点,见下图:

在此设置中,我们可以看到路由交换机X绑定了两个队列 第一个队列使用绑定键orange进行绑定,第二个队列有两个绑定,一个是black,另一个是green。

在这样的设置中,使用路由键orange发布到交换器的消息 将被路由到队列Q1 路由键为black或green的消息将发送到Q2。所有其他消息将被丢弃。

🍭多重绑定

使用相同的绑定键绑定多个队列是完全合法的。在上面的示例中,我们可以使用绑定键black在X和Q1之间添加绑定。在这种情况下,direct交换的行为将类似于fanout,并将消息广播到所有匹配的队列。带有路由密钥black 的消息将被传递到 Q1和Q2

📑修改常量接口类JinConstant,向其中添加要使用的常量字符串

/**
 * direct路由模式
 */
public static final String QUEUE_DIRECT_01 = "queue_direct01";
public static final String QUEUE_DIRECT_02 = "queue_direct02";
public static final String DIRECT_EXCHANGE = "direct_Exchange";
public static final String ROUNTINGKEY_01 = "queue.red01";
public static final String ROUNTINGKEY_02 = "queue.green02";

📑新建DirectConfig类 配置direct模式要使用的组件(其实也可以直接在配置类HelloWorldConfig中进行配置,是一样的)

/**
@author Liu Xianmeng
@createTime 2023/6/3 20:40
@instruction 学习使用路由模式
*/

@SuppressWarnings({"all"})
@Configuration
public class DirectConfig {
    @Bean
    public Queue queue_direct1() { return new Queue(JinConstant.QUEUE_DIRECT_01); }
    @Bean
    public Queue queue_direct2() { return new Queue(JinConstant.QUEUE_DIRECT_02); }
    @Bean
    public DirectExchange exchange_direct() { return new DirectExchange(JinConstant.DIRECT_EXCHANGE); }
    
    // 将队列绑定到 指定交换机 并指定路由
    @Bean
    public Binding binding_direct1() {
        return BindingBuilder
            .bind(queue_direct1())
            .to(exchange_direct())
            .with(JinConstant.ROUNTINGKEY_01); 
    }
    @Bean
    public Binding binding_direct2() {
        return BindingBuilder
            .bind(queue_direct2()) 
            .to(exchange_direct())
            .with(JinConstant.ROUNTINGKEY_02); 
    } 
}

📑修改类MQSender增加direct模式的方法

 // 使用路由模式
public void sendDirect01(Object msg) {
    log.info("发送消息" + msg);
    // 将此消息发送到directExchange交换机 同时指定路由 queue.red01
    rabbitTemplate.convertAndSend(JinConstant.DIRECT_EXCHANGE, JinConstant.ROUNTINGKEY_01, msg);
}
public void sendDirect02(Object msg) {
    log.info("发送消息" + msg);
    rabbitTemplate.convertAndSend(JinConstant.DIRECT_EXCHANGE, JinConstant.ROUNTINGKEY_02, msg);
}

📑修改类MQReceiver增加direct模式的方法

@RabbitListener(queues = {JinConstant.QUEUE_DIRECT_01})
public void queue_direct01(Object msg){
    log.info("从 queue_direct01 接收消息:" + msg);
}
// 这个方法监听两个队列
@RabbitListener(queues = {JinConstant.QUEUE_DIRECT_01, JinConstant.QUEUE_DIRECT_02})
public void queue_direct02(Object msg){
    log.info("从 queue_direct02 或者 queue_direct02 接收消息:" + msg);
}

📑修改Controller类 增加方法

@GetMapping("/mq/direct01")
@ResponseBody
public void direct01(){
    mqSender.sendDirect01("red01");
}
@GetMapping("/mq/direct02")
@ResponseBody
public void direct02(){
    mqSender.sendDirect02("green02");
}

接下来启动SpringBoot项目 并用postman发送测试请求

先执行:http://localhost:8888/mq/direct02 会发现queue_direct02()方法执行了

后执行:http://localhost:8888/mq/direct01 会发现queue_direct01()方法执行了,而queue_direct02()方法没有执行,这是为什么❓

❓**这是为什么?因为队列中的消息只会被消费一次 **

在上面的例子中,queue_direct01queue_direct02这两个方法都监听了同一个队列QUEUE_DIRECT_01。由于它们都监听相同的队列,当队列中有新消息时,RabbitMQ会根据当前的负载均衡策略从这两个方法中随机选择一个来消费该消息。因此,无法预先确定具体是哪个方法会被执行。

因此,不能确定具体是queue_direct01还是queue_direct02会执行。但是,可以确定的是,在每次消息到达时,只会被其中一个方法消费一次,也就是这两个方法中的某一个会被执行。

3️⃣Topic(主题)模式

🍭 概述

direct 模式会造成路由 RoutingKey 太多, 而实际开发中往往是按照某个规则来进行路由匹配的, RabbitMQ 提供了 Topic模式/主题模式来适应这种需求

Topic 模式相当于是模糊的路由匹配模式,其中*在主题模式中匹配一个单词,而#匹配一个或多个单词,包括零个单词

📑修改常量接口类JinConstant,向其中添加要使用的常量字符串

/**
 * Topic模式
 */
public static final String TOPIC_QUEUE_01 = "queue_topic01";
public static final String TOPIC_QUEUE_02 = "queue_topic02";
public static final String TOPIC_EXCHANGE = "topicExchange";
public static final String TOPIC_ROUTINGKEY01 = "#.queue.#"; // `#`匹配一个或多个单词,包括零个单词, 可以视为通配符
public static final String TOPIC_ROUTINGKEY02 = "*.queue.#"; // `*`在主题模式中匹配一个单词

📑新建TopicConfig类 配置Topic模式要使用的组件(其实也可以直接在配置类HelloWorldConfig中进行配置,是一样的)

/**
@author Liu Xianmeng
@createTime 2023/6/3 20:42
@instruction 学习使用主题模式
*/
@SuppressWarnings({"all"})
@Configuration
public class TopicConfig {
    @Bean
    public Queue queue_topic01() {
        return new Queue(JinConstant.TOPIC_QUEUE_01);
    }
    @Bean
    public Queue queue_topic02() {
        return new Queue(JinConstant.TOPIC_QUEUE_02);
    }
    @Bean // 🎯注意这里是TopicExchange
    public TopicExchange topicExchange() {
        return new TopicExchange(JinConstant.TOPIC_EXCHANGE);
    }
    @Bean
    public Binding binding_topic01() {
        return BindingBuilder.bind(queue_topic01()).to(topicExchange()).with(JinConstant.TOPIC_ROUTINGKEY01);
    }
    @Bean
    public Binding binding_topic02() {
        return BindingBuilder.bind(queue_topic02()).to(topicExchange()).with(JinConstant.TOPIC_ROUTINGKEY02);
    }
}

📑修改类MQSender增加topic模式的方法

/**
 * 使用topic模式
 */
public void sendTopic01(Object msg) {
    log.info("发送消息sendTopic01:" + msg);
    //发送消息到 topicExchange 队列,同时携带 routingKey queue.red.message
    rabbitTemplate.convertAndSend(JinConstant.TOPIC_EXCHANGE, "queue.red.message", msg);
}
public void sendTopic02(Object msg) {
    log.info("发送消息sendTopic02:" + msg);
    //发送消息到 topicExchange 队列,同时携带 routingKey green.queue.green.message
    rabbitTemplate.convertAndSend(JinConstant.TOPIC_EXCHANGE, "green.queue.green.message", msg);
}

📑修改类MQReceiver增加topic模式的方法

@RabbitListener(queues = JinConstant.TOPIC_QUEUE_01)
public void receive05(Object msg) {
    log.info("从 sendTopic01 接收消息-" + msg);
}
@RabbitListener(queues = JinConstant.TOPIC_QUEUE_02)
public void receive06(Object msg) {
    log.info("从 sendTopic02 接收消息-" + msg);
}

📑修改Controller类 增加方法

@RequestMapping(value = "/mq/topic01", method = RequestMethod.GET)
@ResponseBody
public void mqtopic01() {
    mqSender.sendTopic01("Msg_topic01");
}
@RequestMapping(value = "/mq/topic02", method = RequestMethod.GET)
@ResponseBody
public void mqtopic02() {
    mqSender.sendTopic02("Msg_topic02");
}

⚡接下来启动SpringBoot项目 并用postman发送测试请求

http://localhost:8888/mq/topic01 "queue.red.message" 路由 匹配: "#.queue.#"

http://localhost:8888/mq/topic02 "green.queue.green.message" 路由 匹配: "*.queue.#"

Licensed under CC BY-NC-SA 4.0
最后更新于 2023年9月13日