基于RabbitMQ實現(xiàn)延遲隊列--PHP版
延遲任務應用場景
場景一:物聯(lián)網(wǎng)系統(tǒng)經(jīng)常會遇到向終端下發(fā)命令,如果命令一段時間沒有應答,就需要設置成超時。
場景二:訂單下單之后30分鐘后,如果用戶沒有付錢,則系統(tǒng)自動取消訂單。
場景三:過1分鐘給新注冊會員的用戶,發(fā)送注冊郵件等。
實現(xiàn)方案
定時任務輪詢數(shù)據(jù)庫,看是否有產(chǎn)生新任務,如果產(chǎn)生則消費任務
pcntl_alarm為進程設置一個鬧鐘信號
swoole的異步高精度定時器:swoole_time_tick(類似javascript的setInterval)和swoole_time_after(相當于javascript的setTimeout)
rabbitmq延遲任務
以上四種方案,如果生產(chǎn)環(huán)境有使用到swoole建議使用第三種方案。此篇文章重點講述第四種方案實現(xiàn)。
?
RabbitMQ延遲隊列實現(xiàn)的方式有兩種:
通過消息過期后進入死信交換器,再由交換器轉發(fā)到延遲消費隊列,實現(xiàn)延遲功能;
使用rabbitmq-delayed-message-exchange插件實現(xiàn)延遲功能;
注意:?延遲插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的,依賴Erlang/OPT 18.0及以上運行環(huán)境。

Redis應用-異步消息隊列與延時隊列
1.RabbitMQ死信機制實現(xiàn)延遲隊列
RabbitMQ沒有直接去實現(xiàn)延遲隊列這個功能。而是需要通過消息的TTL和死信Exchange這兩者的組合來實現(xiàn)。
消息的TTL(Time To Live)
消息的TTL就是消息的存活時間。RabbitMQ可以對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連著的保留時間,也可以對每一個單獨的消息做單獨的設置。超過了這個時間,我們認為這個消息就死了,稱之為死信。如果隊列設置了,消息也設置了,那么會取小的。所以一個消息如果被路由到不同的隊列中,這個消息死亡的時間有可能不一樣(不同的隊列設置)。這里單講單個消息的TTL,因為它才是實現(xiàn)延遲任務的關鍵。
可以通過設置消息的expiration字段或者隊列x-message-ttl屬性來設置時間,兩者是一樣的效果。下面例子是通過隊列的ttl實現(xiàn)死信。
$queue?=?new?AMQPQueue($channel);
$queue->setName($params['queueName']?:'');
$queue->setFlags(AMQP_DURABLE);
$queue->setArguments(array(
????????'x-dead-letter-exchange'?=>?'delay_exchange',
????????'x-dead-letter-routing-key'?=>?'delay_route',
????????'x-message-ttl'?=>?60000,
));
$queue->declareQueue();
當上面的消息扔到該隊列中后,過了60秒,如果沒有被消費,它就死了。不會被消費者消費到。這個消息后面的,沒有“死掉”的消息對頂上來,被消費者消費。死信在隊列中并不會被刪除和釋放,它會被統(tǒng)計到隊列的消息數(shù)中去。單靠死信還不能實現(xiàn)延遲任務,還要靠Dead Letter Exchange。
Dead Letter Exchanges
Exchage的概念在這里就不在贅述,一個消息在滿足如下條件下,會進死信路由,記住這里是路由而不是隊列,一個路由可以對應很多隊列。
1. 一個消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說不會被再次放在隊列里,被其他消費者使用。
2. 上面的消息的TTL到了,消息過期了。
3. 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。
Dead Letter Exchange其實就是一種普通的exchange,和創(chuàng)建其他exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過期了,會自動觸發(fā)消息的轉發(fā),發(fā)送到Dead Letter Exchange中去。
?

