首頁>技術>

背景

之前琢磨了很久一直想寫一篇pulsar相關的文章,但是一直知識儲備不夠,對於很多細節還是不瞭解,於是查了很多資料,總算是可以湊出一篇文章了。

在開源的業界已經有這麼多訊息佇列中介軟體了,pulsar作為一個新勢力到底有什麼優點呢?pulsar自從出身就不斷地再和其他的訊息佇列(kafka,rocketmq等等)做比較,但是Pulsar的設計思想和大多數的訊息佇列中介軟體都不同,具備了高吞吐,低延遲,計算儲存分離,多租戶,異地複製等功能,所以pulsar也被譽為下一代訊息佇列中介軟體,接下來我會一一對其進行詳細的解析。

pulsar架構原理

整體的架構和其他的訊息佇列中介軟體差別不是太大,相信大家也看到了很多熟悉的名詞,接下來會給大家一一解釋這些名詞的含義。

名詞解釋Producer:訊息生產者,將訊息傳送到broker。Consumer:訊息消費者,從Broker讀取訊息到客戶端,進行消費處理。Broker: 可以看作是pulsar的server,Producer和Consumer都看作是client.訊息處理的節點,pulsar的Broker和其他訊息中介軟體的都不一樣,他是無狀態的沒有儲存,所以可以無限制的擴充套件,這個後面也會詳解講到。Bookie: 負責所有訊息的持久化,這裡採用的是Apache Bookeeper。ZK: 和kafka一樣pulsar也是使用zk儲存一些元資料,比如配置管理,topic分配,租戶等等。Service Discovery:可以理解為Pulsar中的nginx,只用一個url就可以和整個broker進行打交道,當然也可以使用自己的服務發現。客戶端發出的讀取,更新或刪除主題的初始請求將傳送給可能不是處理該主題的 broker 。 如果這個 broker 不能處理該主題的請求,broker 將會把該請求重定向到可以處理主題請求的 broker。

不論是kafka,rocketmq還是我們的pulsar其實作為訊息佇列中介軟體最為重要的大概就是分為三個部分:

Producer是如何生產訊息,傳送到對應的BrokerBroker是如何處理訊息,將高效的持久化以及查詢Consumer是如何進行消費訊息

而我們後面也會圍繞著這三個部分進行展開講解。

Producer生產訊息

先簡單看一下如何用程式碼進行訊息傳送:

PulsarClient client = PulsarClient.create("pulsar://pulsar.us-west.example.com:6650");Producer producer = client.createProducer(                "persistent://sample/standalone/ns1/my-topic"); // Publish 10 messages to the topicfor (int i = 0; i < 10; i++) {    producer.send("my-message".getBytes());}複製程式碼
Step1: 首先使用我們的url建立一個client這個url是我們service discovery的地址,如果我們使用單機模式可以進行直連Step2:我們傳入了一個類似url的引數,我們只需要傳遞這個就能指定我們到底在哪個topic或者namespace下面建立的:

url的格式為:{persistent|non-persistent}://tenant/namespace/topic

Step3: 呼叫send方法傳送訊息,這裡也提供了sendAsync方法支援非同步傳送。

上面三個步驟中,步驟1,2屬於我們準備階段,用於構建客戶端,構建Producer,我們真的核心邏輯在send中,那這裡我先提幾個小問題,大家可以先想想在其他訊息佇列中是怎麼做的,然後再對比pulsar的看一下:

我們呼叫了send之後是會立即傳送嗎?如果是多partition,怎麼找到我應該傳送到哪個Broker呢?傳送模式

我們上面說了send分為async和sync兩種模式,但實際上在pulsar內部sync模式也是採用的async模式,在sync模式下模擬回撥阻塞,達到同步的效果,這個在kafka中也是採用的這個模式,但是在rocketmq中,所有的send都是真正的同步,都會直接請求到broker。

基於這個模式,在pulsar和kafka中都支援批次傳送,在rocketmq中是直接傳送,批次傳送有什麼好處呢?當我們傳送的TPS特別高的時候,如果每次傳送都直接和broker直連,可能會做很多的重複工作,比如壓縮,鑑權,建立連結等等。比如我們傳送1000條訊息,那麼可能會做1000次這個重複的工作,如果是批次傳送的話這1000條訊息合併成一次請求,相對來說壓縮,鑑權這些工作就只需要做一次。

