RocketMQ是一款開源的分散式訊息系統,基於高可用分散式叢集技術,提供低延時的、高可靠、萬億級容量、靈活可伸縮的訊息釋出與訂閱服務。
它前身是MetaQ,是阿里基於Kafka的設計使用Java進行自主研發的。在2012年,阿里將其開源, 在2016年,阿里將其捐獻給Apache軟體基金會(Apache Software Foundation,簡稱為ASF),正式成為孵化專案。2017 年,Apache軟體基金會宣佈RocketMQ已孵化成為 Apache頂級專案(Top Level Project,簡稱為TLP ),是國內首個網際網路中介軟體在 Apache上的頂級專案。
延遲訊息
生產者把訊息傳送到訊息佇列中以後,並不期望被立即消費,而是等待指定時間後才可以被消費者消費,這類訊息通常被稱為延遲訊息。
在RocketMQ中,支援延遲訊息,但是不支援任意時間精度的延遲訊息,只支援特定級別的延遲訊息。如果要支援任意時間精度,不能避免在Broker層面做訊息排序,再涉及到持久化的考量,那麼訊息排序就不可避免產生巨大的效能開銷。
訊息延遲級別分別為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18個級別。在傳送訊息時,設定訊息延遲級別即可,設定訊息延遲級別時有以下3種情況:
延遲訊息示例
首先,寫一個消費者,用於消費延遲訊息:
再寫一個延遲訊息的生產者,用於傳送延遲訊息:
執行生產者以後,就會發送一條延遲訊息:
10秒鐘後,消費者收到的這條延遲訊息:
延遲訊息的原理分析
以下分析的RocketMQ原始碼的版本號是4.7.1,版本不同原始碼略有差別。
CommitLog
在org.apache.rocketmq.store.CommitLog中,針對延遲訊息做了一些處理:
可以看到,每一個延遲訊息的主題都被暫時更改為SCHEDULE_TOPIC_XXXX,並且根據延遲級別延遲訊息變更了新的佇列Id。接下來,處理延遲訊息的就是org.apache.rocketmq.store.schedule.ScheduleMessageService。
ScheduleMessageService
ScheduleMessageService是由org.apache.rocketmq.store.DefaultMessageStore進行初始化的,初始化包括構造物件和呼叫load方法。最後,再執行ScheduleMessageService的start方法:
遍歷所有延遲級別,根據延遲級別獲得對應佇列的偏移量,如果偏移量不存在,則設定為0。然後為每個延遲級別建立定時任務,第一次啟動任務延遲為1秒,第二次及以後的啟動任務延遲才是延遲級別相應的延遲時間。
然後,又建立了一個定時任務,用於持久化每個佇列消費的偏移量。持久化的頻率由flushDelayOffsetInterval屬性進行配置,預設為10秒。
定時任務
ScheduleMessageService的start方法執行之後,每個延遲級別都建立自己的定時任務,這裡的定時任務的具體實現就在DeliverDelayedMessageTimerTask類之中,它核心程式碼是executeOnTimeup方法之中,我們來看一下主要部分:
如果沒有獲取到對應的訊息佇列,則在DELAY_FOR_A_WHILE(預設為100)毫秒後再執行任務。如果獲取到了,就繼續執行下面操作:
如果沒有獲取到有效訊息,則在DELAY_FOR_A_WHILE(預設為100)毫秒後再執行任務。如果獲取到了,就繼續執行下面操作:
如果當前訊息不到消費的時間,則在countdown毫秒後再執行任務。如果到消費的時間,就繼續執行下面操作:
如果獲取到訊息,則繼續執行下面操作:
清除了訊息的延遲級別,並且恢復了真正的訊息主題和佇列Id,重新把訊息傳送到真正的訊息佇列上以後,消費者就可以立即消費了。
總結
經過以上對原始碼的分析,可以總結出延遲訊息的實現步驟:
RocketMQ是一款開源的分散式訊息系統,基於高可用分散式叢集技術,提供低延時的、高可靠、萬億級容量、靈活可伸縮的訊息釋出與訂閱服務。
它前身是MetaQ,是阿里基於Kafka的設計使用Java進行自主研發的。在2012年,阿里將其開源, 在2016年,阿里將其捐獻給Apache軟體基金會(Apache Software Foundation,簡稱為ASF),正式成為孵化專案。2017 年,Apache軟體基金會宣佈RocketMQ已孵化成為 Apache頂級專案(Top Level Project,簡稱為TLP ),是國內首個網際網路中介軟體在 Apache上的頂級專案。
延遲訊息
生產者把訊息傳送到訊息佇列中以後,並不期望被立即消費,而是等待指定時間後才可以被消費者消費,這類訊息通常被稱為延遲訊息。
在RocketMQ中,支援延遲訊息,但是不支援任意時間精度的延遲訊息,只支援特定級別的延遲訊息。如果要支援任意時間精度,不能避免在Broker層面做訊息排序,再涉及到持久化的考量,那麼訊息排序就不可避免產生巨大的效能開銷。
訊息延遲級別分別為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18個級別。在傳送訊息時,設定訊息延遲級別即可,設定訊息延遲級別時有以下3種情況:
設定訊息延遲級別等於0時,則該訊息為非延遲訊息。設定訊息延遲級別大於等於1並且小於等於18時,訊息延遲特定時間,如:設定訊息延遲級別等於1,則延遲1s;設定訊息延遲級別等於2,則延遲5s,以此類推。設定訊息延遲級別大於18時,則該訊息延遲級別為18,如:設定訊息延遲級別等於20,則延遲2h。延遲訊息示例
首先,寫一個消費者,用於消費延遲訊息:
再寫一個延遲訊息的生產者,用於傳送延遲訊息:
執行生產者以後,就會發送一條延遲訊息:
10秒鐘後,消費者收到的這條延遲訊息:
延遲訊息的原理分析
以下分析的RocketMQ原始碼的版本號是4.7.1,版本不同原始碼略有差別。
CommitLog
在org.apache.rocketmq.store.CommitLog中,針對延遲訊息做了一些處理:
可以看到,每一個延遲訊息的主題都被暫時更改為SCHEDULE_TOPIC_XXXX,並且根據延遲級別延遲訊息變更了新的佇列Id。接下來,處理延遲訊息的就是org.apache.rocketmq.store.schedule.ScheduleMessageService。
ScheduleMessageService
ScheduleMessageService是由org.apache.rocketmq.store.DefaultMessageStore進行初始化的,初始化包括構造物件和呼叫load方法。最後,再執行ScheduleMessageService的start方法:
遍歷所有延遲級別,根據延遲級別獲得對應佇列的偏移量,如果偏移量不存在,則設定為0。然後為每個延遲級別建立定時任務,第一次啟動任務延遲為1秒,第二次及以後的啟動任務延遲才是延遲級別相應的延遲時間。
然後,又建立了一個定時任務,用於持久化每個佇列消費的偏移量。持久化的頻率由flushDelayOffsetInterval屬性進行配置,預設為10秒。
定時任務
ScheduleMessageService的start方法執行之後,每個延遲級別都建立自己的定時任務,這裡的定時任務的具體實現就在DeliverDelayedMessageTimerTask類之中,它核心程式碼是executeOnTimeup方法之中,我們來看一下主要部分:
如果沒有獲取到對應的訊息佇列,則在DELAY_FOR_A_WHILE(預設為100)毫秒後再執行任務。如果獲取到了,就繼續執行下面操作:
如果沒有獲取到有效訊息,則在DELAY_FOR_A_WHILE(預設為100)毫秒後再執行任務。如果獲取到了,就繼續執行下面操作:
如果當前訊息不到消費的時間,則在countdown毫秒後再執行任務。如果到消費的時間,就繼續執行下面操作:
如果獲取到訊息,則繼續執行下面操作:
清除了訊息的延遲級別,並且恢復了真正的訊息主題和佇列Id,重新把訊息傳送到真正的訊息佇列上以後,消費者就可以立即消費了。
總結
經過以上對原始碼的分析,可以總結出延遲訊息的實現步驟:
如果訊息的延遲級別大於0,則表示該訊息為延遲訊息,修改該訊息的主題為SCHEDULE_TOPIC_XXXX,佇列Id為延遲級別減1。訊息進入SCHEDULE_TOPIC_XXXX的佇列中。定時任務根據上次拉取的偏移量不斷從佇列中取出所有訊息。根據訊息的物理偏移量和大小再次獲取訊息。根據訊息屬性重新建立訊息,清除延遲級別,恢復原主題和佇列Id。重新發送訊息到原主題的佇列中,供消費者進行消費。