kafka是一個分散式的釋出和訂閱的訊息系統。也就是訊息的釋出者把訊息進行分類,然後傳送到kafka上。而訂閱者去讀取也就是去消費一個特定型別的訊息。當然在這樣的一個系統中一般都會存在一個節點來做一箇中心的作用。
大致上來看我們需要構建一個ProducerRecord 一個物件。然後把這個物件傳送到對應的一個broker上。
首先一個ProducerRecord物件包含了topic,parition,key ,value。生產者需要把鍵值物件序列化成位元組資料,這樣才能夠在網上傳播。然後如果指定了分割槽,資料就會往分割槽寫,如果沒有指定就會根據這個key,把資料往指定的分割槽寫。在成功寫入之後,伺服器響應會發回一個RecordMetaData物件,包含記錄在分割槽裡面的偏移量。接下來,我們來看看 producer.send() 怎麼使用:
這裡,我們關注了傳送者的兩種傳送方式:同步和非同步。send方法會返回一個Future物件,如果使用這個物件的get方法,等到kafka的返回,那麼會阻塞在這裡。如果你不關注這個返回值得話,我們可以用完send就跑。然後使用回掉方法,特殊的關注一些異常就好了。
需要注意的是,訊息是被放進緩衝區中,然後使用單獨的執行緒傳送到服務端。我們會等到緩衝區到達一定大小統一發送,傳送者在傳送的時候會把屬於同一分割槽的訊息依次傳送,這樣分割槽裡面的訊息就是有序的。關於緩衝區這個是有具體的引數可以配置的。使用的時候搜尋一下就行。
最後,傳送到broker的訊息型別是位元組陣列,所以我們在傳送之前需要把key-value進行相應的序列化,kafka有自帶的一些序列化方法,假如你使用的是你自己定義類,那麼你可以自己寫序列化方法,也可以使用向thrift這樣來生成資料。
消費者消費它訂閱的那個topic的訊息。因為我們消費之後可能要做一些耗時的處理工作,所以一個消費者可能消費不過來這麼多訊息,導致訊息阻塞在broker上。但是Kafka是很好橫向擴充套件的,橫向擴充套件的意思是你多來幾個消費者來消費同一個分割槽不就好了。那麼這些消費同一個topic的消費者,就屬於一個消費者群組。topic下面訊息的最小單位應該是分割槽(partition),每個消費者消費的都是一個partition。如果說消費者的數量大於了partition的數量,那麼會有一些消費者,消費不到資料。另一個是,kafka能保證的訊息有序只有paritition這個級別了,不能保證topic級別的訊息有序性。
假如說生產者崩了一個那也就崩了,只不過是沒有訊息進來而已。但是消費者崩了的話,可能訊息就會堵在broker上,對整個業務造成影響。所以在消費的離開或者加入群組的時候。kafka會進行一項rebalance的操作,就是把分割槽在重新分配給消費者。所以就得知道在rebalance之前各個消費者消費patition的情況,得知道他們的offset。kafka 0.8 及之前都是把這個資訊放在Zookeeper上。但實際上Zookeeper並不能承受住大規模的讀寫。所以經常rebalance失敗,訊息堵住,影響業務。但是kafka 0.9及之後,consumer在消費的時候會把偏移資訊傳送給一個特殊的topic->__consumer_offsets。這樣broker就可以直接維護這些資訊了。下面我們來看一下如何使用kafka consumer:
poll()這個方法是很重要的,消費者在每次輪詢的時候都會檢查是否該提交偏移量了,如果是,那麼就會使用poll來提交一次偏移量到_consumer_offsets 這個topic上。當然了,提交的時機也是我們可以設定的配置。
當然,出了自動提交還有手動提交的api:commitSync 和 commitAsync。
還有兩個特殊的api:
這兩個方法,都是在consumer.subscribe的時候呼叫,這樣發生rebalance的時候,或者重新分配區之後和消費者讀取訊息之前被呼叫。可以執行這兩個方法裡面的操作。
最後,在另一個執行緒裡面呼叫consumer.wakeup() 就可以優雅的退出consumer.poll() 輪詢。
kafka是一個分散式的釋出和訂閱的訊息系統。也就是訊息的釋出者把訊息進行分類,然後傳送到kafka上。而訂閱者去讀取也就是去消費一個特定型別的訊息。當然在這樣的一個系統中一般都會存在一個節點來做一箇中心的作用。
大致上來看我們需要構建一個ProducerRecord 一個物件。然後把這個物件傳送到對應的一個broker上。
首先一個ProducerRecord物件包含了topic,parition,key ,value。生產者需要把鍵值物件序列化成位元組資料,這樣才能夠在網上傳播。然後如果指定了分割槽,資料就會往分割槽寫,如果沒有指定就會根據這個key,把資料往指定的分割槽寫。在成功寫入之後,伺服器響應會發回一個RecordMetaData物件,包含記錄在分割槽裡面的偏移量。接下來,我們來看看 producer.send() 怎麼使用:
這裡,我們關注了傳送者的兩種傳送方式:同步和非同步。send方法會返回一個Future物件,如果使用這個物件的get方法,等到kafka的返回,那麼會阻塞在這裡。如果你不關注這個返回值得話,我們可以用完send就跑。然後使用回掉方法,特殊的關注一些異常就好了。
需要注意的是,訊息是被放進緩衝區中,然後使用單獨的執行緒傳送到服務端。我們會等到緩衝區到達一定大小統一發送,傳送者在傳送的時候會把屬於同一分割槽的訊息依次傳送,這樣分割槽裡面的訊息就是有序的。關於緩衝區這個是有具體的引數可以配置的。使用的時候搜尋一下就行。
最後,傳送到broker的訊息型別是位元組陣列,所以我們在傳送之前需要把key-value進行相應的序列化,kafka有自帶的一些序列化方法,假如你使用的是你自己定義類,那麼你可以自己寫序列化方法,也可以使用向thrift這樣來生成資料。
消費者消費者消費它訂閱的那個topic的訊息。因為我們消費之後可能要做一些耗時的處理工作,所以一個消費者可能消費不過來這麼多訊息,導致訊息阻塞在broker上。但是Kafka是很好橫向擴充套件的,橫向擴充套件的意思是你多來幾個消費者來消費同一個分割槽不就好了。那麼這些消費同一個topic的消費者,就屬於一個消費者群組。topic下面訊息的最小單位應該是分割槽(partition),每個消費者消費的都是一個partition。如果說消費者的數量大於了partition的數量,那麼會有一些消費者,消費不到資料。另一個是,kafka能保證的訊息有序只有paritition這個級別了,不能保證topic級別的訊息有序性。
假如說生產者崩了一個那也就崩了,只不過是沒有訊息進來而已。但是消費者崩了的話,可能訊息就會堵在broker上,對整個業務造成影響。所以在消費的離開或者加入群組的時候。kafka會進行一項rebalance的操作,就是把分割槽在重新分配給消費者。所以就得知道在rebalance之前各個消費者消費patition的情況,得知道他們的offset。kafka 0.8 及之前都是把這個資訊放在Zookeeper上。但實際上Zookeeper並不能承受住大規模的讀寫。所以經常rebalance失敗,訊息堵住,影響業務。但是kafka 0.9及之後,consumer在消費的時候會把偏移資訊傳送給一個特殊的topic->__consumer_offsets。這樣broker就可以直接維護這些資訊了。下面我們來看一下如何使用kafka consumer:
poll()這個方法是很重要的,消費者在每次輪詢的時候都會檢查是否該提交偏移量了,如果是,那麼就會使用poll來提交一次偏移量到_consumer_offsets 這個topic上。當然了,提交的時機也是我們可以設定的配置。
當然,出了自動提交還有手動提交的api:commitSync 和 commitAsync。
還有兩個特殊的api:
public void onPartitionsRevoked(Collection partitions)public void onPartitionsAssigned(Collection partitions)這兩個方法,都是在consumer.subscribe的時候呼叫,這樣發生rebalance的時候,或者重新分配區之後和消費者讀取訊息之前被呼叫。可以執行這兩個方法裡面的操作。
最後,在另一個執行緒裡面呼叫consumer.wakeup() 就可以優雅的退出consumer.poll() 輪詢。