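Author | 小猩
I. Overview
Delayed messages show up everywhere in day-to-day systems:
1. An order that has not been paid within 10 min of creation is cancelled automatically.
2. An email notification is pushed every 30 min.
The most common approach is to poll with a scheduled task. That works fine when the data volume is small, but with tens of millions or even hundreds of millions of rows, reading the data becomes the bottleneck, and scanning the full table is the worst option. There are still ways to make scanning workable, though; here is how orders are split at our company:
Our production system gains 500,000+ orders per day, and a single table could not keep up with that growth long ago. The approach we took is order archiving: the online hot table keeps only the last 2-3 days of orders and everything else is moved into a historical order table, which keeps the hot data under 2 million rows. With that in place, cancelling orders that are still unpaid after 10 min can be done with a simple table scan without running into read-performance problems.
This approach is simple, but it requires negotiating and compromising with the business side. This article covers another approach: RabbitMQ delay queues. RabbitMQ does not implement delay queues directly, but the properties it provides can be used to simulate one, and there is also a ready-made plugin, rabbitmq_delayed_message_exchange. The RabbitMQ properties involved are introduced first.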
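1. Message Time To Live (TTL)
x-message-ttl: the message expiration time, in milliseconds. A message that stays in the queue longer than this becomes a dead letter and is no longer delivered to that queue's consumers.
There are two ways to set a message TTL:
Per queue: specify x-message-ttl when creating the queue; every message in the queue then shares the same expiration time.
Per message: set expiration on each message when sending it; messages can then expire at different times.
If both are set, the smaller of the two applies. A TTL of 0 means the message is discarded unless it can be delivered to a consumer immediately. Combining a TTL of 0 with a dead-letter exchange can stand in for the immediate flag, which RabbitMQ has not supported since 3.0.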
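The "smaller of the two applies" rule can be sketched in a few lines of plain Java. This is only an illustration of the rule as described above: the class `EffectiveTtl` and its method are hypothetical names, not part of RabbitMQ or any client library, and the broker performs this comparison itself.

```java
import java.util.OptionalLong;

/** Hypothetical helper (not a RabbitMQ API): picks the TTL that applies to one message. */
final class EffectiveTtl {

    /**
     * @param queueTtlMillis   the queue's x-message-ttl, empty if the queue sets none
     * @param messageTtlMillis the message's expiration, empty if the message sets none
     * @return the TTL that applies, or empty if the message never expires
     */
    static OptionalLong of(OptionalLong queueTtlMillis, OptionalLong messageTtlMillis) {
        if (queueTtlMillis.isPresent() && messageTtlMillis.isPresent()) {
            // Both are set: the smaller value wins.
            return OptionalLong.of(
                    Math.min(queueTtlMillis.getAsLong(), messageTtlMillis.getAsLong()));
        }
        // At most one is set: use whichever is present (possibly neither).
        return queueTtlMillis.isPresent() ? queueTtlMillis : messageTtlMillis;
    }

    public static void main(String[] args) {
        // Queue TTL of 30 s, message expiration of 10 s: the message expires after 10 s.
        System.out.println(of(OptionalLong.of(30_000), OptionalLong.of(10_000)).getAsLong());
    }
}
```

With a queue TTL of 30 s and a message expiration of 10 s, the message is treated as expired after 10 s.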
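2. Queue TTL
A queue's TTL (the x-expires argument) controls how long the queue may go unused before it is deleted automatically. A queue counts as unused when all of the following hold for the whole expiration period:
The queue has no consumers.
The queue has not been redeclared.
Basic.Get has not been called to fetch messages from it.
3. x-dead-letter-exchange (see the RabbitMQ documentation): the dead-letter exchange (an ordinary exchange of any exchange type)
When a message reaches its expiration time without being consumed, this exchange routes it to the designated queue according to the configured x-dead-letter-routing-key, where a consumer finally consumes it. If x-dead-letter-routing-key is not configured, the message is routed with its original routing key.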
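As a minimal sketch of the routing-key fallback just described, the helper below builds the kind of argument map a queue is declared with (for example, the `arguments` parameter of `Channel.queueDeclare` in the RabbitMQ Java client) and mirrors the fallback rule in plain Java. The class `DeadLetterArgs` and its methods are hypothetical names introduced for illustration; the broker applies this rule itself.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper illustrating dead-letter queue arguments; not a RabbitMQ API. */
final class DeadLetterArgs {

    /** Builds queue arguments that send dead letters to the given exchange. */
    static Map<String, Object> forQueue(String deadLetterExchange, String deadLetterRoutingKey) {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", deadLetterExchange);
        if (deadLetterRoutingKey != null) {
            // Optional: leave it out to keep the message's original routing key.
            arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        }
        return arguments;
    }

    /** Mirrors the fallback rule: the configured key if present, else the original key. */
    static String routingKeyUsed(Map<String, Object> arguments, String originalRoutingKey) {
        Object configured = arguments.get("x-dead-letter-routing-key");
        return configured != null ? configured.toString() : originalRoutingKey;
    }

    public static void main(String[] args) {
        Map<String, Object> noKey = forQueue("exchange_delay_done", null);
        // No x-dead-letter-routing-key configured: the original key "delay" is reused.
        System.out.println(routingKeyUsed(noKey, "delay"));
    }
}
```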
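4. A message in a queue becomes a dead letter in any of the following cases
The configured x-message-ttl or expiration elapses, i.e. the message expires.
A consumer rejects the message (by calling Basic.Reject / Basic.Nack) with the requeue parameter set to false.
The queue reaches its maximum length.
II. Example demo
Single delay queue
RabbitMQ delay queue logic: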
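3. queue_delay_begin: the buffer queue, where messages wait until they expire.
4. queue_delay_done: the dead-letter queue, from which consumers actually consume the messages.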
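To make the flow between the two queues concrete, here is a small in-memory model in plain Java. It is a sketch under stated assumptions, not how the broker is implemented: the class `DelayFlowSketch` is hypothetical, time is passed in explicitly instead of read from a clock, and expired messages are moved only from the head of the buffer queue, which is how RabbitMQ handles per-message TTL.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory model of queue_delay_begin -> queue_delay_done. */
final class DelayFlowSketch {

    /** A buffered message together with the instant at which its TTL elapses. */
    private static final class Pending {
        final String body;
        final long expiresAtMillis;

        Pending(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Pending> queueDelayBegin = new ArrayDeque<>();
    private final List<String> queueDelayDone = new ArrayList<>();

    /** Publishes a message into the buffer queue with a per-message TTL. */
    void publish(String body, long nowMillis, long ttlMillis) {
        queueDelayBegin.addLast(new Pending(body, nowMillis + ttlMillis));
    }

    /** Dead-letters every expired message at the head of the buffer queue. */
    void advanceTo(long nowMillis) {
        while (!queueDelayBegin.isEmpty()
                && queueDelayBegin.peekFirst().expiresAtMillis <= nowMillis) {
            queueDelayDone.add(queueDelayBegin.pollFirst().body);
        }
    }

    /** Messages a consumer of queue_delay_done could receive so far. */
    List<String> consumable() {
        return new ArrayList<>(queueDelayDone);
    }
}
```

Because only the head of the buffer queue is checked, a message with a short TTL that sits behind one with a long TTL waits for the long one. That does not matter in this demo, where every message uses the same 10 s expiration.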
spring-rabbitmq.xml :
DelayMessageProducer.java
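import javax.annotation.Resource;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.stereotype.Service;

@Service
public class DelayMessageProducer {

    @Resource(name = "delayMsgTwoTemplate")
    private AmqpTemplate delayMsgTwoTemplate;

    public void delayMsgTwo(String exchange, String routingKey, Object msg) {
        delayMsgTwoTemplate.convertAndSend(exchange, routingKey, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Per-message TTL in milliseconds: the message expires after 10 s.
                message.getMessageProperties().setExpiration(String.valueOf(10000));
                return message;
            }
        });
    }
}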
MessageConsumer.java
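import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class MessageConsumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // println does no placeholder substitution, so "{}" is printed literally.
        System.out.println("consumer receive message 22------->:{}" + message);
    }
}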
application.xml
DelayQueueTest.java
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DelayQueueTest {

    private ApplicationContext context = null;

    @Before
    public void setUp() throws Exception {
        context = new ClassPathXmlApplicationContext("rabbitmq/application.xml");
    }

    @Test
    public void delayQueueTest() throws Exception {
        DelayMessageProducer messageProducer = context.getBean(DelayMessageProducer.class);
        int a = 10;
        while (a > 0) {
            System.out.println("send " + a);
            messageProducer.delayMsgTwo("exchange_delay_begin", "delay", "hello world delay2 :" + a--);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("sended ");
        // Keep the test alive for 60 s so the consumer has time to receive the expired messages.
        Thread.sleep(1000 * 60);
    }
}
Result: about 10 s after the messages are sent, the consumer's listener receives and consumes them.
send 10
send 9
send 8
send 7
send 6
send 5
send 4
send 3
send 2
send 1
sended 
consumer receive message 22------->:{}(Body:'hello world delay2 :10' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :9' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :8' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=3, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :7' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=4, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :6' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=5, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :5' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=6, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :4' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=7, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :3' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=8, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :2' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=9, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
consumer receive message 22------->:{}(Body:'hello world delay2 :1' MessageProperties [headers={x-first-death-exchange=exchange_delay_begin, x-death=[{reason=expired, original-expiration=10000, count=1, exchange=exchange_delay_begin, time=Sun Jul 01 18:33:19 CST 2018, routing-keys=[delay], queue=queue_delay_begin}], x-first-death-reason=expired, x-first-death-queue=queue_delay_begin}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=exchange_delay_done, receivedRoutingKey=delay, receivedDelay=1000, deliveryTag=10, messageCount=0, consumerTag=amq.ctag-oOJzeYHEJYrLFL4HbHWZcA, consumerQueue=queue_delay_done])
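Multiple delay queues
Real business requirements involve different delay times; in that case several queues can be set up, one per delay. For example, five queues, common-queue_5s, common-queue_15s, common-queue_30s, common-queue_45s, and common-queue_50s, provide five different delays. The overall structure is as follows:
Part of the XML configuration is shown here: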
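As a rough Java counterpart to such a configuration, the sketch below builds one set of queue arguments per delay. It rests on assumptions: the queue names come from the text above, while the dead-letter exchange name dlx-exchange and the routing keys dead-letter-queue_5s and so on are taken from the log output later in this article; the use of a queue-level x-message-ttl and the class `MultiDelayQueues` are illustrative choices rather than the original configuration. Each inner map is the kind of value passed as the `arguments` parameter when declaring a queue with the RabbitMQ Java client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: one argument map per delay queue, keyed by queue name. */
final class MultiDelayQueues {

    static Map<String, Map<String, Object>> declarations(int... delaySeconds) {
        Map<String, Map<String, Object>> byQueue = new LinkedHashMap<>();
        for (int seconds : delaySeconds) {
            Map<String, Object> arguments = new LinkedHashMap<>();
            // Queue-level TTL in milliseconds (assumed; a per-message expiration would also work).
            arguments.put("x-message-ttl", seconds * 1000);
            // Expired messages are handed to this exchange with a per-delay routing key.
            arguments.put("x-dead-letter-exchange", "dlx-exchange");
            arguments.put("x-dead-letter-routing-key", "dead-letter-queue_" + seconds + "s");
            byQueue.put("common-queue_" + seconds + "s", arguments);
        }
        return byQueue;
    }

    public static void main(String[] args) {
        declarations(5, 15, 30, 45, 50)
                .forEach((queue, arguments) -> System.out.println(queue + " -> " + arguments));
    }
}
```

Generating the maps from a list of delays keeps the five declarations consistent; adding a new delay only means adding one number.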
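JUnit test:
TestDeadLetter.java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:application-context.xml"})
public class TestDeadLetter {

    @Autowired
    private DeadLetterPublishService publishService;

    @Test
    public void testALL() throws InterruptedException {
        String message = "currentTime:" + System.currentTimeMillis();
        System.out.println("test1---message: " + message);
        // Send the same message to each delay queue through the common exchange.
        publishService.send("common-exchange", "common-queue_5s", message);
        publishService.send("common-exchange", "common-queue_15s", message);
        publishService.send("common-exchange", "common-queue_30s", message);
        publishService.send("common-exchange", "common-queue_45s", message);
        publishService.send("common-exchange", "common-queue_50s", message);
        // Wait 100 s so every delayed message is consumed before the test exits.
        Thread.sleep(100000);
    }
}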
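Final result: each message is consumed almost exactly one configured delay after it was sent. The five messages are delayed by roughly 5 s, 15 s, 30 s, 45 s, and 50 s, but never by exactly that amount (in this run each arrived about 25 to 55 ms late).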
test1---message: currentTime:1566920053524
// ...
1566920058551 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:34:18 CST 2019, routing-keys=[common-queue_5s], queue=common-queue_5s}], x-first-death-reason=expired, x-first-death-queue=common-queue_5s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_5s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
1566920068578 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:34:28 CST 2019, routing-keys=[common-queue_15s], queue=common-queue_15s}], x-first-death-reason=expired, x-first-death-queue=common-queue_15s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_15s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
1566920083550 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:34:43 CST 2019, routing-keys=[common-queue_30s], queue=common-queue_30s}], x-first-death-reason=expired, x-first-death-queue=common-queue_30s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_30s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
1566920098549 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:34:58 CST 2019, routing-keys=[common-queue_45s], queue=common-queue_45s}], x-first-death-reason=expired, x-first-death-queue=common-queue_45s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_45s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
1566920103551 - consumer--:MessageProperties [headers={spring_return_correlation=1ac8ee8e-6d61-4bbf-bac1-f21523a0d759, x-first-death-exchange=common-exchange, x-death=[{reason=expired, count=1, exchange=common-exchange, time=Tue Aug 27 23:35:03 CST 2019, routing-keys=[common-queue_50s], queue=common-queue_50s}], x-first-death-reason=expired, x-first-death-queue=common-queue_50s}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=dlx-exchange, receivedRoutingKey=dead-letter-queue_50s, deliveryTag=1, messageCount=0]:currentTime:1566920053524
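The gap between configured and observed delay can be worked out directly from the timestamps in this log. The sketch below does that arithmetic; the class `ObservedDelays` is a hypothetical name, and the numbers are copied from the output above. Note that the currentTime value is captured just before the first send, so the differences also include a little publishing latency.

```java
/** Hypothetical helper: how late each message arrived compared with its configured delay. */
final class ObservedDelays {

    /** Returns (consumedAt - sentAt) - configuredDelay for each message, in milliseconds. */
    static long[] overshootMillis(long sentAtMillis, long[] consumedAtMillis, long[] configuredDelayMillis) {
        long[] overshoot = new long[consumedAtMillis.length];
        for (int i = 0; i < consumedAtMillis.length; i++) {
            long observedDelay = consumedAtMillis[i] - sentAtMillis;
            overshoot[i] = observedDelay - configuredDelayMillis[i];
        }
        return overshoot;
    }

    public static void main(String[] args) {
        long sentAt = 1566920053524L; // currentTime embedded in the message body
        long[] consumedAt = {
            1566920058551L, 1566920068578L, 1566920083550L, 1566920098549L, 1566920103551L
        };
        long[] configured = {5_000, 15_000, 30_000, 45_000, 50_000};
        // Prints [27, 54, 26, 25, 27]
        System.out.println(java.util.Arrays.toString(overshootMillis(sentAt, consumedAt, configured)));
    }
}
```

In this run every message was consumed within a few tens of milliseconds of its configured delay, which is accurate enough for use cases such as order cancellation but not for anything that needs exact timing.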