RabbitMQ工作模式和代码实现备忘录

Official website of RabbitMQ: https://www.rabbitmq.com/

work pattern of RabbitMQ: https://www.rabbitmq.com/getstarted.html

 

 

pattern-one:simple pattern

 RabbitMQ工作模式和代码实现备忘录

Producer-code:

public class Producer {

    static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/yasuo");
        connectionFactory.setUsername("abert");
        connectionFactory.setPassword("abert");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        /*
        参数1:队列名称
        参数2:是否定义持久化队列(消息会持久化保存在服务器上)
        参数3:是否独占本连接
        参数4:是否在不使用的时候队列自动删除
        参数5:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        /*
        参数1:交换机名称;如果没有则指定空字符串(表示使用默认的交换机)
        参数2:路由key,简单模式可以使用队列名称
        参数3:消息其他属性
        参数4:消息内容
         */
        String message = "你好呀!可爱的小尾巴";
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("列车已经出发");
        channel.close();
        connection.close();
    }
}

ConnectionUtil-code:

public class ConnectionUtil {
    public static Connection getConn() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/vh1");
        connectionFactory.setUsername("abert");
        connectionFactory.setPassword("abert");
        return connectionFactory.newConnection();
    }
}

Consumer-code:

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConn();
        Channel channel = connection.createChannel();
        channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
        //创建消费者(接受消息并处理消息,重写方法)
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //路由key
                System.out.println("消息线路为:"+envelope.getRoutingKey());
                //交换机
//                System.out.println("交换机为:"+envelope.getExchange());
                //消息id
//                System.out.println("消息id为:"+envelope.getDeliveryTag());
                //接受到的消息
                System.out.println("接受到的内容为:"+new String(body,"utf-8"));
            }
        };
        /*
        参数1:队列名
        参数2:是否自动确认;设置为true表示消息接收到了
              自动向MQ回复,MQ将其从消息队列中删除
              设置为false表示需要手动确认
        参数3:消费者
         */
        channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer);
    }
}

 Result:

RabbitMQ工作模式和代码实现备忘录

RabbitMQ工作模式和代码实现备忘录

通过结果可以看出,对于简单模式,说白了就是消息生产者到消息消费者端对端的通信,但这里可能会产生个误解,包括官方图也是这样画的,那就是消息生产者直接发送消息到队列,然后消费者从队列中取消息。

实际情况是,消息必须经由交换机然后转发到对应队列,这里使用了默认交换机。

 

 

pattern-two:work queues pattern

RabbitMQ工作模式和代码实现备忘录

Producer-code:

public class Producer {
    
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConn();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        for (int i =0;i<30;i++) {
            String message = "你好呀!"+i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        }
        channel.close();
        connection.close();
    }
}

Consumer1-code:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConn();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
        //每次可以预取多少个消息
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    System.out.println("消费者1内容为:"+new String(body,"utf-8"));
                    Thread.sleep(1000);
                    //确认消息
                /*
                参数1:消息id
                参数2:false表示只有当前这条消息被处理
                 */
                    channel.basicAck(envelope.getDeliveryTag(),false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer);
    }
}

Consumer2-code:

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConn();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(Producer.QUEUE_NAME,true,false,false,null);
        channel.basicQos(1);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    System.out.println("消费者2内容为:"+new String(body,"utf-8"));
                    Thread.sleep(1000);
                    channel.basicAck(envelope.getDeliveryTag(),false);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer);
    }
}

 Result:

 RabbitMQ工作模式和代码实现备忘录

RabbitMQ工作模式和代码实现备忘录                                          RabbitMQ工作模式和代码实现备忘录

对于工作队列模式,生产者生成了30条信息,  消费者1全为奇数,消费者2全为偶数,所以消费者之间是竞争关系。至于为什么一边全是偶数一边全是奇数,一种可能是时间差的巧合,也有可能消息分发本身采用轮询机制,确保每个消费者都能拿到消息。

目前没遇到项目落地情况,所以就不做进一步探讨。

 

 

pattern-three:publish/subscribe

RabbitMQ工作模式和代码实现备忘录

public class Producer {
    //交换机名称
    static final String FANOUT_EXCHANGE = "fanout_exchange";
    //定义队列一名称
    static final String FANOUT_QUEUE_1 = "fanout_queue_1";
    //定义队列二名称
    static final String FANOUT_QUEUE_2 = "fanout_queue_2";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtil.getConn();
        Channel channel = connection.createChannel();
        //声明一个交换机;参数1:交换机名称,参数2:交换机类型(fanout,direct,topic)
        channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
        channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
        channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
//        队列绑定到交换机(参数1:队列名称,参数2:交换机名称,参数3:路由key)
        channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
        channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");
        for (int i =0;i<10;i++) {
            String message = "你好呀!"+i;
            /**
             * 参数1:交换机名称;如果没有则指定空字符串(表示默认的交换机)
             * 参数2:路由key,简单模式中可以使用队列名称
             * 参数3:消息其它属性
             * 参数4:消息内容
             */
            channel.basicPublish(FANOUT_EXCHANGE,"", null, message.getBytes());
            System.out.print(i+",");
        }
        channel.close();
        connection.close();
    }
}

