本文作者為 StreamNative 軟體工程師李鵬輝與 Overstock 資料工程師 Devin G. Bost。中文部落格內容由 StreamNative 翻譯並整理。
出處:https://mp.weixin.qq.com/s?__biz=MzUyMjkzMjA1Ng==&mid=2247486485&idx=1&sn=4b452c9e1f2adfeb08292e7d480ff733
關於 Apache PulsarApache Pulsar 是 Apache 軟體基金會頂級專案,是下一代雲原生分散式訊息流平臺,集訊息、儲存、輕量化函式式計算為一體,採用計算與儲存分離架構設計,支援多租戶、持久化儲存、多機房跨區域資料複製,具有強一致性、高吞吐、低延時及高可擴充套件性等流資料儲存特性。
GitHub 地址:http://github.com/apache/pulsar/
Apache Pulsar 的效能通常指與讀寫訊息相關的吞吐量和延遲。在使用 Pulsar 時,使用者可以透過配置一些引數來控制系統讀寫訊息的方式。我將會出一個系列,來介紹 Apache Pulsar 在讀寫訊息方面的效能調優,這是本系列的第一篇。
本文深入介紹一些關於 Apache Pulsar 的基礎概念,包括 Pulsar 的架構、儲存層、Apache BookKeeper 等。瞭解 BookKeeper 有助於更深入地瞭解 Pulsar 的調優過程。
首先,我們來了解一下關於 Pulsar 的基礎概念。我們至少需要知道 Apache Pulsar 如何收發訊息(即生產和消費訊息)。
1. 基礎概念本文中的基礎概念和術語有助於更好地瞭解 Apache Pulsar 的工作方式。
1.1 訊息在 Pulsar 中,資料的基本單元即為訊息。Producer 傳送訊息到 broker,然後 broker 透過流控制傳送訊息到 consumer。想要深入瞭解 Pulsar 的訊息流控制,參閱 Pulsar 訊息流控制官方文件 [1] 。
訊息中不僅包含 producer 寫入 topic 的資料,還包含一些重要的元資料。
在 Pulsar 中,訊息有兩種型別:批訊息與單條訊息。單條訊息的序列即為批訊息。(關於批處理訊息的詳細內容,我會在 1.5.1 章節介紹。)
1.2 TopicTopic 是一個訊息目錄或者說存放訊息的名稱空間,也就是訊息釋出(生產)的位置。一個 topic 可以有一個或多個 producer 和/或 consumer。Producer 向 topic 寫入訊息,consumer 從 topic 消費訊息。圖 1 展示了三者之間如何協同工作。
圖 1. Producer 和 Consumer 的工作機制
1.3 BookieApache Pulsar 使用 Apache BookKeeper 作為儲存層。Apache BookKeeper 針對實時工作負載進行最佳化,是一項可擴充套件、可容錯、低延遲的儲存服務。客戶端釋出的訊息儲存在 BookKeeper 的伺服器例項中,即 bookie。
1.3.1 Entry 和 ledgerEntry 和 ledger 是 BookKeeper 中的基本術語。Entry 中包含寫入 ledger 的資料,也包含一些重要的元資料。Ledger 是 BookKeeper 中的基本儲存單元。一系列的 entry 組成一個 ledger,entry 被順序寫入 ledger。
1.3.2 JournalJournal 檔案包含 BookKeeper 中的訊息寫入日誌。在更新 ledger 前,bookie 確保已經將更新的交易(交易日誌 entry)寫入非易失儲存。在 bookie 第一次執行或舊的 journal 檔案大小達到指定閾值時,會建立新的 journal 檔案。
1.3.3 Entry LogEntry log 檔案用於管理 BookKeeper 客戶端寫入的 entry。來自不同 ledger 的 entry 會被依次寫入一個或多個 entry log 中,而偏移量則作為指標儲存在 ledger 快取中,以進行快速查詢。
1.3.4 索引資料庫Bookie 使用 RocksDB 作為 entry 索引資料庫。RocksDB 基於 log-structured merger(LSM)樹,具有高效能、可嵌入、永續性、鍵值儲存等特性。瞭解 LSM 樹的機制能夠幫助我們更好地瞭解 BookKeeper 的機制。更多關於 LSM 樹設計原理等的資訊,參閱 LSM 樹設計 pdf 文件 [2] 。
當 BookKeeper 客戶端向 ledger 寫入 entry 時,bookie 就會將此 entry 寫入 journal,並在寫入完成後傳送響應給客戶端。後臺執行緒將 entry 寫入到 entry log。當 bookie 的後臺執行緒將資料 flush 到 entry log 時,會同時更新索引。此過程如圖 2 所示。
圖 2. 當 BookKeeper 客戶端向 Ledger 寫入 Entry 時的工作機制
當從 ledger 讀取 entry 時,bookie 首先從索引資料庫中找到 entry 的位置,然後從 entry log 中讀取資料。
更多關於 BookKeeper 架構的資訊,參閱 BookKeeper 概念 [3] 。
1.4 Broker在 Pulsar 中,broker 是一個無狀態伺服器,用於協助讀寫資料。一個 topic 不能同時被多個 broker 管理,但是 topic 可以儲存在多個 bookie 服務中。
1.4.1 Managed LedgerBroker 使用一個 managed ledger 作為 topic 的後端儲存平臺。如圖 3 所示,managed ledger 可以擁有多個 ledger 和多個遊標。Managed ledger 中的 ledger 是 topic 中 entry 的序列,並且遊標可以表示對同一個 topic 的多個訂閱。
圖 3. 與 Topic 關聯的 Managed Ledger 中的 Ledger 和遊標
一個 managed ledger 中會包含多個 ledger。Managed ledger 如何決定是否啟動新的 ledger?Ledger 太大,會增加資料恢復時間;ledger 太小,則必須頻繁切換 ledger ,並且 managed ledger 會更頻繁地呼叫 Meta Store 來更新 managed ledger 中的元資料。Managed ledger 的 ledger 滾動策略決定了建立新 ledger 的頻率。以下 Pulsar 引數可用於控制 broker.conf 中 ledger 的行為:
1.4.2 Managed ledger 快取Managed ledger 快取是一種快取儲存器,用於儲存跨主題的尾部訊息。在尾部訊息讀取時,consumer 從服務 broker 中讀取資料。由於 broker 已經將資料快取在記憶體中,因此無需從磁碟讀取資料,也不需要和訊息寫入爭奪資源。
1.5 客戶端使用者利用 Pulsar 客戶端建立 producer(釋出訊息到 topic)和 consumer(從 topic 消費訊息)。Pulsar 支援多個客戶端。更多資訊,檢視 Pulsar client library [4] 。
1.5.1 批訊息批處理訊息指一組單條訊息,並且這一組訊息代表單個連續序列。使用批處理訊息可以減少客戶端和伺服器端的開銷。把訊息分成小批,便可以在每個任務等待時間不增加很多的情況下,實現批處理的一些效能優勢。
在 Pulsar 中使用批處理時,producer 向 broker 傳送批訊息。當批訊息到達 broker 後,broker 與 bookie 相連線,然後 bookie 將批訊息儲存在 BookKeeper 中。當 consumer 從 broker 中讀取訊息時,broker 會將批訊息分派給 consumer。因此,組合與拆分批訊息都在客戶端中進行。下面的程式碼展示瞭如何為 producer 啟用和配置訊息批處理:
client.newProducer() .topic(“topic-name”) .enableBatching(true) .batchingMaxPublishDelay(2, TimeUnit.MILLISECONDS) .batchingMaxMessages(100) .batchingMaxBytes(1024 * 1024) .create();
在上述示例中,當批訊息數量超過 100 條或批訊息資料量達到 1M 時,producer 會結束掉當前的批訊息並立即傳送至 broker。如果在兩毫秒內,上述引數值不符合條件,則 producer 也會結束掉當前的批訊息併發送至 broker。
因此,引數設定將取決於訊息的吞吐量和釋出訊息時可接受的釋出延遲。
1.5.2 訊息壓縮訊息壓縮可以透過消耗客戶端 CPU 來減小訊息大小。Pulsar 客戶端支援多種壓縮型別,如 lz4、zlib、zstd、snappy 等。壓縮型別儲存在訊息元資料中,因此 consumer 可以根據需要自動適應不同的壓縮型別。
啟用訊息批處理時,Pulsar 客戶端會減小批處理大小來改進壓縮。下面的程式碼展示瞭如何為 producer 啟用壓縮型別:
client.newProducer() .topic(“topic-name”) .compressionType(CompressionType.LZ4) .create();
1.5.3 設定 Producer 待處理訊息數的最大值
Producer 使用佇列來儲存等待 broker 回執的訊息。因此,增加此佇列大小可以增加發送訊息的吞吐量。但是,這樣也會使用更多記憶體。
下面的程式碼展示瞭如何為 producer 配置待處理訊息佇列的大小:
client.newProducer() .topic(“topic-name”) .maxPendingMessages(2000) .create();
在設定 maxPendingMessages 的值時,需要考慮記憶體對客戶端應用程式的影響。用每條訊息的位元組數乘以 maxPengingMessages 值就可以預估對記憶體產生的影響。例如,假設每條訊息的大小為 100 KB,則在 maxPengingMessages 設定為 2000 時,會額外增加 200 MB(2000 * 100 KB = 200,000 KB = 200 MB)的記憶體。
1.5.4 配置 Consumer 接收佇列的大小Consumer 接收佇列決定了在使用者的應用程式刪除訊息之前,consumer 可以累積訊息的數量。增加 consumer 接收佇列的大小可能會提高消費吞吐量,但同時也會響應增加記憶體的使用。
下面的程式碼展示瞭如何為 consumer 配置接收佇列的大小:
client.newConsumer() .topic(“topic-name”) .subscriptionName(“sub-name”) .receiverQueueSize(2000) .subscribe();
2. 伺服器端寫入訊息想要有效地調整訊息編寫效能,理解訊息寫入的方式很重要。
2.1 Broker 和 bookie 之間的互動當客戶端釋出訊息到 topic 時,會將訊息傳送到服務於此 topic 的 broker,同時,此 broker 向儲存層並行寫入資料。
如圖 4 所示,當資料副本數量增加時,broker 需要的網路頻寬開銷也會增加。要想減小對網路頻寬的影響,可以在不同級別配置永續性引數:
• Pulsar 級別 • Broker 級別 • 名稱空間級別
圖 4. Topic 內 Broker 與 Bookie 之間的互動
2.1.1 配置 Pulsar 的永續性引數Pulsar 透過三個引數來配置訊息副本數和一致性:
• Ensemble Size(E)決定給定 ledger 可用的 bookie 池大小。 • Write Quorum Size(Qw)指定 Pulsar 向其中寫入 entry 的 bookie 數量。 • Ack Quorum Size(Qa)指定必須 ack 寫入的 bookie 數量。
增加 E 可以最佳化吞吐量。增加 Qw 可以增加或減少資料的副本數,但是會影響寫入吞吐量。增加 Qa 可以增加已 ack 寫入的永續性,但是可能會增加延遲或延長尾部延遲。
2.1.2 在 broker 級別配置永續性引數可以在 broker 級別配置預設永續性引數。 broker.conf 中的以下引數決定了預設永續性策略:
2.1.3 在名稱空間級別配置永續性引數除了上述兩種引數配置外,還可以在名稱空間級別的策略中配置永續性引數。下面的程式碼中,三個永續性引數值都設定為“3”。
$ bin/pulsar-admin namespaces set-persistence --bookkeeper-ack-quorum 3 --bookkeeper-ensemble 3--bookkeeper-write-quorum 3 my-tenant/my-namespace
2.1.4 配置 worker 執行緒池的大小
為保證 topic 內的訊息按照寫入順序進行儲存,broker 採用單執行緒寫入與單個 topic 相關的 managed ledger entry。Broker 從名稱相同的 managed ledger worker 執行緒池中選擇一個執行緒。可以在 broker.conf 中使用以下引數配置 worker 執行緒池的大小。
2.2 Bookie 如何處理 entry 請求本節詳細、分步講解 bookie 如何處理新新增的 bookie 請求。處理過程圖解如圖 5 所示。
圖 5. Bookie 如何處理新增 Entry 的請求
當 bookie 收到新增 entry 的請求時:
1. 請求處理器將 entry 追加到 journal 日誌中,以確保資料的永續性。
2. 請求處理器向 ledger 儲存中的記憶體 table 寫入資料。
3. 請求處理器完成客戶端請求。如果寫入 entry 儲存成功,則客戶端會收到 entry ID;否則,客戶端會收到異常。
4. Ledger 儲存定期向 entry log flush 記憶體 table,這一過程稱為 checkpoint。當資料 flush 到 entry log 中時,bookie 為每一條 entry 建立索引,以便有效地讀取資料。
為了更好地進行 I/O 讀寫隔離,bookie 可以將 journal 目錄和 entry 目錄分別儲存在不同的磁碟上。想同時擁有讀寫高吞吐量,則應該將 journal 和 ledger 儲存在不同磁碟上。Journal 和 ledger 都可以利用多個磁碟的 I/O 並行性。
Bookie 使用單執行緒來處理每個 journal 目錄中 journal 資料的寫入。基於使用經驗,我們知道 journal 寫入執行緒在某些情況下會引起阻塞。你可以指定多個 journal 目錄,但也不能太多,因為分配過多目錄會導致隨機寫入磁碟的次數增加。
在 bookkeeper.conf 中 journal 目錄和 ledger 目錄的引數配置如下:
當請求處理器追加新 entry 到 journal 日誌時(一種預寫式日誌,WAL),bookie 要求處理器從與 ledger ID 相關的寫入執行緒池中提供一個執行緒。你可以配置執行緒池的大小,也可以配置單個執行緒中用於處理 entry 寫入請求的待處理請求最大值。
如果新增 entry 的待處理請求數超過了 bookkeeper.conf 中新增 entry 待處理請求數的設定值,則 bookie 將拒絕新增 entry 的新請求。
預設情況下,sync 所有 journal 日誌 entry 到磁碟,以避免在斷電時丟失資料。因此,資料同步的延遲對寫入吞吐量和延遲影響最大。如果將 HDD 用作 journal 磁碟,需確保禁用 journal sync 機制,以便在 entry 成功寫入 OS page cache 後,bookie 客戶端可以得到響應。在 bookkeeper.conf 中啟用或禁用 journal 資料 sync 的引數如下:
批次提交機制允許將等待執行的任務分組為小批。這種處理方式可以提高批處理的效能,同時不會使單個任務的延遲增加過多。Bookie 也可以採用同一方法來提高 journal 資料寫入的吞吐量。對 journal 資料啟用組提交機制可以減少磁碟操作,同時還可以避免過多的小檔案寫入。但是,禁用組提交可以避免增加延遲。
在 bookkeeper.conf 中配置以下引數啟用或禁用批次提交機制:
將 entry 寫入 journal 後,entry 也會被新增到 ledger 儲存中。預設情況下,bookie 使用在 DbLedgerStorage 中的指定的值作為 ledger 儲存。DbLedgerStorage 是 ledger 儲存的一種實現形式,它使用 RocksDB 來儲存儲存在 entry log 中的 entry 索引。在 entry 成功寫入記憶體 table 後,ledger 儲存中新增 entry 的請求才會完成,然後是 bookie 客戶端的請求。記憶體 table 會定期 flush 到 entry log,併為儲存在 entry log 中的 entry 建立索引,也稱為 checkpoint。
Checkpoint 引入了很多隨機磁碟 I/O。如果 journal 目錄和 ledger 目錄分別位於不同裝置上,則 flush 不會影響效能。但是,如果 journal 目錄和 ledger 目錄位於同一裝置上,頻繁 flush 會導致效能顯著下降。可以考慮透過增加 bookie 的 flush 間隔來提升效能。但是,增加 flush 間隔後,重啟 bookie 時(例如,發生故障後),恢復所需時間會增加。
為實現最佳效能,記憶體 table 應該足夠大,因此可以在 flush 間隔期間儲存大量 entry。在 bookkeeper.conf 中設定寫入快取大小和 flush 間隔的引數如下:
3. 伺服器端讀取訊息Apache Pulsar 是一個支援追尾讀和追趕讀的多層系統。追尾讀指讀取最新寫入的資料,追趕讀指讀取歷史資料。Pulsar 透過不同方式實現追尾讀和追趕讀。
3.1 追尾讀追尾讀時,服務 broker 已將資料儲存在 managed ledger 快取中,consumer 從中讀取資料。過程如圖 6 所示。
圖 6. Consumer 如何從服務 Broker 進行追尾讀
在 broker.conf 中設定快取大小和快取逐出策略的引數如下:
3.2 追趕讀追趕讀需要從儲存層讀取資料。過程如圖 7 所示。
圖 7. 如何從儲存層進行追趕讀
Bookie 伺服器使用單執行緒處理從同一 ledger 讀取請求的 entry。Bookie 伺服器從與 ledger ID 相關的讀取 worker 執行緒池中選擇一個執行緒。在 bookkeeper.conf 中設定讀取 worker 執行緒池大小和單個執行緒中待讀取請求最大值的引數如下:
從 ledger 儲存中讀取 entry 時,bookie 首先透過索引檔案確認 entry 在 entry log 中的位置。 DbLedgerStorage 使用 RocksDB 來儲存 ledger entry 的索引。因此,需要確保分配的記憶體足以儲存索引資料庫的大部分資料,以避免索引 entry 的換入換出。
為實現最佳效能,RocksDB 塊快取需要足夠大以儲存索引資料庫的大部分資料,在一些情況下,這個值會達到約 2 GB。
在 bookkeeper.conf 中設定 RocksDB 塊快取大小的引數如下:
啟用 entry 預讀快取可以減少磁碟用於順序讀取的操作。在 bookkeeper.conf 中配置 entry 預讀快取大小的引數如下:
4. 元資料儲存最佳化Pulsar 使用 Apache® ZooKeeper 作為其預設元資料儲存區域。ZooKeeper 是一項集中式服務,可用於維護配置資訊、命名、提供分散式同步、提供組服務等。
關於 ZooKeeper 效能調優的更多詳細資訊,參閱 ZooKeeper admin 文件 [6] 。在該文件給出的建議中,建議詳細閱讀與磁碟 I/O 相關的部分。
5. 結語希望本文可以幫助大家更好地理解一些關於 Pulsar 的基本概念,尤其是 Pulsar 如何讀寫訊息。總結一下,本文主要介紹了以下幾點:
• 提高讀寫 I/O 隔離可以增加 bookies 的吞吐量,並減少延遲。 • 可以並行執行透過多個磁碟之間的 I/O 並行性最佳化 journal 和 ledger 的效能。 • 追尾讀時,broker 中的 entry 快取可以減少資源開銷,並避免與寫入競爭資源。 • 最佳化 ZooKeeper 的效能,最大程度地提高系統穩定性。
相關閱讀· Pulsar 讀寫過程的效能調優
· Apache Pulsar 在 BIGO 的效能調優實戰(上)
· Pulsar 和 Kafka 基準測試:Pulsar效能精準解析(完整版)
引用連結
[1] Pulsar 訊息流控制官方文件: http://pulsar.apache.org/docs/en/develop-binary-protocol/#flow-control
[2] LSM 樹設計 pdf 文件: https://www.cs.umb.edu/~poneil/lsmtree.pdf
[3] BookKeeper 概念: https://bookkeeper.apache.org/docs/4.10.0/getting-started/concepts
[4] Pulsar client library: http://pulsar.apache.org/docs/en/client-libraries/
[5] Pulsar 工作原理部落格: https://jack-vanlightly.com/blog/2018/10/2/understanding-how-apache-pulsar-works
[6] ZooKeeper admin 文件: https://zookeeper.apache.org/doc/r3.4.13/zookeeperAdmin.pdf
出處:https://mp.weixin.qq.com/s?__biz=MzUyMjkzMjA1Ng==&mid=2247486485&idx=1&sn=4b452c9e1f2adfeb08292e7d480ff733