首頁>技術>

原文地址:https://blog.dogchao.cn/?p=305

一個示意圖

Kafka存在丟訊息的問題,訊息丟失會發生在Broker,Producer和Consumer三種。

Broker

Broker丟失訊息是由於Kafka本身的原因造成的,kafka為了得到更高的效能和吞吐量,將資料非同步批次的儲存在磁碟中。訊息的刷盤過程,為了提高效能,減少刷盤次數,kafka採用了批次刷盤的做法。即,按照一定的訊息量,和時間間隔進行刷盤。這種機制也是由於linux作業系統決定的。將資料儲存到linux作業系統種,會先儲存到頁快取(Page cache)中,按照時間或者其他條件進行刷盤(從page cache到file),或者透過fsync命令強制刷盤。資料在page cache中時,如果系統掛掉,資料會丟失。

Broker在linux伺服器上高速讀寫以及同步到Replica

上圖簡述了broker寫資料以及同步的一個過程。broker寫資料只寫到PageCache中,而pageCache位於記憶體。這部分資料在斷電後是會丟失的。pageCache的資料透過linux的flusher程式進行刷盤。刷盤觸發條件有三:

主動呼叫sync或fsync函式可用記憶體低於閥值dirty data時間達到閥值。dirty是pagecache的一個標識位,當有資料寫入到pageCache時,pagecache被標註為dirty,資料刷盤以後,dirty標誌清除。

Broker配置刷盤機制,是透過呼叫fsync函式接管了刷盤動作。從單個Broker來看,pageCache的資料會丟失。

Kafka沒有提供同步刷盤的方式。同步刷盤在RocketMQ中有實現,實現原理是將非同步刷盤的流程進行阻塞,等待響應,類似ajax的callback或者是java的future。下面是一段rocketmq的原始碼。

GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());service.putRequest(request);boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盤

也就是說,理論上,要完全讓kafka保證單個broker不丟失訊息是做不到的,只能透過調整刷盤機制的引數緩解該情況。比如,減少刷盤間隔,減少刷盤資料量大小。時間越短,效能越差,可靠性越好(儘可能可靠)。這是一個選擇題。

為了解決該問題,kafka透過producer和broker協同處理單個broker丟失引數的情況。一旦producer發現broker訊息丟失,即可自動進行retry。除非retry次數超過閥值(可配置),訊息才會丟失。此時需要生產者客戶端手動處理該情況。那麼producer是如何檢測到資料丟失的呢?是透過ack機制,類似於http的三次握手的方式。

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=allThis means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

http://kafka.apache.org/20/documentation.html

以上的引用是kafka官方對於引數acks的解釋(在老版本中,該引數是request.required.acks)。

acks=0,producer不等待broker的響應,效率最高,但是訊息很可能會丟。acks=1,leader broker收到訊息後,不等待其他follower的響應,即返回ack。也可以理解為ack數為1。此時,如果follower還沒有收到leader同步的訊息leader就掛了,那麼訊息會丟失。按照上圖中的例子,如果leader收到訊息,成功寫入PageCache後,會返回ack,此時producer認為訊息傳送成功。但此時,按照上圖,資料還沒有被同步到follower。如果此時leader斷電,資料會丟失。acks=-1,leader broker收到訊息後,掛起,等待所有ISR列表中的follower返回結果後,再返回ack。-1等效與all。這種配置下,只有leader寫入資料到pagecache是不會返回ack的,還需要所有的ISR返回“成功”才會觸發ack。如果此時斷電,producer可以知道訊息沒有被髮送成功,將會重新發送。如果在follower收到資料以後,成功返回ack,leader斷電,資料將存在於原來的follower中。在重新選舉以後,新的leader會持有該部分資料。資料從leader同步到follower,需要2步:資料從pageCache被刷盤到disk。因為只有disk中的資料才能被同步到replica。資料同步到replica,並且replica成功將資料寫入PageCache。在producer得到ack後,哪怕是所有機器都停電,資料也至少會存在於leader的磁碟內。

上面第三點提到了ISR的列表的follower,需要配合另一個引數才能更好的保證ack的有效性。ISR是Broker維護的一個“可靠的follower列表”,in-sync Replica列表,broker的配置包含一個引數:min.insync.replicas。該引數表示ISR中最少的副本數。如果不設定該值,ISR中的follower列表可能為空。此時相當於acks=1。