有同學可能會問,批次傳送會不會導致傳送的時間會有一定的延誤?這個其實不需要擔心,在pulsar中預設定時每隔1ms傳送一次batch,或者當batchsize預設到了1000都會進行傳送,這個傳送的頻率都還是很快的。

傳送負載均衡

在訊息佇列中通常會將topic進行水平擴充套件,在pulsar和kafka中叫做partition,在rocketmq中叫做queue,本質上都是分割槽,我們可以將不同分割槽落在不同的broker上,達到我們水平擴充套件的效果。

在我們傳送的時候可以自己制定選擇partition的策略,也可以使用它預設輪訓partition策略。當我們選擇了partition之後,我們怎麼確定哪一個partition對應哪一個broker呢?

可以先看看下面這個圖:

Step1: 我們所有的資訊分割槽對映資訊在zk和broker的快取中都有進行儲存。Step2: 我們透過查詢broker,可以獲取到分割槽和broker的關係,並且定時更新。Step3: 在pulsar中每個分割槽在傳送端的時候都被抽象成為一個單獨的Producer,這個和kafka,rocketmq都不一樣,在kafka裡面大概就是選擇了partition之後然後再去找partition對應的broker地址,然後進行傳送。pulsar將每一個partition都封裝成Producer,再程式碼實現上就不需要去關注他具體對應的是哪個broker,所有的邏輯都在producer這個程式碼裡面,整體來說比較乾淨。壓縮訊息

訊息壓縮是最佳化資訊傳輸的手段之一,我們通常看見一些大型檔案都會是以一個壓縮包的形式提供下載,在我們訊息佇列中我們也可以用這種思想,我們將一個batch的訊息,比如有1000條可能有1M的傳輸大小,但是經過壓縮之後可能就只會有幾十kb,增加了我們和broker的傳輸效率,但是與之同時我們的cpu也帶來了損耗。Pulsar客戶端支援多種壓縮型別,如 lz4、zlib、zstd、snappy 等。

client.newProducer()    .topic(“test-topic”)    .compressionType(CompressionType.LZ4)    .create();複製程式碼
Broker

接下來我們來說說第二個比較重要的部分Broker,在Broker的設計中pulsar和其他所有的訊息佇列差別比較大,而正是因為這個差別也成為了他的特點。

計算和儲存分離

首先我們來說說他最大的特點:計算和儲存分離。我們在開始的說過Pulsar是下一代訊息佇列,就非常得益於他這個架構設計,無論是kafka還是RocketMQ,所有的計算和儲存都放在同一個機器上,這個模式有幾個弊端:

擴充套件困難:當我們需要擴充套件的叢集的時候,我們通常是因為cpu或者磁碟其中一個原因影響,但是我們卻要申請一個可能cpu和磁碟配置都很好的機器,造成了資源浪費。並且kafka這種進行擴充套件,還需要進行遷移資料,過程十分繁雜。負載不均衡:當某些partion資料特別多的時候,會導致broker負載不均衡,如下面圖,如果某個partition資料特別多,那麼就會導致某個broker(輪船)承載過多的資料,但是另外的broker可能又比較空閒

pulsar計算分離架構能夠非常好地解決這個問題:

對於計算:也就是我們的broker,提供訊息佇列的讀寫,不儲存任何資料,無狀態對於我們擴充套件非常友好,只要你機器足夠,就能隨便上。擴容Broker往往適用於增加Consumer的吞吐,當我們有一些大流量的業務或者活動,比如電商大促,可以提前進行broker的擴容。對於儲存:也就是我們的bookie,只提供訊息佇列的儲存,如果對訊息量有要求的,我們可以擴容bookie,並且我們不需要遷移資料,擴容十分方便。訊息儲存名詞解析:

上圖是bookie的讀寫架構圖,裡面有一些名詞需要先介紹一下:

Entry,Entry是儲存到bookkeeper中的一條記錄,其中包含Entry ID,記錄實體等。Ledger,可以認為ledger是用來儲存Entry的,多個Entry序列組成一個ledger。Journal,其實就是bookkeeper的WAL(write ahead log),用於存bookkeeper的事務日誌,journal檔案有一個最大大小,達到這個大小後會新起一個journal檔案。Entry log,儲存Entry的檔案,ledger是一個邏輯上的概念,entry會先按ledger聚合,然後寫入entry log檔案中。同樣,entry log會有一個最大值,達到最大值後會新起一個新的entry log檔案Index file,ledger的索引檔案,ledger中的entry被寫入到了entry log檔案中,索引檔案用於entry log檔案中每一個ledger做索引,記錄每個ledger在entry log中的儲存位置以及資料在entry log檔案中的長度。MetaData Storage,元資料儲存,是用於儲存bookie相關的元資料,比如bookie上有哪些ledger,bookkeeper目前使用的是zk儲存,所以在部署bookkeeper前,要先有zk叢集。

