-
1 # 碼農要信佛
-
2 # 雲原生玩碼部落
基於訊息的系統模型,不一定需要broker(訊息佇列服務端)。市面上的的Akka(actor模型)、ZeroMQ等,其實都是基於訊息的系統設計正規化,但是沒有broker。
我們之所以要設計一個訊息佇列,並且配備broker,無外乎要做兩件事情:
1、訊息的轉儲,在更合適的時間點投遞,或者透過一系列手段輔助訊息最終能送達消費機。
2、規範一種正規化和通用的模式,以滿足解耦、最終一致性、錯峰等需求。掰開了揉碎了看,最簡單的訊息佇列可以做成一個訊息轉發器,把一次RPC做成兩次RPC。傳送者把訊息投遞到服務端(以下簡稱broker),服務端再將訊息轉發一手到接收端,就是這麼簡單。
利用RPC將資料流串起來。然後考慮RPC的高可用性,儘量做到無狀態,方便水平擴充套件。之後考慮如何承載訊息堆積,然後在合適的時機投遞訊息,而處理堆積的最佳方式,就是儲存,儲存的選型需要綜合考慮效能/可靠性和開發維護成本等諸多因素。
為了實現廣播功能,我們必須要維護消費關係,可以利用zk/config server等儲存消費關係。
在完成了上述幾個功能後,訊息佇列基本就實現了。然後我們可以考慮一些高階特性,如可靠投遞,事務特性,效能最佳化等。
下面我們會以設計訊息佇列時重點考慮的模組為主線,穿插灌輸一些訊息佇列的特性實現方法,來具體分析設計實現一個訊息佇列時的方方面面。
實現佇列基本功能
RPC通訊協議
剛才講到,所謂訊息佇列,無外乎兩次RPC加一次轉儲,當然需要消費端最終做消費確認的情況是三次RPC。既然是RPC,就必然牽扯出一系列話題,什麼負載均衡啊、服務發現啊、通訊協議啊、序列化協議啊,等等。在這一塊,我的強烈建議是不要重複造輪子。利用公司現有的RPC框架:Thrift也好,Dubbo也好,或者是其他自定義的框架也好。因為訊息佇列的RPC,和普通的RPC沒有本質區別。當然了,自主利用Memchached或者Redis協議重新寫一套RPC框架並非不可(如MetaQ使用了自己封裝的Gecko NIO框架,卡夫卡也用了類似的協議)。但實現成本和難度無疑倍增。排除對效率的極端要求,都可以使用現成的RPC框架。
簡單來講,服務端提供兩個RPC服務,一個用來接收訊息,一個用來確認訊息收到。並且做到不管哪個server收到訊息和確認訊息,結果一致即可。當然這中間可能還涉及跨IDC的服務的問題。這裡和RPC的原則是一致的,儘量優先選擇本機房投遞。你可能會問,如果producer和consumer本身就在兩個機房了,怎麼辦?首先,broker必須保證感知的到所有consumer的存在。其次,producer儘量選擇就近的機房就好了。
高可用
其實所有的高可用,是依賴於RPC和儲存的高可用來做的。先來看RPC的高可用,美團的基於MTThrift的RPC框架,阿里的Dubbo等,其本身就具有服務自動發現,負載均衡等功能。而訊息佇列的高可用,只要保證broker接受訊息和確認訊息的介面是冪等的,並且consumer的幾臺機器處理訊息是冪等的,這樣就把訊息佇列的可用性,轉交給RPC框架來處理了。
那麼怎麼保證冪等呢?最簡單的方式莫過於共享儲存。broker多機器共享一個DB或者一個分散式檔案/kv系統,則處理訊息自然是冪等的。就算有單點故障,其他節點可以立刻頂上。另外failover可以依賴定時任務的補償,這是訊息佇列本身天然就可以支援的功能。儲存系統本身的可用性我們不需要操太多心,放心大膽的交給DBA們吧!
對於不共享儲存的佇列,如Kafka使用分割槽加主備模式,就略微麻煩一些。需要保證每一個分割槽內的高可用性,也就是每一個分割槽至少要有一個主備且需要做資料的同步,關於這塊HA的細節,可以參考下篇pull模型訊息系統設計。
服務端承載訊息堆積的能力
訊息到達服務端如果不經過任何處理就到接收者了,broker就失去了它的意義。為了滿足我們錯峰/流控/最終可達等一系列需求,把訊息儲存下來,然後選擇時機投遞就顯得是順理成章的了。
只是這個儲存可以做成很多方式。比如儲存在記憶體裡,儲存在分散式KV裡,儲存在磁盤裡,儲存在資料庫裡等等。但歸結起來,主要有持久化和非持久化兩種。
持久化的形式能更大程度地保證訊息的可靠性(如斷電等不可抗外力),並且理論上能承載更大限度的訊息堆積(外存的空間遠大於記憶體)。
但並不是每種訊息都需要持久化儲存。很多訊息對於投遞效能的要求大於可靠性的要求,且數量極大(如日誌)。這時候,訊息不落地直接暫存記憶體,嘗試幾次failover,最終投遞出去也未嘗不可。
市面上的訊息佇列普遍兩種形式都支援。當然具體的場景還要具體結合公司的業務來看。
儲存子系統的選擇
我們來看看如果需要資料落地的情況下各種儲存子系統的選擇。理論上,從速度來看,檔案系統>分散式KV(持久化)>分散式檔案系統>資料庫,而可靠性卻截然相反。還是要從支援的業務場景出發作出最合理的選擇,如果你們的訊息佇列是用來支援支付/交易等對可靠性要求非常高,但對效能和量的要求沒有這麼高,而且沒有時間精力專門做檔案儲存系統的研究,DB是最好的選擇。
但是DB受制於IOPS,如果要求單broker 5位數以上的QPS效能,基於檔案的儲存是比較好的解決方案。整體上可以採用資料檔案+索引檔案的方式處理,具體這塊的設計比較複雜,可以參考下篇的儲存子系統設計。
分散式KV(如MongoDB,HBase)等,或者持久化的Redis,由於其程式設計介面較友好,效能也比較可觀,如果在可靠性要求不是那麼高的場景,也不失為一個不錯的選擇。
消費關係解析
現在我們的訊息佇列初步具備了轉儲訊息的能力。下面一個重要的事情就是解析傳送接收關係,進行正確的訊息投遞了。
市面上的訊息佇列定義了一堆讓人暈頭轉向的名詞,如JMS 規範中的Topic/Queue,Kafka裡面的Topic/Partition/ConsumerGroup,RabbitMQ裡面的Exchange等等。拋開現象看本質,無外乎是單播與廣播的區別。所謂單播,就是點到點;而廣播,是一點對多點。當然,對於網際網路的大部分應用來說,組間廣播、組內單播是最常見的情形。
訊息需要通知到多個業務叢集,而一個業務叢集內有很多臺機器,只要一臺機器消費這個訊息就可以了。
當然這不是絕對的,很多時候組內的廣播也是有適用場景的,如本地快取的更新等等。另外,消費關係除了組內組間,可能會有多級樹狀關係。這種情況太過於複雜,一般不列入考慮範圍。所以,一般比較通用的設計是支援組間廣播,不同的組註冊不同的訂閱。組內的不同機器,如果註冊一個相同的ID,則單播;如果註冊不同的ID(如IP地址+埠),則廣播。
至於廣播關係的維護,一般由於訊息佇列本身都是叢集,所以都維護在公共儲存上,如config server、zookeeper等。維護廣播關係所要做的事情基本是一致的:
1、傳送關係的維護。2、傳送關係變更時的通知。
佇列高階特性設計
上面都是些訊息佇列基本功能的實現,下面來看一些關於訊息佇列特性相關的內容,不管可靠投遞/訊息丟失與重複以及事務乃至於效能,不是每個訊息佇列都會照顧到,所以要依照業務的需求,來仔細衡量各種特性實現的成本,利弊,最終做出最為合理的設計。
可靠投遞(最終一致性)
這是個激動人心的話題,完全不丟訊息,究竟可不可能?答案是,完全可能,前提是訊息可能會重複,並且,在異常情況下,要接受訊息的延遲。
方案說簡單也簡單,就是每當要發生不可靠的事情(RPC等)之前,先將訊息落地,然後傳送。當失敗或者不知道成功失敗(比如超時)時,訊息狀態是待發送,定時任務不停輪詢所有待發送訊息,最終一定可以送達。
具體來說:
1、producer往broker傳送訊息之前,需要做一次落地。2、請求到server後,server確保資料落地後再告訴客戶端傳送成功。3、支援廣播的訊息佇列需要對每個待發送的endpoint,持久化一個傳送狀態,直到所有endpoint狀態都OK才可刪除訊息。
對於各種不確定(超時、down機、訊息沒有送達、送達後資料沒落地、資料落地了回覆沒收到),其實對於傳送方來說,都是一件事情,就是訊息沒有送達。
重推訊息所面臨的問題就是訊息重複。重複和丟失就像兩個噩夢,你必須要面對一個。好在訊息重複還有處理的機會,訊息丟失再想找回就難了。
Anyway,作為一個成熟的訊息佇列,應該儘量在各個環節減少重複投遞的可能性,不能因為重複有解決方案就放縱的亂投遞。
最後說一句,不是所有的系統都要求最終一致性或者可靠投遞,比如一個論壇系統、一個招聘系統。一個重複的簡歷或話題被髮布,可能比丟失了一個釋出顯得更讓使用者無法接受。不斷重複一句話,任何基礎元件要服務於業務場景。
消費確認
當broker把訊息投遞給消費者後,消費者可以立即響應我收到了這個訊息。但收到了這個訊息只是第一步,我能不能處理這個訊息卻不一定。或許因為消費能力的問題,系統的負荷已經不能處理這個訊息;或者是剛才狀態機裡面提到的訊息不是我想要接收的訊息,主動要求重發。
把訊息的送達和訊息的處理分開,這樣才真正的實現了訊息佇列的本質-解耦。所以,允許消費者主動進行消費確認是必要的。當然,對於沒有特殊邏輯的訊息,預設Auto Ack也是可以的,但一定要允許消費方主動ack。
對於正確消費ack的,沒什麼特殊的。但是對於reject和error,需要特別說明。
reject這件事情,往往業務方是無法感知到的,系統的流量和健康狀況的評估,以及處理能力的評估是一件非常複雜的事情。舉個極端的例子,收到一個訊息開始build索引,可能這個訊息要處理半個小時,但訊息量卻是非常的小。所以reject這塊建議做成滑動視窗/執行緒池類似的模型來控制,消費能力不匹配的時候,直接拒絕,過一段時間重發,減少業務的負擔。
但業務出錯這件事情是隻有業務方自己知道的,就像上文提到的狀態機等等。這時應該允許業務方主動ack error,並可以與broker約定下次投遞的時間。
重複訊息和順序訊息
上文談到重複訊息是不可能100%避免的,除非可以允許丟失,那麼,順序訊息能否100%滿足呢? 答案是可以,但條件更為苛刻:1、允許訊息丟失。2、從傳送方到服務方到接受者都是單點單執行緒。
所以絕對的順序訊息基本上是不能實現的,當然在METAQ/Kafka等pull模型的訊息佇列中,單執行緒生產/消費,排除訊息丟失,也是一種順序訊息的解決方案。
一般來講,一個主流訊息佇列的設計正規化裡,應該是不丟訊息的前提下,儘量減少重複訊息,不保證訊息的投遞順序。
談到重複訊息,主要是兩個話題:1、如何鑑別訊息重複,並冪等的處理重複訊息。2、一個訊息佇列如何儘量減少重複訊息的投遞。
先來看看第一個話題,每一個訊息應該有它的唯一身份。不管是業務方自定義的,還是根據IP/PID/時間戳生成的MessageId,如果有地方記錄這個MessageId,訊息到來是能夠進行比對就能完成重複的鑑定。資料庫的唯一鍵/bloom filter/分散式KV中的key,都是不錯的選擇。由於訊息不能被永久儲存,所以理論上都存在訊息從持久化儲存移除的瞬間上游還在投遞的可能(上游因種種原因投遞失敗,不停重試,都到了下游清理訊息的時間)。這種事情都是異常情況下才會發生的,畢竟是小眾情況。兩分鐘訊息都還沒送達,多送一次又能怎樣呢?冪等的處理訊息是一門藝術,因為種種原因重複訊息或者錯亂的訊息還是來到了,說兩種通用的解決方案:
版本號
舉個簡單的例子,一個產品的狀態有上線/下線狀態。如果訊息1是下線,訊息2是上線。不巧訊息1判重失敗,被投遞了兩次,且第二次發生在2之後,如果不做重複性判斷,顯然最終狀態是錯誤的。
但是,如果每個訊息自帶一個版本號。上游傳送的時候,標記訊息1版本號是1,訊息2版本號是2。如果再發送下線訊息,則版本號標記為3。下游對於每次訊息的處理,同時維護一個版本號。
每次只接受比當前版本號大的訊息。初始版本為0,當訊息1到達時,將版本號更新為1。訊息2到來時,因為版本號>1.可以接收,同時更新版本號為2.當另一條下線訊息到來時,如果版本號是3.則是真實的下線訊息。如果是1,則是重複投遞的訊息。
如果業務方只關心訊息重複不重複,那麼問題就已經解決了。但很多時候另一個頭疼的問題來了,就是訊息順序如果和想象的順序不一致。比如應該的順序是12,到來的順序是21。則最後會發生狀態錯誤。
參考TCP/IP協議,如果想讓亂序的訊息最後能夠正確的被組織,那麼就應該只接收比當前版本號大一的訊息。並且在一個session週期內要一直儲存各個訊息的版本號。
如果到來的順序是21,則先把2存起來,待1到來後,先處理1,再處理2,這樣重複性和順序性要求就都達到了。
狀態機
基於版本號來處理重複和順序訊息聽起來是個不錯的主意,但凡事總有瑕疵。使用版本號的最大問題是:對傳送方必須要求訊息帶業務版本號;下游必須儲存訊息的版本號,對於要嚴格保證順序的。
還不能只儲存最新的版本號的訊息,要把亂序到來的訊息都儲存起來。而且必須要對此做出處理。試想一個永不過期的"session",比如一個物品的狀態,會不停流轉於上下線。那麼中間環節的所有儲存就必須保留,直到在某個版本號之前的版本一個不丟的到來,成本太高。
就剛才的場景看,如果訊息沒有版本號,該怎麼解決呢?業務方只需要自己維護一個狀態機,定義各種狀態的流轉關係。例如,"下線"狀態只允許接收"上線"訊息,“上線”狀態只能接收“下線訊息”,如果上線收到上線訊息,或者下線收到下線訊息,在訊息不丟失和上游業務正確的前提下。要麼是訊息發重了,要麼是順序到達反了。這時消費者只需要把“我不能處理這個訊息”告訴投遞者,要求投遞者過一段時間重發即可。而且重發一定要有次數限制,比如5次,避免死迴圈,就解決了。
舉例子說明,假設產品本身狀態是下線,1是上線訊息,2是下線訊息,3是上線訊息,正常情況下,訊息應該的到來順序是123,但實際情況下收到的訊息狀態變成了3123。
那麼下游收到3訊息的時候,判斷狀態機流轉是下線->上線,可以接收訊息。然後收到訊息1,發現是上線->上線,拒絕接收,要求重發。然後收到訊息2,狀態是上線->下線,於是接收這個訊息。
此時無論重發的訊息1或者3到來,還是可以接收。另外的重發,在一定次數拒絕後停止重發,業務正確。
中介軟體對於重複訊息的處理
迴歸到訊息佇列的話題來講。上述通用的版本號/狀態機/ID判重解決方案裡,哪些是訊息佇列該做的、哪些是訊息佇列不該做業務方處理的呢?其實這裡沒有一個完全嚴格的定義,但回到我們的出發點,我們保證不丟失訊息的情況下儘量少重複訊息,消費順序不保證。那麼重複訊息下和亂序訊息下業務的正確,應該是由消費方保證的,我們要做的是減少訊息傳送的重複。
我們無法定義業務方的業務版本號/狀態機,如果API裡強制需要指定版本號,則顯得過於綁架客戶了。況且,在消費方維護這麼多狀態,就涉及到一個消費方的訊息落地/多機間的同步消費狀態問題,複雜度指數級上升,而且只能解決部分問題。
減少重複訊息的關鍵步驟:
1、broker記錄MessageId,直到投遞成功後清除,重複的ID到來不做處理,這樣只要傳送者在清除週期內能夠感知到訊息投遞成功,就基本不會在server端產生重複訊息。
2、對於server投遞到consumer的訊息,由於不確定對端是在處理過程中還是訊息傳送丟失的情況下,有必要記錄下投遞的IP地址。決定重發之前詢問這個IP,訊息處理成功了嗎?如果詢問無果,再重發。
事務
永續性是事務的一個特性,然而只滿足永續性卻不一定能滿足事務的特性。還是拿扣錢/加錢的例子講。滿足事務的一致性特徵,則必須要麼都不進行,要麼都能成功。
解決方案從大方向上有兩種:1、兩階段提交,分散式事務。2、本地事務,本地落地,補償傳送。
分散式事務存在的最大問題是成本太高,兩階段提交協議,對於仲裁down機或者單點故障,幾乎是一個無解的黑洞。對於交易密集型或者I/O密集型的應用,沒有辦法承受這麼高的網路延遲,系統複雜性。
並且成熟的分散式事務一定構建與比較靠譜的商用DB和商用中介軟體上,成本也太高。
那如何使用本地事務解決分散式事務的問題呢?以本地和業務在一個數據庫例項中建表為例子,與扣錢的業務操作同一個事務裡,將訊息插入本地資料庫。如果訊息入庫失敗,則業務回滾;如果訊息入庫成功,事務提交。
然後傳送訊息(注意這裡可以實時傳送,不需要等定時任務檢出,以提高訊息實時性)。以後的問題就是前文的最終一致性問題所提到的了,只要訊息沒有傳送成功,就一直靠定時任務重試。
這裡有一個關鍵的點,本地事務做的,是業務落地和訊息落地的事務,而不是業務落地和RPC成功的事務。這裡很多人容易混淆,如果是後者,無疑是事務巢狀RPC,是大忌,會有長事務死鎖等各種風險。
本地事務存在兩個最大的使用障礙:1、配置較為複雜,“綁架”業務方,必須本地資料庫例項提供一個庫表。2、對於訊息延遲高敏感的業務不適用。
話說回來,不是每個業務都需要強事務的。扣錢和加錢需要事務保證,但下單和生成簡訊卻不需要事務,不能因為要求發簡訊的訊息儲存投遞失敗而要求下單業務回滾。所以,一個完整的訊息佇列應該定義清楚自己可以投遞的訊息型別,如事務型訊息,本地非持久型訊息,以及服務端不落地的非可靠訊息等。對不同的業務場景做不同的選擇。另外事務的使用應該儘量低成本、透明化,可以依託於現有的成熟框架,如Spring的宣告式事務做擴充套件。業務方只需要使用@Transactional標籤即可。
效能相關
非同步/同步
首先澄清一個概念,非同步,同步和oneway是三件事。非同步,歸根結底你還是需要關心結果的,但可能不是當時的時間點關心,可以用輪詢或者回調等方式處理結果;同步是需要當時關心的結果的;而oneway是發出去就不管死活的方式,這種對於某些完全對可靠性沒有要求的場景還是適用的,但不是我們重點討論的範疇。
迴歸來看,任何的RPC都是存在客戶端非同步與服務端非同步的,而且是可以任意組合的:客戶端同步對服務端非同步,客戶端非同步對服務端非同步,客戶端同步對服務端同步,客戶端非同步對服務端同步。
對於客戶端來說,同步與非同步主要是拿到一個Result,還是Future(Listenable)的區別。實現方式可以是執行緒池,NIO或者其他事件機制,這裡先不展開講。
服務端非同步可能稍微難理解一點,這個是需要RPC協議支援的。參考servlet 3.0規範,服務端可以吐一個future給客戶端,並且在future done的時候通知客戶端。
整個過程可以參考下面的程式碼:
客戶端同步服務端非同步。
客戶端同步服務端同步。
客戶端非同步服務端同步(這裡用執行緒池的方式)。
客戶端非同步服務端非同步。
上面說了這麼多,其實是想讓大家脫離兩個誤區:1、RPC只有客戶端能做非同步,服務端不能。2、非同步只能透過執行緒池。
那麼,服務端使用非同步最大的好處是什麼呢?說到底,是解放了執行緒和I/O。試想服務端有一堆I/O等待處理,如果每個請求都需要同步響應,每條訊息都需要結果立刻返回,那麼就幾乎沒法做I/O合併(當然介面可以設計成batch的,但可能batch發過來的仍然數量較少)。而如果用非同步的方式返回給客戶端future,就可以有機會進行I/O的合併,把幾個批次發過來的訊息一起落地(這種合併對於MySQL等允許batch insert的資料庫效果尤其明顯),並且徹底釋放了執行緒。不至於說來多少請求開多少執行緒,能夠支援的併發量直線提高。
來看第二個誤區,返回future的方式不一定只有執行緒池。換句話說,可以線上程池裡面進行同步操作,也可以進行非同步操作,也可以不使用執行緒池使用非同步操作(NIO、事件)。
回到訊息佇列的議題上,我們當然不希望訊息的傳送阻塞主流程(前面提到了,server端如果使用非同步模型,則可能因訊息合併帶來一定程度上的訊息延遲),所以可以先使用執行緒池提交一個傳送請求,主流程繼續往下走。
但是執行緒池中的請求關心結果嗎?Of course,必須等待服務端訊息成功落地,才算是訊息傳送成功。所以這裡的模型,準確地說事客戶端半同步半非同步(使用執行緒池不阻塞主流程,但執行緒池中的任務需要等待server端的返回),server端是純非同步。客戶端的執行緒池wait在server端吐回的future上,直到server端處理完畢,才解除阻塞繼續進行。
總結一句,同步能夠保證結果,非同步能夠保證效率,要合理的結合才能做到最好的效率。
批次
談到批次就不得不提生產者消費者模型。但生產者消費者模型中最大的痛點是:消費者到底應該何時進行消費。大處著眼來看,消費動作都是事件驅動的。主要事件包括:
1、攢夠了一定數量。
2、到達了一定時間。
3、佇列裡有新的資料到來。
對於及時性要求高的資料,可用採用方式3來完成,比如客戶端向服務端投遞資料。只要佇列有資料,就把佇列中的所有資料刷出,否則將自己掛起,等待新資料的到來。
在第一次把佇列資料往外刷的過程中,又積攢了一部分資料,第二次又可以形成一個批次。虛擬碼如下:
這種方式是訊息延遲和批次的一個比較好的平衡,但優先響應低延遲。延遲的最高程度由上一次傳送的等待時間決定。但可能造成的問題是傳送過快的話批次的大小不夠滿足效能的極致。
相反對於可以用適量的延遲來換取高效能的場景來說,用定時/定量二選一的方式可能會更為理想,既到達一定數量才傳送,但如果數量一直達不到,也不能幹等,有一個時間上限。
具體說來,在上文的submit之前,多判斷一個時間和數量,並且Runnable內部維護一個定時器,避免沒有新任務到來時舊的任務永遠沒有機會觸發傳送條件。對於server端的資料落地,使用這種方式就非常方便。
為什麼網路請求小包合併成大包會提高效能?主要原因有兩個:
1、減少無謂的請求頭,如果你每個請求只有幾字節,而頭卻有幾十位元組,無疑效率非常低下。
2、減少回覆的ack包個數。把請求合併後,ack包數量必然減少,確認和重發的成本就會降低。
上文提到的訊息佇列,大多是針對push模型的設計。現在市面上有很多經典的也比較成熟的pull模型的訊息佇列,如Kafka、MetaQ等。這跟JMS中傳統的push方式有很大的區別,可謂另闢蹊徑。
-
3 # 淺析架構
1.先上圖,明確一個訊息傳送和消費的流程
2.訊息訊息首先需要定義訊息協議,比如ActiveMQ,Stomp,XMPP等
3.訊息協議定義好了,明確訊息佇列需要滿足什麼場景
是否需要保證訊息可靠性,如果需要就要做儲存的高可用;
是否需要支援同步和非同步訊息;
是否需要保證訊息的順序;
是否需要支援延時訊息
具體可以參照很多開源實現的特性,比如RocketMQ就是高可靠的,Kafka就是高吞吐量的,但是不是高可靠的。
回覆列表
這取決於你分散式系統的規劃。裝置有sn,任務有ID,微服務的設定topic字首等等。分散式規劃合理,使用佇列就水到渠成,不需要特別設計。如果分散式規模很大,抱歉我沒經驗