本文最后更新于 2024-04-21,欢迎来到我的Blog! https://www.zpeng.site/

SpringBoot整合RabbitMQ

1、依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2、application.yml

spring:
  rabbitmq:
    # RabbitMQ服务器地址,可以是单个地址或逗号分隔的多个地址列表
    host: localhost
    # RabbitMQ服务器端口,默认为5672
    port: 5672
    # RabbitMQ用户名
    username: your_rabbitmq_username
    # RabbitMQ密码
    password: your_rabbitmq_password
    # 虚拟主机(vhost),默认为"/",请根据实际设置填写
    virtual-host: your_virtual_host

    # 连接配置(可选)
    connection:
      # 心跳间隔时间(单位:秒),默认为60
      request-heartbeat: 60
      # 网络恢复后重新尝试连接的时间间隔(单位:毫秒),默认为5000
      network-recovery-interval: 5000

    # 消费者配置(可选)
    listener:
      # 最大并发消费者数,即同时处理消息的数量
      concurrency: 1
      # 当达到最大并发消费者数时,新消息进入队列的等待策略。可选值:ACK, REQUEUE, DROP;默认为REQUEUE
      default-requeue-rejected: true
      # 是否开启批量消费模式
      batch-enabled: false
      # 批量消费模式下的消息数量阈值
      batch-size: 10
      # 批量消费模式下等待更多消息的超时时间(单位:毫秒)
      batch-timeout: 1000

    # 模板配置(可选)
    template:
      # 是否开启publisher confirms确认机制
      mandatory-publish: true
      # 是否开启return callback回调机制
      return-callbacks: true

    # 自定义交换机、队列、绑定等配置(可选)

请根据实际情况替换上述配置中的占位符(如your_rabbitmq_username、your_rabbitmq_password、your_virtual_host等)为实际的RabbitMQ服务信息。此外,如果需要自定义交换机、队列、绑定等设置,可以在spring.rabbitmq下添加相应配置,具体格式参考Spring Boot官方文档或RabbitMQ官方文档。

注意:以上配置适用于大部分常规场景,某些高级特性或特定需求可能需要更复杂的配置。在实际应用中,请根据RabbitMQ服务器的设置和项目需求调整这些配置项。

3、Direct Exchange

直连型交换机,根据消息携带的路由键将消息投递给对应队列。

大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。

然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。

DirectRabbitConfig

创建一个交换机绑定两个队列对应两个键

package com.example.rabbitmq.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName DirectRabbitConfig
 * @Description:
 * @Author: zpeng
 * @CreateDate: 2024/4/21 14:42
 */

@Configuration
public class DirectRabbitConfig {

    /**
     * 创建一个名为TestDirectQueue的队列。
     *
     * @return Queue 返回创建的队列对象。
     */
    @Bean
    public Queue TestDirectQueue() {
        // 创建一个持久化的队列
        return new Queue("TestDirectQueue", true);
    }

    /**
     * 创建一个名为TestDirectQueue2的队列。
     *
     * @return Queue 返回创建的队列对象。
     */
    @Bean
    public Queue TestDirectQueue2() {
        // 创建一个持久化的队列
        return new Queue("TestDirectQueue2", true);
    }

    /**
     * 创建一个名为TestDirectExchange的直连交换器。
     *
     * @return DirectExchange 返回创建的直连交换器对象。
     */
    @Bean
    DirectExchange TestDirectExchange() {
        // 定义一个直连交换器
        return new DirectExchange("TestDirectExchange");
    }

    /**
     * 创建一个绑定,将TestDirectQueue队列与TestDirectExchange交换器绑定,
     * 使用的路由键为TestDirectRouting。
     *
     * @return Binding 返回创建的绑定对象。
     */
    @Bean
    Binding bindingDirect() {
        // 绑定队列到交换器上
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
    }

    @Bean
    Binding bindingDirect2() {
        // 绑定队列到交换器上
        return BindingBuilder.bind(TestDirectQueue2()).to(TestDirectExchange()).with("TestDirectRouting2");
    }
}