整體架構上的寫流程:

Step1: broker發起寫請求,首先對Journal磁碟寫入WAL,熟悉mysql的朋友知道redolog,journal和redolog作用一樣都是用於恢復沒有持久化的資料。Step2: 然後再將資料寫入index和ledger,這裡為了保持效能不會直接寫盤,而是寫pagecache,然後非同步刷盤。Step3: 對寫入進行ack。

讀流程為:

Step1: 先讀取index,當然也是先讀取cache,再走disk。Step2: 獲取到index之後,根據index去entry logger中去對應的資料如何高效讀寫?

在kafka中當我們的topic變多了之後,由於kafka一個topic一個檔案,就會導致我們的磁碟IO從順序寫變成隨機寫。在rocketMq中雖然將多個topic對應一個寫入檔案,讓寫入變成了順序寫,但是我們的讀取很容易導致我們的pagecache被各種覆蓋重新整理,這對於我們的IO的影響是非常大的。所以pulsar在讀寫兩個方面針對這些問題都做了很多最佳化:

寫流程:順序寫 + pagecache。在寫流程中我們的所有的檔案都是獨立磁碟,並且同步刷盤的只有Journal,Journal是順序寫一個journal-wal檔案,順序寫效率非常高。ledger和index雖然都會存在多個檔案,但是我們只會寫入pagecache,非同步刷盤,所以隨機寫不會影響我們的效能。讀流程:broker cache + bookie cache,在pulsar中對於追尾讀(tailing read)非常友好基本不會走io,一般情況下我們的consumer是會立即去拿producer傳送的訊息的,所以這部分在持久化之後依然在broker中作為cache存在,當然就算broker沒有cache(比如broker是新建的),我們的bookie也會在memtable中有自己的cache,透過多重cache減少讀流程走io。

我們可以發現在最理想的情況下讀寫的io是完全隔離開來的,所以在Pulsar中能很容易就支援百萬級topic,而在我們的kafka和rocketmq中這個是非常困難的。

無限流式儲存

一個Topic實際上是一個ledgers流(Segment),透過這個設計所以Pulsar他並不是一個單純的訊息佇列系統,他也可以代替流式系統,所以他也叫流原生平臺,可以替代flink等系統。

可以看見我們的Event Stream(topic/partition),由多個Segment儲存組成,而每個segment由entry組成,這個可以看作是我們每批發送的訊息通常會看作是一個entry。

Segment可以看作是我們寫入檔案的一個基本維度,同一個Segment的資料會寫在同一個檔案上面,不同Segment將會是不同檔案,而Segment之間的在metadata中進行儲存。

分層儲存

在kafka和rocketmq中訊息是會有一定的儲存時間的,因為磁碟會有空間限制,在pulsar中也提供這個功能,但是如果你想讓自己的訊息永久儲存,那麼可以使用分級儲存,我們可以將一些比較老的資料,定時地重新整理到廉價的儲存中,比如s3,那麼我們就可以無限儲存我們的訊息隊列了。

資料複製

在pulsar中的資料複製和kafka,rocketmq都有很大的不同,在其他訊息佇列中通常是其他副本主動同步,通常這個時間就會變得不可預測,而在pulsar採用了類似qurom協議,給一組可用的bookie池,然後併發的寫入其中的一部分bookie,只要返回部分成功(通常大於1/2)就好。

Ensemble Size(E)決定給定 ledger 可用的 bookie 池大小。Write Quorum Size(Qw)指定 Pulsar 向其中寫入 entry 的 bookie 數量。Ack Quorum Size(Qa)指定必須 ack 寫入的 bookie 數量。

採用這種併發寫的方式,會更加高效的進行資料複製,尤其是當資料副本比較多的時候。

Consumer

接下來我們來聊聊pulsar中最後一個比較重要的組成consumer。

訂閱模式

訂閱模式是用來定義我們的訊息如何分配給不同的消費者,不同訊息佇列中介軟體都有自己的訂閱模式,一般我們常見的訂閱模式有:

叢集模式:一條訊息只能被一個叢集內的消費者所消費。廣播模式:一條訊息能被叢集內所有的消費者消費。

在pulsar中提供了4種訂閱模式,分別是獨佔,災備,共享,鍵共享:

獨佔:顧名思義只能由一個消費者獨佔,如果同一個叢集內有第二個消費者去註冊,第二個就會失敗,這個適用於全域性有序的訊息。災備:加強版獨佔,如果獨佔的那個掛了,會自動地切換到另外一個好的消費者,但是還是隻能由一個獨佔。共享模式:這個模式看起來有點像叢集模式,一條訊息也是隻能被一個叢集內消費者消費,但是和rocketmq不同的是,rocketmq是以partition維度,同一個Partition的資料都會被髮到一個機器上。在Pulsar中消費不會以partition維度,而是輪訓所有消費者進行訊息傳送。這有個什麼好處呢?如果你有100臺機器,但是你只有10個partition其實你只有10臺消費者能運轉,但是在pulsar中100臺機器都可以進行消費處理。鍵共享:類似上面說的partition維度去傳送,在rocketmq中同一個key的順序訊息都會被髮送到一個partition,但是這裡不會有partition維度,而只是按照key的hash去分配到固定的consumer,也解決了消費者能力限制於partition個數問題。訊息獲取模式

不論是在kafka還是在rocketmq中我們都是client定時輪訓我們的broker獲取訊息,這種模式叫做長輪訓(Long-Polling)模式。這種模式有一個缺點網路開銷比較大,我們來計算一下consumer被消費的時延,我們假設broker和consumer之間的一次網路延時為R,那麼我們總共的時間為:

當某一條訊息A剛到broker的,這個時候long-polling剛好打包完資料返回,broker返回到consumer這個時間為R。consumer又再次傳送request請求,這個又為R。將我們的訊息A返回給consumer這裡又為R。

如果只考慮網路時延,我們可以看見我們這條訊息的消費時延大概是3R,所以我們必須想點什麼對其進行一些最佳化,有同學可能馬上就能想到,我們訊息來了直接推送給我們的consumer不就對了,這下我們的時延只會有一次R,這個就是我們常見的推模式,但是簡單的推模式是有問題的,如果我們有生產速度遠遠大於消費速度,那麼推送的訊息肯定會幹爆我們的記憶體,這個就是背壓。那麼我們怎麼解決背壓呢?我們就可以最佳化推送方式,將其變為動態推送,我們結合Long-polling,在long-polling請求時將Buffer剩餘空間告知給Broker,由Broker負責推送資料。此時Broker知道最多可以推送多少條資料,那麼就可以控制推送行為,不至於沖垮Consumer。

舉個例子:

Consumer發起請求時Buffer剩餘容量為100,Broker每次最多返回32條訊息,那麼Consumer的這次long-polling請求Broker將在執行3次push(共push96條訊息)之後返回response給Consumer(response包含4條訊息)。

如果採用long-polling模型,Consumer每傳送一次請求Broker執行一次響應,這個例子需要進行4次long-polling互動(共4個request和4個response,8次網路操作;Dynamic Push/Pull中是1個request,三次push和一個response,共5次網路操作)。

所以pulsar就採用了這種訊息獲取模式,從consumer層進一步最佳化訊息達到時間。我覺得這個設計非常巧妙,很多中介軟體的這種long-polling模式都可以參考這種思想去做一個改善。

總結

Apache Pulsar很多設計思想都和其他中介軟體不一樣,但無疑於其更加貼近於未來,大膽預測一下其他的一些訊息中介軟體未來的發展也都會向其靠攏,目前國內的Pulsar使用者也是越來越多,騰訊雲提供了pulsar的雲版本TDMQ,當然還有一些其他的知名公司華為,知乎,虎牙等等有都在對其做一個逐步的嘗試,我相信pulsar真的是一個趨勢。最後也讓我想起了最近大江大河大結局的一句話:

所有的變化,都可能伴隨著痛苦和彎路,開放的道路,也不會是闊野坦途,但大江大河,奔湧向前的趨勢,不是任何險灘暗礁,能夠阻擋的。道之所在,雖千萬人吾往矣。

原文連結:https://juejin.cn/post/6918967598432387085

22
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 「python opencv視覺入門」十五用opencv畫畫