tp5.1 rabbitmq 实现工作模式队列

一个生产者,两个消费者 第一个消费者消费一条休眠0.02秒  第二个休眠1秒
/**
* 工作队列
* @throws \Exception
*
*/
public function tastQueue(){
$connection = new AMQPStreamConnection(‘localhost‘, 5672, ‘admin‘, ‘admin‘);
$channel = $connection->channel();

$queue_name = "task_queue";
//$durable false 消息持久化 true 反之
$channel->queue_declare($queue_name, false, false, false, false);

// $data = implode(‘ ‘, array_slice($argv, 1));
for($i=0;$i<50;$i++){
$data = "Hello World!".$i;
$msg = new AMQPMessage(
$data,
array(‘delivery_mode‘ => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, ‘‘, ‘task_queue‘);
}


echo ‘ [x] Sent ‘, $data, "\n";

$channel->close();
$connection->close();
}

/**
* 消费工作模式
*
*/
public function receiveTask(){
$queue=‘task_queue‘;
$connection = new AMQPStreamConnection(‘localhost‘, 5672, ‘admin‘, ‘admin‘);
$channel = $connection->channel();

$channel->queue_declare($queue, false, false, false, false);

//能者多劳模式 //同一时刻服务器只会发一条消息
$channel->basic_qos(null,1,null);

//监听队列 ture表自动 false表手动返回完成状态
$no_ack=false;

$channel->basic_consume($queue, ‘‘, false, $no_ack, false, false, [$this,‘callbackTask‘]);

while ($channel->is_consuming()) {
$channel->wait();
}
}

/**
* 消费工作模式
*
*/
public function receiveTaskTwo(){
$queue=‘task_queue‘;
$connection = new AMQPStreamConnection(‘localhost‘, 5672, ‘admin‘, ‘admin‘);
$channel = $connection->channel();

$channel->queue_declare($queue, false, false, false, false);

//能者多劳模式 //同一时刻服务器只会发一条消息
$channel->basic_qos(null,1,null);

//监听队列 ture表自动 false表手动返回完成状态
$no_ack=false;

$channel->basic_consume($queue, ‘‘, false, $no_ack, false, false, [$this,‘callbackTaskTwo‘]);

while ($channel->is_consuming()) {
$channel->wait();
}
}

/**
* 回调工作模式
* @param $msg
*
*/
public function callbackTask($msg){
//手动完成状态
logs("回调工作模式1:". $msg->body,‘pushMessage‘);
$msg->delivery_info[‘channel‘]->basic_ack($msg->delivery_info[‘delivery_tag‘]);
sleep(0.02);
}
/**
* 回调工作模式
* @param $msg
*
*/
public function callbackTaskTwo($msg){
//手动完成状态
logs("回调工作模式2:". $msg->body,‘pushMessage‘);
$msg->delivery_info[‘channel‘]->basic_ack($msg->delivery_info[‘delivery_tag‘]);
sleep(1);
}
执行结果 以上采用的是能者多劳模式

tp5.1 rabbitmq 实现工作模式队列

 

tp5.1 rabbitmq 实现工作模式队列

上一篇:LeetCode 470.用Rand7()实现Rand10()


下一篇:SSM整合