Spring Boot集成RabbitMQ快速入门Demo

1.什么是RabbitMQ?

RabbitMQ是一款使用Erlang语言开发的,基于AMQP协议的消息中间件,作为一款优秀的消息系统,RabbitMQ有高并发、可扩展等优势,并适用于大型系统中各个模块之间的通信。

RabbitMQ的特点为:

  • 持久化、传输确认、发布确认等功能保证消息可靠

  • 支持多种消息分发模式,处理更加灵活

  • 提供可视化管理界面,使用方便

  • 支持集群部署,保证服务高可用

2.RabbitMQ环境搭建

version: '3'
services:
  rabbitmq:
    image: registry.cn-hangzhou.aliyuncs.com/zhengqing/rabbitmq:3.7.8-management        # 原镜像`rabbitmq:3.7.8-management` 【 注:该版本包含了web控制页面 】
    container_name: rabbitmq            # 容器名为'rabbitmq'
    hostname: my-rabbit
    restart: unless-stopped                                       # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器
    environment:                        # 设置环境变量,相当于docker run命令中的-e
      TZ: Asia/Shanghai
      LANG: en_US.UTF-8
      RABBITMQ_DEFAULT_VHOST: my_vhost  # 主机名
      RABBITMQ_DEFAULT_USER: admin      # 登录账号
      RABBITMQ_DEFAULT_PASS: admin      # 登录密码
    volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
      - "./rabbitmq/data:/var/lib/rabbitmq"
    ports:                              # 映射端口
      - "5672:5672"
      - "15672:15672"

运行

docker-compose -f docker-compose-rabbitmq.yml -p rabbitmq up -d

web管理端:http://127.0.0.1:15672 登录账号密码:admin/admin

27a7e34328fda7e39066d77e99a7d4a8.png

3.代码工程

实验目的:实现通过rabbitmq发送和接收消息

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>


    <artifactId>rabbitmq</artifactId>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
</project>

application.properties

server.port=8088
  #rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=my_vahost

config

简单使用

package com.et.rabbitmq.config;




import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;




@Configuration
public class RabbitConfig {


    @Bean
    public Queue Queue() {
        return new Queue("hello");
    }
}

topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key*的绑定不同的队列 首先对topic规则配置,这里使用两个队列来测试

package com.et.rabbitmq.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class TopicRabbitConfig {


   public final static String TOPIC_ONE = "topic.one";
   public final static String TOPIC_TWO = "topic.two";
   public final static String TOPIC_EXCHANGE = "topicExchange";


   @Bean
   public Queue queue_one(){
      return new Queue(TOPIC_ONE);
   }


   @Bean
   public Queue queue_two(){
      return new Queue(TOPIC_TWO);
   }


   @Bean
   TopicExchange exchange(){
      return new TopicExchange(TOPIC_EXCHANGE);
   }


   @Bean
   Binding bindingExchangeOne(Queue queue_one, TopicExchange exchange){
      return BindingBuilder.bind(queue_one).to(exchange).with("topic.one");
   }


   @Bean
   Binding bindingExchangeTwo(Queue queue_two, TopicExchange exchange){
      //# 表示零个或多个词
      //* 表示一个词
      return BindingBuilder.bind(queue_two).to(exchange).with("topic.#");
   }


}

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

package com.et.rabbitmq.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class FanoutRabbitConfig {


    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }


    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }


    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }


    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }


    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }


    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }


    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }


}

receiver

package com.et.rabbitmq.receiver;


import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;


@Service
@Slf4j
public class HelloReceiver {




    @RabbitListener(queues = "hello")
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }


    @RabbitListener(queues = {"topic.one"})
    public void receiveTopic1(@Payload String fileBody) {
        log.info("topic1:" + fileBody);
    }


    @RabbitListener(queues = {"topic.two"})
    public void receiveTopic2(@Payload String fileBody) {
        log.info("topic2:" + fileBody);
    }
    @RabbitListener(queues = {"fanout.A"})
    public void fanoutA(@Payload String fileBody) {
        log.info("fanoutA:" + fileBody);
    }


    @RabbitListener(queues = {"fanout.B"})
    public void fanoutB(@Payload String fileBody) {
        log.info("fanoutB:" + fileBody);
    }
    @RabbitListener(queues = {"fanout.C"})
    public void fanoutC(@Payload String fileBody) {
        log.info("fanoutC:" + fileBody);
    }
}

sender

package com.et.rabbitmq.sender;


import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


import java.util.Date;
@Component
public class HelloSender {


   @Autowired
   private AmqpTemplate rabbitTemplate;


   public void send() {
      String context = "hello " + new Date();
      System.out.println("Sender : " + context);
      this.rabbitTemplate.convertAndSend("hello", context);
   }


}
package com.et.rabbitmq.sender;


import com.et.rabbitmq.config.TopicRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class TopicSender {


   @Autowired
   private AmqpTemplate rabbitTemplate;


   //两个消息接受者都可以收到
   public void send_one() {
      String context = "Hi, I am message one";
      System.out.println("Sender : " + context);
      this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.one",context);
   }




   //只有TopicReceiverTwo都可以收到
   public void send_two() {
      String context = "Hi, I am message two";
      System.out.println("Sender : " + context);
      this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.two",context);
   }


}

DemoApplication.java

package com.et.quartz;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class DemoApplication {


   public static void main(String[] args) {
      SpringApplication.run(DemoApplication.class, args);
   }
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • https://github.com/Harries/springboot-demo

4.测试

简单使用

@Test
public void hello() throws Exception {
    helloSender.send();
    Thread.sleep(50000);
}

Topic Exchange

@Test
public void topicOne() throws Exception {
    topicSender.send_one();
    Thread.sleep(50000);
}
@Test
public void topicTwo() throws Exception {
    topicSender.send_two();
    Thread.sleep(50000);
}

Fanout Exchange

@Test
public void sendFanout() throws InterruptedException {
    String context = "hi, fanout msg ";
    System.out.println("Sender : " + context);
    this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
    Thread.sleep(50000);
}

5.参考连接

  • https://www.rabbitmq.com/

  • https://spring.io/projects/spring-amqp

上一篇:IDEA 2021.3.3最新激活破解教程(可激活至2099年,亲测有效)


下一篇:安装k8s