架構設計Producer
訊息釋出的角色,支援分散式叢集方式部署。Producer透過MQ的負載均衡模組選擇相應的Broker叢集佇列進行訊息投遞,投遞的過程支援快速失敗並且低延遲。
Consumer訊息消費的角色,支援分散式叢集方式部署。支援以push推,pull拉兩種模式對訊息進行消費。同時也支援叢集方式和廣播方式的消費,它提供實時訊息訂閱機制,可以滿足大多數使用者的需求。
NameServerNameServer是一個非常簡單的Topic路由註冊中心,其角色類似Dubbo中的zookeeper,支援Broker的動態註冊與發現。主要包括兩個功能:Broker管理,NameServer接受Broker叢集的註冊資訊並且儲存下來作為路由資訊的基本資料。然後提供心跳檢測機制,檢查Broker是否還存活;路由資訊管理,每個NameServer將儲存關於Broker叢集的整個路由資訊和用於客戶端查詢的佇列資訊。然後Producer和Conumser透過NameServer就可以知道整個Broker叢集的路由資訊,從而進行訊息的投遞和消費。NameServer通常也是叢集的方式部署,各例項間相互不進行資訊通訊。Broker是向每一臺NameServer註冊自己的路由資訊,所以每一個NameServer例項上面都儲存一份完整的路由資訊。當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由資訊,Producer,Consumer仍然可以動態感知Broker的路由的資訊。
BrokerServerBroker主要負責訊息的儲存、投遞和查詢以及服務高可用保證,為了實現這些功能,Broker包含了以下幾個重要子模組。
Remoting Module:整個Broker的實體,負責處理來自clients端的請求。Client Manager:負責管理客戶端(Producer/Consumer)和維護Consumer的Topic訂閱資訊Store Service:提供方便簡單的API介面處理訊息儲存到物理硬碟和查詢功能。HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的資料同步功能。Index Service:根據特定的Message key對投遞到Broker的訊息進行索引服務,以提供訊息的快速查詢。部署架構工作流程啟動NameServer,NameServer起來後監聽埠,等待Broker、Producer、Consumer連上來,相當於一個路由控制中心。Broker啟動,跟所有的NameServer保持長連線,定時傳送心跳包。心跳包中包含當前Broker資訊(IP+埠等)以及儲存所有Topic資訊。註冊成功後,NameServer叢集中就有Topic跟Broker的對映關係。收發訊息前,先建立Topic,建立Topic時需要指定該Topic要儲存在哪些Broker上,也可以在傳送訊息時自動建立Topic。Producer傳送訊息,啟動時先跟NameServer叢集中的其中一臺建立長連線,並從NameServer中獲取當前傳送的Topic存在哪些Broker上,輪詢從佇列列表中選擇一個佇列,然後與佇列所在的Broker建立長連線從而向Broker發訊息。Consumer跟Producer類似,跟其中一臺NameServer建立長連線,獲取當前訂閱Topic存在哪些Broker上,然後直接跟Broker建立連線通道,開始消費訊息。設計原理主要包括訊息儲存、通訊機制、訊息過濾、負載均衡、事務訊息等
訊息儲存訊息儲存架構圖中主要有下面三個跟訊息儲存相關的檔案構成。
CommitLog訊息主體以及元資料的儲存主體,儲存Producer端寫入的訊息主體內容,訊息內容不是定長的。單個檔案大小預設1G ,檔名長度為20位,左邊補零,剩餘為起始偏移量,比如00000000000000000000代表了第一個檔案,起始偏移量為0,檔案大小為1G=1073741824;當第一個檔案寫滿了,第二個檔案為00000000001073741824,起始偏移量為1073741824,以此類推。訊息主要是順序寫入日誌檔案,當檔案滿了,寫入下一個檔案;
ConsumeQueue訊息消費佇列,引入的目的主要是提高訊息消費的效能,由於RocketMQ是基於主題topic的訂閱模式,訊息消費是針對主題進行的,如果要遍歷commitlog檔案中根據topic檢索訊息是非常低效的。Consumer即可根據ConsumeQueue來查詢待消費的訊息。其中,ConsumeQueue(邏輯消費佇列)作為消費訊息的索引,儲存了指定Topic下的佇列訊息在CommitLog中的起始物理偏移量offset,訊息大小size和訊息Tag的HashCode值。consumequeue檔案可以看成是基於topic的commitlog索引檔案,故consumequeue資料夾的組織方式如下:topic/queue/file三層組織結構,具體儲存路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue檔案採取定長設計,每一個條目共20個位元組,分別為8位元組的commitlog物理偏移量、4位元組的訊息長度、8位元組tag hashcode,單個檔案由30W個條目組成,可以像陣列一樣隨機訪問每一個條目,每個ConsumeQueue檔案大小約5.72M;
IndexFileIndexFile(索引檔案)提供了一種可以透過key或時間區間來查詢訊息的方法。Index檔案的儲存位置是: $HOME \store\index${fileName},檔名fileName是以建立時的時間戳命名的,固定的單個IndexFile檔案大小約為400M,一個IndexFile可以儲存 2000W個索引,IndexFile的底層儲存設計為在檔案系統中實現HashMap結構,故rocketmq的索引檔案其底層實現為hash索引。
在上面的RocketMQ的訊息儲存整體架構圖中可以看出,RocketMQ採用的是混合型的儲存結構,即為Broker單個例項下所有的佇列共用一個日誌資料檔案(即為CommitLog)來儲存。RocketMQ的混合型儲存結構(多個Topic的訊息實體內容都儲存於一個CommitLog中)針對Producer和Consumer分別採用了資料和索引部分相分離的儲存結構,Producer傳送訊息至Broker端,然後Broker端使用同步或者非同步的方式對訊息刷盤持久化,儲存至CommitLog中。只要訊息被刷盤持久化至磁碟檔案CommitLog中,那麼Producer傳送的訊息就不會丟失。正因為如此,Consumer也就肯定有機會去消費這條訊息。當無法拉取到訊息後,可以等下一次訊息拉取,同時服務端也支援長輪詢模式,如果一個訊息拉取請求未拉取到訊息,Broker允許等待30s的時間,只要這段時間內有新訊息到達,將直接返回給消費端。這裡,RocketMQ的具體做法是,使用Broker端的後臺服務執行緒—ReputMessageService不停地分發請求並非同步構建ConsumeQueue(邏輯消費佇列)和IndexFile(索引檔案)資料。
訊息讀取頁快取頁快取(PageCache)是OS對檔案的快取,用於加速對檔案的讀寫。一般來說,程式對檔案進行順序讀寫的速度幾乎接近於記憶體的讀寫速度,主要原因就是由於OS使用PageCache機制對讀寫訪問操作進行了效能最佳化,將一部分的記憶體用作PageCache。對於資料的寫入,OS會先寫入至Cache內,隨後透過非同步的方式由pdflush核心執行緒將Cache內的資料刷盤至物理磁碟上。對於資料的讀取,如果一次讀取檔案時出現未命中PageCache的情況,OS從物理磁碟上訪問讀取檔案的同時,會順序對其他相鄰塊的資料檔案進行預讀取。
在RocketMQ中,ConsumeQueue邏輯消費佇列儲存的資料較少,並且是順序讀取,在page cache機制的預讀取作用下,Consume Queue檔案的讀效能幾乎接近讀記憶體,即使在有訊息堆積情況下也不會影響效能。而對於CommitLog訊息儲存的日誌資料檔案來說,讀取訊息內容時候會產生較多的隨機訪問讀取,嚴重影響效能。如果選擇合適的系統IO排程演算法,比如設定排程演算法為“Deadline”(此時塊儲存採用SSD的話),隨機讀的效能也會有所提升。
記憶體對映RocketMQ主要透過MappedByteBuffer對檔案進行讀寫操作。其中,利用了NIO中的FileChannel模型將磁碟上的物理檔案直接對映到使用者態的記憶體地址中(這種Mmap的方式減少了傳統IO將磁碟檔案資料在作業系統核心地址空間的緩衝區和使用者應用程式地址空間的緩衝區之間來回進行複製的效能開銷),將對檔案的操作轉化為直接對記憶體地址進行操作,從而極大地提高了檔案的讀寫效率(正因為需要使用記憶體對映機制,故RocketMQ的檔案儲存都使用定長結構來儲存,方便一次將整個檔案對映至記憶體)。
訊息刷盤同步刷盤如上圖所示,只有在訊息真正持久化至磁碟後RocketMQ的Broker端才會真正返回給Producer端一個成功的ACK響應。同步刷盤對MQ訊息可靠性來說是一種不錯的保障,但是效能上會有較大影響,一般適用於金融業務應用該模式較多。
非同步刷盤能夠充分利用OS的PageCache的優勢,只要訊息寫入PageCache即可將成功的ACK返回給Producer端。訊息刷盤採用後臺非同步執行緒提交的方式進行,降低了讀寫延遲,提高了MQ的效能和吞吐量。
執行緒模型Reactor多執行緒設計RocketMQ的RPC通訊採用Netty元件作為底層通訊庫,同樣也遵循了Reactor多執行緒模型,同時又在這之上做了一些擴充套件和最佳化。
訊息過濾Tag過濾方式Consumer端在訂閱訊息時除了指定Topic還可以指定TAG,如果一個訊息有多個TAG,可以用||分隔。其中,Consumer端會將這個訂閱請求構建成一個 SubscriptionData,傳送一個Pull訊息的請求給Broker端。Broker端從RocketMQ的檔案儲存層—Store讀取資料之前,會用這些資料先構建一個MessageFilter,然後傳給Store。Store從 ConsumeQueue讀取到一條記錄後,會用它記錄的訊息tag hash值去做過濾,由於在服務端只是根據hashcode進行判斷,無法精確對tag原始字串進行過濾,故在訊息消費端拉取到訊息後,還需要對訊息的原始tag字串進行比對,如果不同,則丟棄該訊息,不進行訊息消費。
SQL92過濾方式這種方式的大致做法和上面的Tag過濾方式一樣,只是在Store層的具體過濾過程不太一樣,真正的 SQL expression 的構建和執行由rocketmq-filter模組負責的。每次過濾都去執行SQL表示式會影響效率,所以RocketMQ使用了BloomFilter避免了每次都去執行。SQL92的表示式上下文為訊息的屬性。
負載均衡RocketMQ中的負載均衡都在Client端完成,具體來說的話,主要可以分為Producer端傳送訊息時候的負載均衡和Consumer端訂閱訊息的負載均衡。
Producer的負載均衡Producer端在傳送訊息的時候,會先根據Topic找到指定的TopicPublishInfo,在獲取了TopicPublishInfo路由資訊後,RocketMQ的客戶端在預設方式下selectOneMessageQueue()方法會從TopicPublishInfo中的messageQueueList中選擇一個佇列(MessageQueue)進行傳送訊息。選擇策略如下:
輪詢最小延遲Consumer的負載均衡在Consumer啟動後,它就會透過定時任務不斷地向RocketMQ叢集中的所有Broker例項傳送心跳包(其中包含了,訊息消費分組名稱、訂閱關係集合、訊息通訊模式和客戶端id的值等資訊)。
Broker端在收到Consumer的心跳訊息後,會將它維護在ConsumerManager的本地快取變數—consumerTable,同時並將封裝後的客戶端網路通道資訊儲存在本地快取變數—channelInfoTable中,為之後做Consumer端的負載均衡提供可以依據的元資料資訊。
對Topic下的訊息消費佇列、消費者Id排序,然後用訊息佇列分配策略演算法(預設為:訊息佇列的平均分配演算法),計算出待拉取的訊息佇列。分配策略如下
平均分配策略(預設)(AllocateMessageQueueAveragely)環形分配策略(AllocateMessageQueueAveragelyByCircle)手動配置分配策略(AllocateMessageQueueByConfig)機房分配策略(AllocateMessageQueueByMachineRoom)一致性雜湊分配策略(AllocateMessageQueueConsistentHash)靠近機房策略(AllocateMachineRoomNearby)訊息消費佇列在同一消費組不同消費者之間的負載均衡,其核心設計理念是在一個訊息消費佇列在同一時間只允許被同一消費組內的一個消費者消費,一個訊息消費者能同時消費多個訊息佇列。
事務訊息RocketMQ採用了2PC的思想來實現了提交事務訊息,同時增加一個補償邏輯來處理二階段超時或者失敗的訊息,如下圖所示。
上圖說明了事務訊息的大致方案,其中分為兩個流程:正常事務訊息的傳送及提交、事務訊息的補償流程。
1.事務訊息傳送及提交:
(1) 傳送訊息(half訊息)。
(2) 服務端響應訊息寫入結果。
(3) 根據傳送結果執行本地事務(如果寫入失敗,此時half訊息對業務不可見,本地邏輯不執行)。
(4) 根據本地事務狀態執行Commit或者Rollback(Commit操作生成訊息索引,訊息對消費者可見)
2.補償流程:
(1) 對沒有Commit/Rollback的事務訊息(pending狀態的訊息),從服務端發起一次“回查”
(2) Producer收到回查訊息,檢查回查訊息對應的本地事務的狀態
(3) 根據本地事務狀態,重新Commit或者Rollback
其中,補償階段用於解決訊息Commit或者Rollback發生超時或者失敗的情況。
訊息查詢按照MessageId查詢訊息在RocketMQ中具體做法是:Client端從MessageId中解析出Broker的地址(IP地址和埠)和Commit Log的偏移地址後封裝成一個RPC請求後透過Remoting通訊層傳送(業務請求碼:VIEW_MESSAGE_BY_ID)。Broker端走的是QueryMessageProcessor,讀取訊息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄並解析成一個完整的訊息返回。
按照Message Key查詢訊息按照Message Key查詢訊息,主要是基於RocketMQ的IndexFile索引檔案來實現的。RocketMQ的索引檔案邏輯結構,類似JDK中HashMap的實現。索引檔案的具體結構如下:
RocketMQ的具體做法是,主要透過Broker端的QueryMessageProcessor業務處理器來查詢,讀取訊息的過程就是用topic和key找到IndexFile索引檔案中的一條記錄,根據其中的commitLog offset從CommitLog檔案中讀取訊息的實體內容。