Consumer1-code:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConn();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
        channel.queueDeclare(Producer.FANOUT_QUEUE_1,true,false,false,null);
        //绑定交换机
        channel.queueBind(Producer.FANOUT_QUEUE_1,Producer.FANOUT_EXCHANGE,"");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1内容为:"+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Producer.FANOUT_QUEUE_1,true,defaultConsumer);
    }
}

Consumer2-code:

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConn();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(Producer.FANOUT_EXCHANGE,BuiltinExchangeType.FANOUT);
        channel.queueDeclare(Producer.FANOUT_QUEUE_2,true,false,false,null);
        //绑定交换机
        channel.queueBind(Producer.FANOUT_QUEUE_2,Producer.FANOUT_EXCHANGE,"");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2内容为:"+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Producer.FANOUT_QUEUE_2,true,defaultConsumer);
    }
}

 Result:

RabbitMQ工作模式和代码实现备忘录

 RabbitMQ工作模式和代码实现备忘录                            RabbitMQ工作模式和代码实现备忘录

可以看出,在发布与订阅模式,交换机会把消息给每个绑定的队列,所以消费者之间不存在竞争关系 

 

 

pattern-four:routing

RabbitMQ工作模式和代码实现备忘录

 Producer:

public class Producer {
    //交换机名称
    static final String DIRECT_EXCHANGE = "direct_exchange";
    //定义队列一名称
    static final String DIRECT_QUEUE_1 = "direct_queue_1";
    //定义队列二名称
    static final String DIRECT_QUEUE_2 = "direct_queue_2";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtil.getConn();
        Channel channel = connection.createChannel();
        //声明一个交换机;参数1:交换机名称,参数2:交换机类型(fanout,direct,topic)
        channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.queueDeclare(DIRECT_QUEUE_1,true,false,true,null);
        channel.queueDeclare(DIRECT_QUEUE_2,true,false,true,null);
//        队列绑定到交换机(参数1:队列名称,参数2:交换机名称,参数3:路由key)
        channel.queueBind(DIRECT_QUEUE_1,DIRECT_EXCHANGE,"1");
        channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"2");

        String message1 = "你好呀!路由key为1";
        String message2 = "你好呀!路由key为2";
        String message3 = "要吃烧饼吗?路由key为2";

        channel.basicPublish(DIRECT_EXCHANGE,"1", null, message1.getBytes());
        channel.basicPublish(DIRECT_EXCHANGE,"2", null, message2.getBytes());
        channel.basicPublish(DIRECT_EXCHANGE,"2", null, message3.getBytes());
        System.out.print("send over");

        channel.close();
        connection.close();
    }
}

Consumer1:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConn();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(Producer.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.queueDeclare(Producer.DIRECT_QUEUE_1,true,false,true,null);
        //绑定交换机
        channel.queueBind(Producer.DIRECT_QUEUE_1,Producer.DIRECT_EXCHANGE,"1");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1内容为:"+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Producer.DIRECT_QUEUE_1,true,defaultConsumer);
    }
}

Consumer2:

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConn();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(Producer.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.queueDeclare(Producer.DIRECT_QUEUE_2,true,false,true,null);
        //绑定交换机
        channel.queueBind(Producer.DIRECT_QUEUE_2,Producer.DIRECT_EXCHANGE,"2");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2内容为:"+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Producer.DIRECT_QUEUE_2,true,defaultConsumer);
    }

Result:

RabbitMQ工作模式和代码实现备忘录

RabbitMQ工作模式和代码实现备忘录

RabbitMQ工作模式和代码实现备忘录

对于路由模式,交换器按照队列指定的路由key进行消息分发,同时也侧面印证了消息是发给交换器而不是直接发送给队列的

 

 

pattern-five:topics

RabbitMQ工作模式和代码实现备忘录

 

Producer-code:

public class Producer {
    //交换机名称
    static final String TOPIC_EXCHANGE = "topic_exchange";
    //定义队列一名称
    static final String TOPIC_QUEUE_1 = "topic_queue_1";
    //定义队列二名称
    static final String TOPIC_QUEUE_2 = "topic_queue_2";
    //定义队列二名称
    static final String TOPIC_QUEUE_3 = "topic_queue_3";