一文帶你了解Redis秒殺應用場景
示例
消費者 delay_consumer1.php:
//來源公眾號:【碼農(nóng)編程進階筆記】
//header('Content-Type:text/html;charset=utf8;');
$params?=?array(
????'exchangeName'?=>?'delay_exchange',
????'queueName'?=>?'delay_queue',
????'routeKey'?=>?'delay_route',
);
$connectConfig?=?array(
????'host'?=>?'localhost',
????'port'?=>?5672,
????'login'?=>?'guest',
????'password'?=>?'guest',
????'vhost'?=>?'/'
);
//var_dump(extension_loaded('amqp'));
try?{
????$conn?=?new?AMQPConnection($connectConfig);
????$conn->connect();
????if?(!$conn->isConnected())?{
????????//die('Conexiune?esuata');
????????//TODO?記錄日志
????????echo?'rabbit-mq?連接錯誤:',?json_encode($connectConfig);
????????exit();
????}
????$channel?=?new?AMQPChannel($conn);
????if?(!$channel->isConnected())?{
????????//?die('Connection?through?channel?failed');
????????//TODO?記錄日志
????????echo?'rabbit-mq?Connection?through?channel?failed:',?json_encode($connectConfig);
????????exit();
????}
????$exchange?=?new?AMQPExchange($channel);
????$exchange->setFlags(AMQP_DURABLE);//聲明一個已存在的交換器的,如果不存在將拋出異常,這個一般用在consume端
????$exchange->setName($params['exchangeName']?:'');
????$exchange->setType(AMQP_EX_TYPE_DIRECT);?//direct類型
????$exchange->declareExchange();
????//$channel->startTransaction();
????$queue?=?new?AMQPQueue($channel);
????$queue->setName($params['queueName']?:'');
????$queue->setFlags(AMQP_DURABLE);
????$queue->declareQueue();
????//綁定
????$queue->bind($params['exchangeName'],?$params['routeKey']);
}?catch(Exception?$e)?{
????echo?$e->getMessage();
????exit();
}
function?callback(AMQPEnvelope?$message)?{
????global?$queue;
????if?($message)?{
????????$body?=?$message->getBody();
????????echo?'接收時間:'.date("Y-m-d?H:i:s",?time()).?PHP_EOL;
????????echo?'接收內(nèi)容:'.$body?.?PHP_EOL;
????????//為了防止接收端在處理消息時down掉,只有在消息處理完成后才發(fā)送ack消息
????????$queue->ack($message->getDeliveryTag());
????}?else?{
????????echo?'no?message'?.?PHP_EOL;
????}
}
//$queue->consume('callback');??第一種消費方式,但是會阻塞,程序一直會卡在此處
//注意:這里需要注意的是這個方法:$queue->consume,queue對象有兩個方法可用于取消息:consume和get。前者是阻塞的,無消息時會被掛起,適合循環(huán)中使用;后者則是非阻塞的,取消息時有則取,無則返回false。
//就是說用了consume之后,會同步阻塞,該程序常駐內(nèi)存,不能用nginx,apache調(diào)用。?
$action?=?'2';
if($action?==?'1'){
????$queue->consume('callback');??//第一種消費方式,但是會阻塞,程序一直會卡在此處
}else{
????//第二種消費方式,非阻塞
????$start?=?time();
????while(true)
????{
????????$message?=?$queue->get();
????????if(!empty($message))
????????{
????????????echo?'接收時間:'.date("Y-m-d?H:i:s",?time()).?PHP_EOL;
????????????echo?'接收內(nèi)容:'.$message->getBody().PHP_EOL;
????????????$queue->ack($message->getDeliveryTag());????//應答,代表該消息已經(jīng)消費
????????????$end?=?time();
????????????echo?'運行時間:'.($end?-?$start).'秒'.PHP_EOL;
????????????//exit();
????????}
????????else
????????{
????????????//echo?'message?not?found'?.?PHP_EOL;
????????}
????}
}
生產(chǎn)者delay_publisher1.php:
//來源公眾號:【碼農(nóng)編程進階筆記】
//header('Content-Type:text/html;charset=utf-8;');
$params?=?array(
????'exchangeName'?=>?'test_cache_exchange',
????'queueName'?=>?'test_cache_queue',
????'routeKey'?=>?'test_cache_route',
);
$connectConfig?=?array(
????'host'?=>?'localhost',
????'port'?=>?5672,
????'login'?=>?'guest',
????'password'?=>?'guest',
????'vhost'?=>?'/'
);
//var_dump(extension_loaded('amqp'));?判斷是否加載amqp擴展
//exit();
for($i=5;$i>0;$i--){
????try?{
????????$conn?=?new?AMQPConnection($connectConfig);
????????$conn->connect();
????????if?(!$conn->isConnected())?{
????????????//die('Conexiune?esuata');
????????????//TODO?記錄日志
????????????echo?'rabbit-mq?連接錯誤:',?json_encode($connectConfig);
????????????exit();
????????}
????????$channel?=?new?AMQPChannel($conn);
????????if?(!$channel->isConnected())?{
????????????//?die('Connection?through?channel?failed');
????????????//TODO?記錄日志
????????????echo?'rabbit-mq?Connection?through?channel?failed:',?json_encode($connectConfig);
????????????exit();
????????}
????????$exchange?=?new?AMQPExchange($channel);
????????$exchange->setFlags(AMQP_DURABLE);//持久化
????????$exchange->setName($params['exchangeName']);
????????$exchange->setType(AMQP_EX_TYPE_DIRECT);?//direct類型
????????$exchange->declareExchange();
????????//$channel->startTransaction();
????????//RabbitMQ不容許聲明2個相同名稱、配置不同的Queue隊列
????????$queue?=?new?AMQPQueue($channel);
????????$queue->setName($params['queueName'].$i);
????????$queue->setFlags(AMQP_DURABLE);
????????$queue->setArguments(array(
????????????'x-dead-letter-exchange'?=>?'delay_exchange',???????////?死信交換機
????????????'x-dead-letter-routing-key'?=>?'delay_route',??????????//?死信路由
????????????'x-message-ttl'?=>?(10000*$i),???????//?當上面的消息扔到該隊列中后,過了60秒,如果沒有被消費,它就死了
????????????//?在RMQ中想要使用優(yōu)先級特性需要的版本為3.5+。
????????????//'x-max-priority'=>0,//將隊列聲明為優(yōu)先級隊列,即在創(chuàng)建隊列的時候添加參數(shù) x-max-priority 以指定最大的優(yōu)先級,值為0-255(整數(shù))。
????????));
????????$queue->declareQueue();
????????//綁定隊列和交換機
????????$queue->bind($params['exchangeName'],?$params['routeKey'].$i);
????????//$channel->commitTransaction();
????}?catch(Exception?$e)?{
????}
????//?當mandatory標志位設置為true時,如果exchange根據(jù)自身類型和消息routeKey無法找到一個符合條件的queue,那么會調(diào)用basic.return方法將消息返還給生產(chǎn)者;當mandatory設為false時,出現(xiàn)上述情形broker會直接將消息扔掉。
????//delivery_mode=2指明message為持久的
????//生成消息
????echo?'發(fā)送時間:'.date("Y-m-d?H:i:s",?time()).PHP_EOL;
????echo?'i='.$i.',延遲'.($i*10).'秒'.PHP_EOL;
????$message?=?json_encode(['order_id'=>time(),'i'=>$i]);
????$exchange->publish($message,?$params['routeKey'].$i,?AMQP_MANDATORY,?array('delivery_mode'=>2));
????$conn->disconnect();
????sleep(2);
}
使用方法:先運行delay_consumer1.php,再運行delay_publisher1.php
運行效果:

最關鍵的點就是在生產(chǎn)者那里
??$queue->setArguments(array(
????????????'x-dead-letter-exchange'?=>?'delay_exchange',???????//?死信交換機
????????????'x-dead-letter-routing-key'?=>?'delay_route',??????????//?死信路由
????????????'x-message-ttl'?=>?10000,???????//?當上面的消息扔到該隊列中后,過了60秒,如果沒有被消費,它就死了
????????));
詳細過程:
首先由正常隊列(test_cache_queue)和正常exchange(test_cache_exchange),兩者相綁定。
該正常隊列設置了死信路由(delay_exchange)和死信路由key以及TTL,生產(chǎn)者生產(chǎn)消息到正常隊列和正常路由上.
當正常隊列設置TTL時間一到,那延遲消息就會自動發(fā)布到死信路由
消費者通過死信路由(delay_exchange)和死信隊列(delay_queue)來消費?
注意:
使用死信隊列實現(xiàn)延時消息的缺點:
1) 如果統(tǒng)一用隊列來設置消息的TTL,當梯度非常多的情況下,比如1分鐘,2分鐘,5分鐘,10分鐘,20分鐘,30分鐘……需要創(chuàng)建很多交換機和隊列來路由消息。
2) 如果單獨設置消息的TTL,則可能會造成隊列中的消息阻塞——前一條消息沒有出隊(沒有被消費),后面的消息無法投遞。3) 可能存在一定的時間誤差。
4) ttl設置之后,下次修改時間,會報錯,這時候,需要先刪除該隊列,重啟項目。否則會報錯。
5) 消費者中,拋異常了沒處理,會一直重復消費
2020-02-02
2020-01-21
2020-01-17
2019-11-13