DirectProducer

向"TestDirectExchange"发送消息,使用"TestDirectRouting"和"TestDirectRouting2"两个Routing Key

package com.example.rabbitmq.rabbit.direct;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

/**
 * @ClassName DirectController
 * @Description:
 * @Author: zpeng
 * @CreateDate: 2024/4/21 15:00
 */
@RestController // 声明一个RESTful控制器
@RequestMapping("/direct") // 为该控制器指定根URL路径
public class DirectProducer {
    @Autowired // 自动注入RabbitMQ的AmqpTemplate对象
    private AmqpTemplate rabbitTemplate;

    /**
     * 向RabbitMQ发送消息的接口。
     * 该方法不接受任何参数,也不返回任何结果。
     * 它将连续发送10条消息到指定的Exchange和Routing Key。
     */
    @RequestMapping("/send")
    public void send() {
        for (int i = 0; i < 10; i++) {
            // 构造消息内容,并打印到控制台
            String context = "第" + i + "个消息" + new Date();
            System.out.println("Sender : " + context);
            // 向"TestDirectExchange"发送消息,使用"TestDirectRouting"和"TestDirectRouting2"两个Routing Key
            this.rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", context);
            this.rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting2", context);
        }
    }
}

DirectConsumer

直接消息消费者类,监听名为"TestDirectQueue"的队列。

package com.example.rabbitmq.rabbit.direct;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName DirectConsumer
 * @Description:
 * @Author: zpeng
 * @CreateDate: 2024/4/21 15:35
 */

/**
 * 直接消息消费者类,监听名为"TestDirectQueue"的队列。
 */
@Component
@RabbitListener(queues = {"TestDirectQueue"})
public class DirectConsumer {
    /**
     * 处理接收到的消息。
     *
     * @param testMessage 接收到的消息内容,类型为String。
     */
    @RabbitHandler
    public void process(String testMessage) {
        // 打印接收到的消息
        System.out.println("DirectProducer消费者收到消息  : " + testMessage);
    }
}

测试

http://localhost:8080/direct/send

4、Fanout Exchange

扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

FanoutRabbitConfig

创建了一个交换机绑定了三个队列

package com.example.rabbitmq.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName FanoutRabbitConfig
 * @Description:
 * @Author: zpeng
 * @CreateDate: 2024/4/21 16:28
 */
@Configuration // 表示这是一个配置类,通常用于Spring框架中配置Bean的定义
public class FanoutRabbitConfig {

    /**
     * 创建一个Fanout类型的Exchange。
     * Fanout Exchange会将所有发送到该Exchange的消息广播到所有绑定到该Exchange的Queue中。
     *
     * @return 返回一个名为"TestFanoutExchange"的FanoutExchange实例。
     */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("TestFanoutExchange");
    }

    /**
     * 创建一个名为"fanout.A"的Queue。
     *
     * @return 返回一个Queue实例,名称为"fanout.A"。
     */
    @Bean
    public Queue queueA() {
        return new Queue("fanout.A");
    }

    /**
     * 创建一个名为"fanout.B"的Queue。
     *
     * @return 返回一个Queue实例,名称为"fanout.B"。
     */
    @Bean
    public Queue queueB() {
        return new Queue("fanout.B");
    }

    /**
     * 创建一个名为"fanout.C"的Queue。
     *
     * @return 返回一个Queue实例,名称为"fanout.C"。
     */
    @Bean
    public Queue queueC() {
        return new Queue("fanout.C");
    }

    /**
     * 将队列"fanout.A"绑定到"TestFanoutExchange"交换器上。
     * 任何发送到"TestFanoutExchange"的消息都会被广播到"fanout.A"队列。
     *
     * @return 返回一个表示队列"fanout.A"与"TestFanoutExchange"交换器绑定关系的Binding实例。
     */
    @Bean
    Binding bindingExchangeA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }

    /**
     * 将队列"fanout.B"绑定到"TestFanoutExchange"交换器上。
     * 任何发送到"TestFanoutExchange"的消息都会被广播到"fanout.B"队列。
     *
     * @return 返回一个表示队列"fanout.B"与"TestFanoutExchange"交换器绑定关系的Binding实例。
     */
    @Bean
    Binding bindingExchangeB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }

    /**
     * 将队列"fanout.C"绑定到"TestFanoutExchange"交换器上。
     * 任何发送到"TestFanoutExchange"的消息都会被广播到"fanout.C"队列。
     *
     * @return 返回一个表示队列"fanout.C"与"TestFanoutExchange"交换器绑定关系的Binding实例。
     */
    @Bean
    Binding bindingExchangeC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}