    public static void main(String[] args) throws IOException, TimeoutException {

        Connection connection = ConnectionUtil.getConn();
        Channel channel = connection.createChannel();
        //声明一个交换机;参数1:交换机名称,参数2:交换机类型(fanout,direct,topic)
        channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);

        String message1 = "你好呀!路由key为1.1";
        String message2 = "你好呀!路由key为2.1";
        String message3 = "要吃烧饼吗?路由key为2.2";
        String message4 = "要吃烧饼吗?路由key为2.2.2";

channel.basicPublish(TOPIC_EXCHANGE,"1.1", null, message1.getBytes()); channel.basicPublish(TOPIC_EXCHANGE,"2.1", null, message2.getBytes()); channel.basicPublish(TOPIC_EXCHANGE,"2.2", null, message3.getBytes()); channel.basicPublish(TOPIC_EXCHANGE,"2.2.2", null, message4.getBytes()); System.out.print("send over"); channel.close(); connection.close(); } }

Consumer1-code:

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConn();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
        channel.queueDeclare(Producer.TOPIC_QUEUE_1,true,false,true,null);
        //绑定交换机
        channel.queueBind(Producer.TOPIC_QUEUE_1,Producer.TOPIC_EXCHANGE,"1.1");
        channel.queueBind(Producer.TOPIC_QUEUE_1,Producer.TOPIC_EXCHANGE,"2.1");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1内容为:"+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Producer.TOPIC_QUEUE_1,true,defaultConsumer);
    }
}

Consumer2-code:

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConn();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(Producer.TOPIC_EXCHANGE,BuiltinExchangeType.TOPIC);
        channel.queueDeclare(Producer.TOPIC_QUEUE_2,true,false,true,null);
        //绑定交换机
        channel.queueBind(Producer.TOPIC_QUEUE_2,Producer.TOPIC_EXCHANGE,"2.*");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2内容为:"+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Producer.TOPIC_QUEUE_2,true,defaultConsumer);
    }
}

Consumer3-code:

public class Consumer3 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConn();
        final Channel channel = connection.createChannel();
        channel.exchangeDeclare(Producer.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
        channel.queueDeclare(Producer.TOPIC_QUEUE_3,true,false,true,null);
        //绑定交换机
        channel.queueBind(Producer.TOPIC_QUEUE_3,Producer.TOPIC_EXCHANGE,"2.#");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者3内容为:"+new String(body,"utf-8"));
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Producer.TOPIC_QUEUE_3,true,defaultConsumer);
    }
}

Result:

RabbitMQ工作模式和代码实现备忘录

RabbitMQ工作模式和代码实现备忘录

 RabbitMQ工作模式和代码实现备忘录

 RabbitMQ工作模式和代码实现备忘录

 对于topic模式,Consumer1队列绑定了两个路由key,侧面印证了队列可以绑定多个路由key,Consumer2和Consumer3都使用通配符*或#

 

总结:

RabiitMQ五种模式演示完毕,对于RabbitMQ,主要需要理清连接(Connection),信道(Channel),交换机(exchange),路由键(routing key),队列(queue)四者的关系,包括下图Rabbit服务界面也主要由这4部分组成(见页面菜单栏)

RabbitMQ工作模式和代码实现备忘录

一、通信的流程

1.在主线程中,首先和RabbitMQ服务器建立一个连接,这个连接需要指定ip:port,具体虚拟机,同时还需要用户名,密码作为验证。

2.连接建立后,需要创建一个信道,通过信道发送消息到具体交换机。

3.交换机(本质就是一张路由查询表)通过路由键将消息分发给满足路由key条件的队列。

4.消费者通过监听队列拿出消息。

 

二、通信的基本单位

RabbitMQ连接是基于TCP的,在一条TCP上建立了需多信道(channel),每一条信道由一个独立的线程维护,所以信道是通信的基本单位。通过下面代码及方法也可以看出

channel.exchangeDeclare

channel.queueDeclare

channel.queueBind

channel.basicPublish

交换机的声明,队列的声明,交换机和队列的绑定,队列和路由键的绑定,消息发送(指明交换机,路由键,消息内容)都在信道方法中完成,因此信道是通信的基本单位。

 

 

三、交换机的三种模式(Header不常用)

Fanout:广播,只要绑定到交换机的队列都会分发消息,即1对N。

Direct:定向,只有消息路由key与队列路由key一致交换机才会分发消息,即1对1。(一个队列可以指定多个路由key)

Topic:匹配,*占位一个词,#占位多个词,只要能够匹配交换机就会分发消息,即N对1。

RabbitMQ工作模式和代码实现备忘录

 

上一篇:springboot集成kafka实现producer和consumer


下一篇:rabbitMq完整通信(一)---producer