首頁>科技>

週日早上醒來,明媚的陽光從臥室的窗戶直射進來,久違的好天氣。穿好衣服我開始籌劃今天去哪裡轉轉。一週忙碌的工作幾乎沒有時間陪家人,今天該好好陪陪家人了。

當我起來收拾好一切準備出發的時候,我瞄了一眼手機,發現手機的郵箱裡有一份報警郵件,報警郵件顯示線上最近10分鐘流量有異常,而且是多個渠道。有突然有一種不祥的預感:線上kafka出問題了。我讓媳婦和孩子下樓在車裡等我,我趕緊打開了電腦,連上公司VPN檢視線上系統。果然不出意外,kafka已經積壓了幾千萬的資料。因為我們的業務分為實時資料和離線資料,實時資料是FileBeat負責收集日誌發到Kafka,然後我們這個業務系統消費Kafka統計資料,實時資料對於當前流量分析、預算控制、熔斷有非常重要的作用,如果實時資料異常,其它業務系統都會受到一定的影響。

定位到報警郵件是由於kafka訊息積壓而導致實時資料異常觸發的,我立馬連上了我們消費Kafka的業務系統(data-collect)。這是一個執行時間很長了的Java服務,它的作用就是實時消費kafka資料,然後經過一定的業務邏輯處理,將最終結果更新到mongodb中。進到伺服器以後,我發現這個服務已經處於假死狀態,最後一條日誌顯示系統發生了OOM,也就是伺服器記憶體爆了。

關於data-collect這個Java服務的核心邏輯我在這裡詳細說明一下。這個系統的程式碼是很早的一位同事寫的,因為早期我們的資料體量還不是很大,所以,他採用了一種簡單的處理方式。先消費資料,處理完成以後放到一個Map中,然後,啟動了一個每10s執行一次的定時任務,定時任務讀取Map資料更新到mongo中,然後清空Map(ConcurrentMap)。這樣做的優點是將消費Kafka的操作和入庫操作分開了,可以防止因為入庫時間太長而導致消費速度變慢,但是,這種做法有一個致命的缺點:記憶體不可控。如果定時任務因為Mongo操作時間太長而沒有及時清空Map,Map中會積累大量的資料,最終耗盡記憶體,系統發生OOM。這時候如果系統自啟了,也會丟失大量的資料。

其實,這個問題我很早有意識到,但是系統一直執行良好,沒有出現任何問題,我們認為在現有資料體量下它是安全的。而碰巧的是,就在前一天我們升級了Mongo的配置,mongo機器進行了一個主從切換。同時,有一些大表清理和TTL索引重建的工作還在mongo後臺執行。這就導致了我們操作mongo耗時的增加。進而導致了我們一直認為安全的系統出現了這個問題。

回到data-collect這個系統的設計上。可能有的同學會在這裡有個疑問,為什麼不直接消費出來就入庫操作呢?這裡我們有一個重要的處理邏輯:為了防止頻繁的更新mongo,我們會將消費出來的資料在記憶體中進行一個合併處理,你可以簡單的理解為一個Map,如果key存在,我們就進行++的操作最終操作mongo是$inc的操作,不是insert和update的操作。這也是我們需要一個ConcurrentMap的原因。也就是我們大概消費了1000萬條資料,但是最終我們處理完成以後只有10萬條資料,很多key相同的資料我們都進行了合併處理,這樣我們mongo的操作就減少了很多。

data-collect發生了OOM,我只能第一時間重啟,重啟以後,消費正常,系統開始有了資料。但是大概運行了幾分鐘以後,又發生了OOM。原因很簡單:kafka積壓的了大量的訊息,消費很快,但是非同步如mongo太耗時,所以導致資料全部擠壓在了這個Map記憶體中。看到這裡,我想只能動手改造程式碼了。改造的最終要達到的結果是:系統在不發生OOM的前提下,消費積壓在kafka中的資料,完成mongo操作。

改造的思路很簡單,就是幹掉定時任務。在消費kafka訊息中增加一個邏輯,每當消費訊息並且記憶體進行資料合併完成以後,我們判斷Map的大小,如果Map的大小超過我們設定的限制以後,開始觸發mongo操作。之前的程式碼mongo操作是單執行緒執行,為了提升mongo插入操作,我們開啟20個執行緒並行執行,所以我們這裡需要一個帶阻塞佇列的執行緒池。改造後的程式碼如下:

這裡是SpringBoot整合Kafka的消費程式碼。

這是記憶體處理完成以後入mongo的操作。因為我們的topic有20個分割槽,所以程式碼中的listenPartition0是多執行緒執行的。如果沒有synchronized的同步程式碼塊,那assembleyAdxTrafficVo方法就會多執行緒執行,這就會導致資料重複插入mongo,具體大家可以體會assembleyAdxTrafficVo方法的邏輯。

而插入mongo操作的用了執行緒池ExecutorService,注意這裡我們executorService物件的定義。

為什麼要自己定義一個阻塞佇列CustomeBlockQueue?這相比很多人碰到過這個問題,如果採用預設的阻塞佇列,例如:ArrayBlockingQueue,當佇列長度長度超過設定的值時,ArrayBlockingQueue會拒絕新的資料進入,並且丟擲異常,所以我們需要自己定義CustomeBlockQueue,並且重寫他的offer方法(BlockingQueue預設採用offer方法將元素增加到佇列),offer方法不會阻塞,put方法會阻塞,所以我們需要重寫offer方法,並且內部採用put方法實現。關於這一點,大家可以多嘗試。ArrayBlockingQueue和LinkedBlockingQueue都有很多坑等大家去踩。

按照上述程式碼處理完成上線以後,系統開始正常執行,kafka積壓的訊息也開始慢慢降低,系統趨於恢復正常,而這時已經是12點了,驚心動魄的2小時總算過去了,阿彌陀佛。

15
最新評論
  • 整治雙十一購物亂象,國家再次出手!該跟這些套路說再見了
  • 【科技英才】王豔:身為工程科研人而驕傲