如上圖中:

acks=0,總耗時f(t) = f(1)。acks=1,總耗時f(t) = f(1) + f(2)。acks=-1,總耗時f(t) = f(1) + max( f(A) , f(B) ) + f(2)。

效能依次遞減,可靠性依次升高。

Producer

Producer丟失訊息,發生在生產者客戶端。

為了提升效率,減少IO,producer在傳送資料時可以將多個請求進行合併後傳送。被合併的請求咋傳送一線快取在本地buffer中。快取的方式和前文提到的刷盤類似,producer可以將請求打包成“塊”或者按照時間間隔,將buffer中的資料發出。透過buffer我們可以將生產者改造為非同步的方式,而這可以提升我們的傳送效率。

但是,buffer中的資料就是危險的。在正常情況下,客戶端的非同步呼叫可以透過callback來處理訊息傳送失敗或者超時的情況,但是,一旦producer被非法的停止了,那麼buffer中的資料將丟失,broker將無法收到該部分資料。又或者,當Producer客戶端記憶體不夠時,如果採取的策略是丟棄訊息(另一種策略是block阻塞),訊息也會被丟失。抑或,訊息產生(非同步產生)過快,導致掛起執行緒過多,記憶體不足,導致程式崩潰,訊息丟失。

producer採取批次傳送的示意圖

非同步傳送訊息生產速度過快的示意圖

根據上圖,可以想到幾個解決的思路:

非同步傳送訊息改為同步傳送消。或者service產生訊息時,使用阻塞的執行緒池,並且執行緒數有一定上限。整體思路是控制訊息產生速度。擴大Buffer的容量配置。這種方式可以緩解該情況的出現,但不能杜絕。service不直接將訊息傳送到buffer(記憶體),而是將訊息寫到本地的磁碟中(資料庫或者檔案),由另一個(或少量)生產執行緒進行訊息傳送。相當於是在buffer和service之間又加了一層空間更加富裕的緩衝層。Consumer

Consumer消費訊息有下面幾個步驟:

接收訊息處理訊息反饋“處理完畢”(commited)

Consumer的消費方式主要分為兩種:

自動提交offset,Automatic Offset Committing手動提交offset,Manual Offset Control

Consumer自動提交的機制是根據一定的時間間隔,將收到的訊息進行commit。commit過程和消費訊息的過程是非同步的。也就是說,可能存在消費過程未成功(比如丟擲異常),commit訊息已經提交了。此時訊息就丟失了。

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");// 自動提交開關props.put("enable.auto.commit", "true");// 自動提交的時間間隔,此處是1sprops.put("auto.commit.interval.ms", "1000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));while (true) {// 呼叫poll後,1000ms後,訊息狀態會被改為 committedConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records)insertIntoDB(record); // 將訊息入庫,時間可能會超過1000ms}

上面的示例是自動提交的例子。如果此時,`insertIntoDB(record)`發生異常,訊息將會出現丟失。接下來是手動提交的例子:

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test");// 關閉自動提交,改為手動提交props.put("enable.auto.commit", "false");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));final int minBatchSize = 200;List<ConsumerRecord<String, String>> buffer = new ArrayList<>();while (true) {// 呼叫poll後,不會進行auto commitConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {buffer.add(record);}if (buffer.size() >= minBatchSize) {insertIntoDb(buffer);// 所有訊息消費完畢以後,才進行commit操作consumer.commitSync();buffer.clear();}}

將提交型別改為手動以後,可以保證訊息“至少被消費一次”(at least once)。但此時可能出現重複消費的情況,重複消費不屬於本篇討論範圍。

上面兩個例子,是直接使用Consumer的High level API,客戶端對於offset等控制是透明的。也可以採用Low level API的方式,手動控制offset,也可以保證訊息不丟,不過會更加複雜。

try {while(running) {ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);for (TopicPartition partition : records.partitions()) {List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);for (ConsumerRecord<String, String> record : partitionRecords) {System.out.println(record.offset() + ": " + record.value());}long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();// 精確控制offsetconsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));}}} finally {consumer.close();}

10
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 資料分析總是做不好?你可能需要這個思維框架