RabbitMQ Work Queues

The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.

RabbitMQ Work Queues

 

 

 1. creater a publisher to create a queue

            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                var message = GetMessage(args);


                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;

                for (int i = 0; i < 20; i++)
                {
                    var body = Encoding.UTF8.GetBytes(message + i.ToString());
                    channel.BasicPublish(exchange: "",
                                 routingKey: "task_queue",
                                 basicProperties: properties,
                                 body: body);
                    Console.WriteLine(" [x] Sent {0}", message + i.ToString());
                }


            }

2. create two consumers 

    var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "task_queue",
                                     durable: true,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);

                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                Console.WriteLine(" [*] Waiting for messages.");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" Consumer1 Received {0}", message);

                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(1000);

                    Console.WriteLine(" [x] Done");

                    // Note: it is possible to access the channel via
                    //       ((EventingBasicConsumer)sender).Model here
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: "task_queue",
                                     autoAck: false,
                                     consumer: consumer);

3. attentions

RabbitMQ Work Queues

 RabbitMQ Work Queues

 

 

 4. Testing (1th sleep 2 seconds, 2th sleep 1 second )

RabbitMQ Work Queues

 By default, RabbitMQ will send items to consumers in a average way. Of cause based on the processing speed.

 

上一篇:Dyno-queues 分布式延迟队列 之 辅助功能


下一篇:浅谈Python的RabbitMQ使用