RabbitMQ消息确认机制

概念

RabbitMQ消息确认机制

确认机制--》可靠抵达

发送端确认

#配置文件中 开启发送端到达服务器确认
spring.rabbitmq.publisher-confirms = true
#开启发送端消息抵达队列确认
spring.rabbitmq.publisher-returns = true
#只要抵达队列,以异步发送优先回调我们这个returnConfirm
spring.rabbitmq.template.mandatory = true
@Configuration
public class MyRabbitMQConfig {

    // @Resource
    // private RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

/*    *//**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     *//*
    @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        *//**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         *//*
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        *//**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         *//*
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }*/

}

消费端确认

1、默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息,

问题:

我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,宕机了。发生消息丢失;

解决方法:消费者手动确认模式,

#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RbbitHandler
public void recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws InterruptedException{
    System,out.println("接收到消息",+content);
    byte[] body = message.getBody();
    //消息头属性信息
    MessageProperties properties = message.getMessageProperties();
    //Thread.sleep(3000);
    System.out.println("消息处理完成==》"+content.getName());
    //deliveryTag 是 Channel内按顺序自增的
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    System.out.println("deliveryTag==>"+deliveryTag);
    
    //手动确认,批量模式
    try{
        if(deliveryTag%2==0){
            channel.basicAck(deliveryTag, false);
        }else{
            //退货 requeue=false?丢弃:发回服务器,重新入队
            //long deliveryTag, boolean multiple, boolean requeue
            channel.basicNack(deliveryTag, false, true);
            //long deliveryTag, boolean requeue
            //channle.basicReject();
        }
    }catch(Exception e){
        //网络中断
    }
}
上一篇:现有rabbitmq集群添加新节点,移除旧节点(可以作为rabbitmq集群迁移使用)


下一篇:Docker安装Rabbitmq并实现挂载宿主机数据目录