FanoutProducer

发送消息到RabbitMQ的Fanout类型交换器。

package com.example.rabbitmq.rabbit.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName FanoutProducer
 * @Description:
 * @Author: zpeng
 * @CreateDate: 2024/4/21 16:30
 */
@RestController // 表示这是一个RESTful API的控制器
@RequestMapping("/fanout")
public class FanoutProducer {

    @Autowired // 自动注入RabbitMQ的模板类,用于发送消息
    private AmqpTemplate rabbitTemplate;

    /**
     * 发送消息到RabbitMQ的Fanout类型交换器。
     * 该方法不接受任何参数,也不返回任何内容。
     */
    @RequestMapping("/send")
    public void send() {
        String context = "hello,fanout message!"; // 消息内容
        System.out.println("Sender : " + context); // 打印发送的消息
        rabbitTemplate.convertAndSend("TestFanoutExchange", null, context); // 发送消息到交换器
    }
}

FanoutConsumerA

该组件是一个RabbitMQ的消费者,监听名为"fanout.A"的扇出交换器。

package com.example.rabbitmq.rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName FanoutConsumer
 * @Description:
 * @Author: zpeng
 * @CreateDate: 2024/4/21 16:34
 */

/**
 * 该组件是一个RabbitMQ的消费者,监听名为"fanout.A"的扇出交换器。
 * 扇出交换器会将消息广播到所有绑定到它的队列,这里的消费者会处理这些消息。
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutConsumerA {

    /**
     * 处理接收到的消息。
     *
     * @param testMessage 接收到的消息内容,类型为String。
     *                    该方法会打印出消息的内容到控制台。
     */
    @RabbitHandler
    public void process(String testMessage) {
        System.out.println("FanoutConsumerA消费者收到消息  : " + testMessage.toString());
    }
}

FanoutConsumerB

该组件是一个RabbitMQ的消费者,专门监听名为"fanout.B"的扇出交换器。

package com.example.rabbitmq.rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName FanoutConsumerB
 * @Description:
 * @Author: zpeng
 * @CreateDate: 2024/4/21 16:37
 */

/**
 * 该组件是一个RabbitMQ的消费者,专门监听名为"fanout.B"的扇出交换器。
 * 扇出交换器会将消息广播到所有绑定到它的队列,这个消费者就是用来处理这些被广播的消息。
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutConsumerB {

    /**
     * 处理接收到的消息。
     *
     * @param testMessage 接收到的消息内容,类型为String。
     *                    该方法会将消息打印到控制台,以示接收成功。
     */
    @RabbitHandler
    public void process(String testMessage) {
        System.out.println("FanoutConsumerB消费者收到消息  : " + testMessage.toString());
    }
}

测试

http://localhost:8080/fanout/send

5、Topic Exchange

主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。

①*(星号)仅代表一个单词
②#(井号)代表任意个单词

TopicRabbitConfig

定义了一个交换机绑定了两个队列

package com.example.rabbitmq.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName TopicRabbitConfig
 * @Description:
 * @Author: zpeng
 * @CreateDate: 2024/4/21 16:53
 */
@Configuration // 标识为配置类
public class TopicRabbitConfig {

