4.Springboot2.x集成 RabbitMQ的几种常见用法

Spring Boot 集成 RabbitMQ的几种常见用法

前言

Spring Boot 集成 RabbitMQ 非常简单,如果只是简单的使用配置非常少,Spring Boot 提供了spring-boot-starter-amqp 项目对消息各种支持。

引入依赖

compile group: 'org.springframework.boot', name: 'spring-boot-starter-amqp'

添加配置文件yml

server:
  port: 8001

spring:
  application:
    name: zoo-plus-rabbitmq
  rabbitmq:
    virtual-host: /
    host: localhost
    port: 5672
    username: guest
    password: guest

直接通过队列(单生-单消)

我们需要先配置队列

/**
 * @author: 谢飞
 */
@Configuration
public class RabbitConfig {
    @Bean
    public Queue helloQueue() {
        return new Queue("hello-queue");
//        return QueueBuilder.durable("hello-queue").build();
    }

}

配置队列时我们可以直接new一个队列,也可以直接用链式编程构建一个队列。

配置消费者

/**
 * @author: 谢飞
 */
@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = {"hello-queue"})
    public void helloQueue(Message message, Channel channel) {
        log.info("-----------------hello-queue消费:" + new String(message.getBody()));
    }
}

编写测试类测试

/**
 * @author: 谢飞
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestRabbitMQ {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送到hello-queue
     */
    @Test
    public void sendHelloQueue() {
        rabbitTemplate.convertAndSend("hello-queue", "这是发送到hello队列的消息");
    }
}

然后我们运行测试方法,发现控制台已经打印消费端打印的信息了。

通过交换机(Exchange)发送到队列

通过fanout类型交换机发送

1.编写配置

/**
 * @author: 谢飞
 */
@Configuration
public class RabbitConfig {
    @Bean
    public Queue helloQueue() {
        return QueueBuilder.durable("hello-queue").build();
    }

    @Bean
    public Queue helloQueue2() {
        return new Queue("hello-queue2");
    }


    /**
     * Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的所有队列都收到这个消息。
     */
    @Bean
    FanoutExchange helloFanoutExchange() {
        return ExchangeBuilder.fanoutExchange("hello-fanout-exchange").build();
    }

    /**
     * 绑定hello-queue队列到hello-fanout-exchange交换机
     */
    @Bean
    Binding bindingHello() {
        return BindingBuilder.bind(helloQueue()).to(helloFanoutExchange());
    }

    /**
     * 绑定hello-queue2队列到hello-fanout-exchange交换机
     */
    @Bean
    Binding bindingHello2() {
        return BindingBuilder.bind(helloQueue2()).to(helloFanoutExchange());
    }

}

2.监听类:

/**
 * @author: 谢飞
 */
@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = {"hello-queue"})
    public void helloQueue(Message message, Channel channel) {
        log.info("-----------------hello-queue消费:" + new String(message.getBody()));
    }

    @RabbitListener(queues = {"hello-queue2"})
    public void helloQueue2(Message message, Channel channel) {
        log.info("-----------------hello-queue2消费:" + new String(message.getBody()));
    }
}

3.测试类

    /**
     * 发送到hello-fanout-exchange交换机
     * 注意:fanout这种模式与routingKey无关,消息发送到与这个交换机绑定的队列
     */
    @Test
    public void sendFanoutExchange() {
        rabbitTemplate.convertAndSend("hello-fanout-exchange", "", "这是发送到hello-fanout-exchange交换机的消息");
    }

通过topic类型交换机发送

topic 是 RabbitMQ 中最灵活的一种方式,可以根据 routing_key *的绑定不同的队列

1.编写配置

/**
 * @author: 谢飞
 */
@Configuration
public class RabbitConfig {
    @Bean
    public Queue helloQueue3() {
        return new Queue("hello-queue2-topic");
    }

    ////////////////////////////////topic////////////////////////////////////

    /**
     * 创建Topic Exchange交换机也叫通配符交换机
     * Topic Exchange主要有两种通配符:# 和 *
     * *(星号):可以(只能)匹配一个单词
     * #(井号):可以匹配多个单词(或者零个)
     */
    @Bean
    TopicExchange helloTopicExchange() {
        return ExchangeBuilder.topicExchange("hello-topic-exchange").build();
    }

    /**
     * 绑定hello-queue3队列到hello-topic-exchange交换机
     */
    @Bean
    Binding bindingHello3() {
        return BindingBuilder.bind(helloQueue3()).to(helloTopicExchange()).with("hello.#");
    }
}

2.消费端

    @RabbitListener(queues = {"hello-queue2-topic"})
    public void helloQueue3(Message message, Channel channel) {
        log.info("-----------------hello-queue3消费:" + new String(message.getBody()));
    }

3.测试段

    /**
     * 发送到hello-topic-exchange交换机
     */
    @Test
    public void sendTopicExchange() {
        rabbitTemplate.convertAndSend("hello-topic-exchange", "hello.123", "这是发送到hello-topic-exchange交换机的消息");
    }

对象的支持(实体必须序列化)

实体:

/**
 * @author: 谢飞
 */
@Data
public class Order implements Serializable {
    private Long id;
    private String orderId;
}

消费者:

    @RabbitListener(queues = {"order-queue"})
    public void orderQueue(Order order) {
        log.info("-----------------order-queue消费:" + order);
    }

测试类:

    /**
     * 发送Order
     */
    @Test
    public void sendOrder() {
        Order order = new Order();
        order.setId(1L);
        order.setOrderId("123123123123");
        rabbitTemplate.convertAndSend("order-queue", order);
    }

使用注解的方式

使用注解的方式不需要提前声明配置,只需要在注解里面配置好就行,项目启动自动加载。

消费者:

    /**
     * 通过注解
     */

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1", declare = "true"),
            exchange = @Exchange(value = "exchange-1", durable = "true", type = "topic", ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
    ))
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws IOException {
        System.out.println("---------------------------------------------");
        System.out.println("收到消息:" + new String(message.getBody()));
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, false);
    }

    /**
     * 传递实体
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-2", declare = "true"),
            exchange = @Exchange(value = "exchange-2", durable = "true", type = "topic", ignoreDeclarationExceptions = "true"),
            key = "springboot.*"
    ))
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, Channel channel, @Headers Map<String, Object> headers) throws IOException {
        System.out.println("---------------------------------------------");
        System.out.println("收到消息:" + order);
        long deliveryTag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }

生产者:

    /**
     * 使用注解
     */
    @GetMapping("queue1")
    public Resp queue1() {
        amqpTemplate.convertAndSend("exchange-1", "springboot.abc", "测试queue1 使用注解");
        return Resp.success("ok", null);
    }

    /**
     * 发送实体
     */
    @GetMapping("queue2")
    public Resp queue2() {
        Order order = new Order();
        order.setId(1L);
        amqpTemplate.convertAndSend("exchange-2", "springboot.abc", order);
        return Resp.success("ok", null);
    }

总结:在springboot中使用rabbitMQ十分方便,大致的简单用法就这些,后续我们将学习到更多rabbitMQ的高级用法。

源码地址:https://gitee.com/zoo-plus/springboot-learn/tree/2.x/springboot-middleware/rabbitmq

4.Springboot2.x集成 RabbitMQ的几种常见用法4.Springboot2.x集成 RabbitMQ的几种常见用法 NPException 发布了474 篇原创文章 · 获赞 142 · 访问量 56万+ 他的留言板 关注
上一篇:Java-FTL模板中的当前日期


下一篇:springboot2 集成swaggerui