在PHP中如何使用消息队列

/**
 * Message queue service built on the php-amqp extension classes
 * (AMQPConnection, AMQPChannel, AMQPExchange, AMQPQueue).
 *
 * @author zhou.tingze
 * @example
 * -----------------------------------Create----------------------------------------
 * $array = array('a','b','c','d');
 * $this->load->library('amqp_service');
 * $this->amqp_service->setSaveType('test_exchange', 'test_queue', 'test_router');
 * $this->amqp_service->createMessageQueue($array);
 * -----------------------------------End-------------------------------------------
 *
 * -----------------------------------Get-------------------------------------------
 * $this->load->library('amqp_service');
 * $this->amqp_service->setSaveType('test_exchange', 'test_queue', 'test_router');
 * $message_queue = $this->amqp_service->getMessageQueue();
 * var_dump($message_queue);
 * -----------------------------------End-------------------------------------------
 */
class Amqp_service extends Base_service
{
    /** @var AMQPConnection Broker connection opened by the constructor. */
    public $conn;

    /** @var string Exchange name set through setSaveType(). */
    public $exchange;

    /** @var string Queue name set through setSaveType(). */
    public $queue;

    /** @var string Routing key set through setSaveType(). */
    public $router;

    /**
     * Loads the 'app_config' configuration and connects to the broker using
     * the settings stored under its 'amqp' key.
     *
     * @throws Exception When the connection cannot be established.
     */
    public function __construct()
    {
        parent::__construct();

        // Fetch the system configuration.
        $this->load->config('app_config', TRUE);
        $app_config = $this->config->item('app_config');

        $this->connect($app_config['amqp']);
    }

    /**
     * Tries to connect to the AMQP broker.
     *
     * @param array $amqp_args Arguments passed straight to the AMQPConnection constructor.
     * @throws Exception When the connection reports that it is not connected.
     */
    private function connect($amqp_args)
    {
        $this->conn = new AMQPConnection($amqp_args);
        $this->conn->connect();

        if (!$this->conn->isConnected()) {
            throw new Exception('Cannot connect to the broker.');
        }
    }

    /**
     * Selects where messages are stored and read from.
     *
     * @param String $exchange_name Exchange name
     * @param String $queue_name    Queue name
     * @param String $router_name   Routing key
     */
    public function setSaveType($exchange_name, $queue_name, $router_name)
    {
        $this->exchange = $exchange_name;
        $this->queue    = $queue_name;
        $this->router   = $router_name;
    }

    /**
     * Declares the exchange and queue, binds them, and publishes one message.
     *
     * The payload is JSON-encoded and published inside a channel transaction.
     *
     * @param Array $array Payload to publish.
     */
    public function createMessageQueue($array)
    {
        // Create the exchange.
        $channel = new AMQPChannel($this->conn);
        $ex = new AMQPExchange($channel);
        $ex->setName($this->exchange);
        $ex->setType(AMQP_EX_TYPE_DIRECT);
        $ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        $ex->declare();

        // Create the queue.
        $q = new AMQPQueue($channel);
        $q->setName($this->queue);
        $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        $q->declare();

        // Bind the queue to the exchange under the routing key.
        $q->bind($this->exchange, $this->router);

        // Publish the message within a transaction.
        $channel->startTransaction();
        $message = json_encode($array);
        $ex->publish($message, $this->router);
        $channel->commitTransaction();
    }

    /**
     * Fetches a single message from the configured queue.
     *
     * The message is requested with AMQP_AUTOACK.
     *
     * @return mixed The message body decoded with json_decode(..., true);
     *               an empty array when get() yields no message.
     * @throws Exception Wraps any exception raised while talking to the broker;
     *                   the original is available through getPrevious().
     */
    public function getMessageQueue()
    {
        // Initialised before the try block so the return below is always defined.
        $arr = array();

        try {
            // Set the queue name, then bind it to the exchange with the routing key.
            $channel = new AMQPChannel($this->conn);
            $q = new AMQPQueue($channel);
            $q->setName($this->queue);
            $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
            $q->declare();
            $q->bind($this->exchange, $this->router);

            // Fetch one message.
            $messages = $q->get(AMQP_AUTOACK);
            if ($messages) {
                $arr = json_decode($messages->getBody(), true);
            }
        } catch (Exception $e) {
            // Same message and code (0) as before, but chain the original
            // exception so its type and trace are not lost.
            throw new Exception($e->getMessage(), 0, $e);
        }

        return $arr;
    }

    /*
    // Disabled draft: consume messages in blocking mode.
    public function getAllMessageQueue()
    {
        // Set the queue name, then bind it to the exchange with the routing key.
        $channel = new AMQPChannel($this->conn);
        $q = new AMQPQueue($channel);
        $q->setName($this->queue);
        $q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);
        $q->declare();
        $q->bind($this->exchange, $this->router);
        $this->conn->disconnect();

        // Fetch messages in blocking mode.
        while(True){
            $q->consume('processMessage');
            //$q->consume('processMessage', AMQP_AUTOACK); // automatic ACK
        }
    }
    */

    /**
     * Closes the broker connection when the service is destroyed.
     */
    public function __destruct()
    {
        // $conn is public and may have been cleared or closed already.
        if ($this->conn && $this->conn->isConnected()) {
            $this->conn->disconnect();
        }
    }
}

/**
 * Consumer callback (disabled draft).
 * Handles one message.
 * @param Object $envelope
 * @param Object $queue
 */
/*
function processMessage($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg . '<br />';

    // Send the ACK manually.
    $queue->ack($envelope->getDeliveryTag());
}
*/

  

上一篇:Lambda类库篇 —— Streams API, Collector和并行


下一篇:事后分析报告(Postmortem Report)