    /**
     * 创建一个主题类型的交换器。
     * @return 返回一个名为"TestTopicExchange"的主题交换器实例。
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("TestTopicExchange");
    }

    /**
     * 创建第一个队列。
     * @return 返回一个名为"TestTopicQueueA"的队列实例。
     */
    @Bean
    public Queue firstQueue() {
        return new Queue("TestTopicQueueA");
    }

    /**
     * 创建第二个队列。
     * @return 返回一个名为"TestTopicQueueB"的队列实例。
     */
    @Bean
    public Queue secondQueue() {
        return new Queue("TestTopicQueueB");
    }

    /**
     * 将第一个队列绑定到交换器上,监听"topic.man"模式的消息。
     * @return 返回一个绑定实例,将"TestTopicQueueA"队列与"TestTopicExchange"交换器绑定。
     */
    @Bean
    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(firstQueue()).to(exchange()).with("topic.man");
    }

    /**
     * 将第二个队列绑定到交换器上,监听"topic.#"模式的消息。
     * @return 返回一个绑定实例,将"TestTopicQueueB"队列与"TestTopicExchange"交换器绑定。
     */
    @Bean
    Binding bindingExchangeMessage2() {
        return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#");
    }

}

TopicProducer

TopicProducer类用于演示通过RabbitMQ发送主题类型的消息。

package com.example.rabbitmq.rabbit.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName TopicProducer
 * @Description: TopicProducer类用于演示通过RabbitMQ发送主题类型的消息。
 * @Author: zpeng
 * @CreateDate: 2024/4/21 16:56
 */
@RestController
@RequestMapping("/topic")
public class TopicProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate; // 自动注入RabbitMQ的模板类,用于发送消息

    /**
     * 发送主题为"topic.man"的消息。
     * 这个方法不需要参数,发送一条指定路由键的消息到TestTopicExchange交换器。
     */
    @RequestMapping("/send1")
    public void send1() {
        System.out.println("TopicProducer1 发送消息");
        rabbitTemplate.convertAndSend("TestTopicExchange", "topic.man", "推送消息,路由键为topic.man");
    }

    /**
     * 发送主题为"topic.woman"的消息。
     * 同样不需要参数,此方法发送一条路由键不同的消息到相同的交换器TestTopicExchange。
     */
    @RequestMapping("/send2")
    public void send2() {
        System.out.println("TopicProducer2 发送消息");
        rabbitTemplate.convertAndSend("TestTopicExchange", "topic.woman", "推送消息,路由键为topic.woman");
    }
}

TopicConsumerA

该类为一个RabbitMQ的消费者,监听名为"TestTopicQueueA"的队列。

package com.example.rabbitmq.rabbit.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName TopicConsumer
 * @Description: 该类为一个RabbitMQ的消费者,监听名为"TestTopicQueueA"的队列。
 * @Author: zpeng
 * @CreateDate: 2024/4/21 16:59
 */
@Component
@RabbitListener(queues = "TestTopicQueueA")
public class TopicConsumerA {

    /**
     * 当收到消息时处理消息的逻辑。
     *
     * @param testMessage 接收到的消息内容。
     */
    @RabbitHandler
    public void receive(String testMessage) {
        System.out.println("TopicConsumerA消费者收到消息  : " + testMessage);
    }

}

TopicConsumerB

该类为RabbitMQ的Topic模式消费者B,监听名为"TestTopicQueueB"的队列。

package com.example.rabbitmq.rabbit.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName TopicConsumer
 * @Description: 该类为RabbitMQ的Topic模式消费者B,监听名为"TestTopicQueueB"的队列。
 * @Author: zpeng
 * @CreateDate: 2024/4/21 16:59
 */
@Component
@RabbitListener(queues = "TestTopicQueueB")
public class TopicConsumerB {

    /**
     * 处理接收到的消息。
     *
     * @param testMessage 接收到的消息内容,类型为String。
     *                    该方法将打印出接收到的消息。
     */
    @RabbitHandler
    public void receive(String testMessage) {
        System.out.println("TopicConsumerB消费者收到消息  : " + testMessage);
    }

}

测试

http://localhost:8080/topic/send1

http://localhost:8080/topic/send2