首頁>技術>

導語

Flink 提供了靈活豐富的狀態管理,可輕鬆解決資料之間的關聯性。本文介紹了Flink 狀態(State)管理在推薦場景中的應用,大家結合自己的應用場景與業務邏輯,選擇合適的狀態管理。

背景

Flink作為純流式大資料實時計算引擎,較於Spark Streaming的微批處理引擎,不管是記憶體管理,多流合併,還是時間視窗,迭代處理上,Flink在實時計算場景更較適合。而Flink的State狀態管理,更是讓Flink在實時計算領域,更勝一籌。通過對Flink State狀態的靈活妙用,可以完美實現大資料下的實時數倉,實時畫像和實時資料監控等功能。

場景

最近在做推薦資料平臺,其中有一個場景需求是要實時統計最近1分鐘的UV、點選量、真實曝光量和下發量等熱點資料,並可以在不同地域維度下做多維度查詢。通過對資料的實時跟蹤監控,可以精準迅速地獲悉推薦演算法在不同地域投放後所產生的流量變化,從而優化對不同地域下使用者的精準推薦。

問題&選型

我們在做場景分析的時候,發現有兩個問題需要解決。

首先是我們的資料來自於使用者對App的操作行為日誌,在這些埋點資料裡,有個欄位localId(13位數字組成),該欄位記錄了該使用者所在的位置編號,可以精確到區,街道,甚至村委會,但是缺少上下層級關係。也就是說,通過localId我們無法得知該使用者屬於哪個鎮,哪個市,哪個省等。所以,在對該資料做進一步操作前,需要找到localId的地域對映關係,並關聯到省市縣等,從而實現省市縣下不同維度下的熱點資料統計與分析。

另外就是對於埋點上報的真實曝光資料,存在較嚴重的資料延遲問題,甚至可達到數個小時的延遲,嚴重影響資料的準確性和時效性。這部分的原因是真實曝光的定義與App客戶端的埋點上報機制所致。

結合上述問題,在構思方案的選型上:

首先想到使用Spark做資料處理引擎,不管從公司使用的人數和任務數上,還有維護上,使用Spark無疑是最穩定的選擇。但是Spark是基於RDD做的micro batch處理,而Spark Streaming又只是在Spark RDD基礎上增加了時間維度(時間片),其本質還是在進行Spark的RDD處理。Spark Streaming將流式計算分解成了多個Spark Job,而每一段資料的處理都會經過RDD DAG有向無環圖的分解,和Spark Scheduler的排程分配,其最小的Batch Size為0.5秒~2秒鐘之間。所以,Spark Streaming適用於對實時性要求不是非常高的流式準實時計算場景。而我們的資料有一部分是實時上報上來,例如點選與下發資料。我們希望對這些資料做秒級內處理,所以在處理這種延時性較低的資料上,Spark Streaming可能不是很適合。並且,一天不同時間段訪問UV量參差不齊,導致一天內不同時間段的流量峰值或高或低,而且節假日期間的流量更是不可預測。Spark Streaming從v1.5才開始引進反壓機制(back-pressure),而且也只是估計當前系統處理資料的速率,再調節系統接收資料的速率與之匹配,無法實現動態反壓。而Spark在流式計算上的缺陷,正是Flink的優勢。與Spark基於RDD計算不同,Flink是基於有狀態流的計算,並提供了更豐富靈活的狀態用以儲存狀態資料,我們就用到了Flink Stream的Broadcast State解決了localId的地域對映關係。並且Flink對資料流也是逐條處理,在低延時上明顯優於Spark Streaming。Flink在1.5之後,採用Credit-based的網路流控制機制,對執行時的Task有著天然流控,慢的資料sink節點會反壓快的資料來源source。系統能接收資料的前提是接收資料的Task必須有空閒可用的Buffer,而資料被繼續處理的前提是下游Task也有空閒可用的Buffer,只有下游的Task有了空閒的Buffer,才能消費上游Task的Buffer。所以,Flink的反壓,是系統接收資料的速率和處理資料速率的自然匹配。

其次,我們在專門基於流處理框架Storm和Flink中做了比較,雖然Storm在延遲處理上優於Flink,但從吞吐量,資源動態調整,SQL支援,狀態管理,容錯機制和社群活躍度等來看,Flink都明顯優於Storm。特別是Storm沒有任何對狀態的支援,需要依賴其他元件實現狀態管理。最重要的,Flink在公司內部有專門的WStream平臺,並由專業的團隊維護。

所以,我們選擇了Flink做資料流處理框架,而基於真實曝光資料上報延遲較嚴重問題,我們選擇Druid這種時序性的資料庫作為資料儲存,在保證資料不丟失的前提下,還能做到資料的近實時聚合查詢。

機制原理

Spark有廣播變數,把地域對映表資料直接Broadcast共享到到各個Worker Node的記憶體中,接下來的Operator操作都可以基於各自所在Worker節點已拷貝到記憶體中僅一份的地域對映表資料進行操作。Flink Datastreaming也有類似Spark的Broadcast廣播變數,都實現了節約記憶體和共享變數的作用。但其機制原理與使用方法與Spark 廣播變數截然不同。Flink Streaming的Broadcast作為Flink State的一種,類似Hadoop的分散式快取,Flink會複製檔案或者目錄到所有Worker Node的本地檔案系統,讓並行執行例項的函式可以本地訪問。為保證每個節點獲取到的Broadcast State一致,Worker Node中的Broadcast State並不會相互傳播通訊,也不會被修改,且同一個Worker Node的所有Task可以共享廣播狀態。這個功能被常用來快取不大且不可變的靜態資料,例如地域對映表或者機器學習的邏輯迴歸模型等。而在使用方法上,Flink DataStreaming需要定義StateDescriptor來廣播狀態到各個Worker Node。而每個Task在處理資料時,通過StateDescriptor就可以獲取快取在本Worker Node的廣播狀態,相對Spark 廣播變數API的使用較複雜一些。

不僅有Broadcast State,根據資料集是否按照Key分割槽,Flink可以將狀態分為Keyed State和Operator State(No-Keyed State)兩種型別,而這兩種狀態型別又均具有兩種形式,分別是託管狀態(Managed State)和原生狀態(Raw State)。區別在於託管狀態(Managed State)是由Flink Runtime控制和管理的狀態資料,並將狀態資料轉化儲存在Java Heap記憶體的Hash Table或RocksDB,然後將這些狀態資料通過內部的CheckpointedFunction介面持久化到Checkpoints中,而狀態的一致性,其實也是通過Checkpoints實現。因為有第三方RocksDB資料庫的參與,可以把State資料暫存RocksDB資料庫中,相比存於Java Heap中更安全。如果開啟Checkpoint增量機制,新產生的資料會替換之前產生的檔案COPY到持久化中,以此減少COPY的資料量,並提高效能,更適合在生產環境使用。但目前為止,RocksDB還不支援Broadcast State。

當任務出現異常退出時,也可以通過這些狀態資料進行恢復,讀取已經Checkpoints的狀態資料,可以還原任務失敗前的狀態,包括記錄已經消費過的Kafka偏移量,以此實現容錯機制。當任務從狀態資料恢復時,可以繼續從未消費的Kafka偏移量開始讀取資料,從而實現Flink Source端Exactly-Once語義。

原生狀態(Raw State)由運算元自己管理資料結構,當觸發Checkpoint過程時,只是將資料轉化成位元組碼資料存在Checkpoints中,當從Checkpoints恢復任務時,運算元再自己反序列化出狀態的資料結構,常用於自定義運算元操作中。雖然兩者都可以實現狀態的管理和儲存,但託管狀態可以更好地支援狀態資料的重平衡以及更加完善的記憶體管理,經常被使用。

託管狀態(Managed State)已經有了官方實現好的幾種狀態,可以根據實際場景與業務邏輯選擇使用,例如BroadcastState<K,V>,ValueState[T],ListState[T],ReducingState[T],AggregatingState[IN,OUT],MapState[UK,UV],而所有的託管狀態都需要通過建立StateDescriptor來獲取響應的State的操作類,該描述符主要定義了狀態的名稱,狀態中的資料型別引數資訊以及狀態自定義函式,方便Flink的序列化與反序列化。每種託管狀態都有對應的描述符,例如ValueStateDescriptor,ListStateDescriptor,ReducingStateDescriptor,FoldingStateDescriptor,MapStateDescriptor等。而且通過對Flink State設定TTL,自定義超時清理時間,在狀態量越來越大的情形,可對記憶體做著優化處理的同時,還不會影響效能。

資料流程處理

我們的資料處理是基於Lambda架構搭建,資料暫存於Kafka訊息佇列,分流供離線與實時做不同處理。

Flink在處理資料流程上層次分明,Source(資料來源)->Channel(資料處理)->Sink(資料儲存)。

首先,Flink Source端接入已經存入Kafka訊息佇列的資料,這裡面的資料可以是經過ETL處理後的JSON格式資料。

其次,使用Flink對接入的資料做operator操作,並將處理結果Sink暫存到另一個Kafka訊息佇列,供Druid做Pull拉取操作。

最後,基於Druid做資料視覺化操作或供其他介面呼叫。

下面是針對具體場景的程式碼實現:

首先,讀取存於HDFS的地域表資料,並把表資料轉換成自定義LocalContainerJava物件流。

其次,定義MapStateDescriptor描述符,並把轉換的表資料物件流廣播出去,描述符必須和表資料物件流格式相對應。

然後,使用connect將BroadcastStream 與Kafka傳來的資料流合併,因為這兩種流資料型別不同,所以使用connect多流合併,返回BroadcastConnectedStream。Flink的多流合併還有union和join等,union要求連線的流資料型別必須一致,join則要求每個流必須有key且key值相同才能完成關聯操作。之後呼叫Process方法分別對各流做具體邏輯操作。因為我們的場景並不需要對資料集做Key分割槽,所以在Process方法裡傳入抽象類BroadcastProcessFunction的引數實現。

在抽象類BroadcastProcessFunction裡,這裡必須重寫兩個方法,一個是processBroadcastElement,一個是processElement,前者用以處理廣播流,後者用以直接處理從Kafka讀取的資料流,當然也可以重寫 open和close方法,做一些初始化與收尾操作,具體根據自己的應用場景的需要來決定。

在processBroadcastElement方法,通過上下文Context傳入之前定義Broadcast State的描述符,獲取到BroadcastState操作類,並對BroadcastState做put鍵值對操作。這裡的put操作並不會影響其他TaskManager節點的Broadcast State資料,只會作用於當前節點。這點類似定義本地全域性HashMap,只是這裡TaskManager把這些狀態資料轉換成記憶體Hash Table儲存,並Checkpoint到JobManager,最後JobManager根據配置資訊setStateBackend儲存Checkpoints資料。這裡,Flink RunTime幫我們做了內部具體實現,讓我們可以只關注具體業務邏輯,而不必考慮資料在節點間的傳輸和序列化等問題。

而processElement方法,可以對MapState進行各種操作。類似HashMap介面,MapState可以通過entries(),keys(),values()獲取對應的keys或values的集合。然後把結果集collect到下游運算元。

最後,把結果資料集做Sink處理。這裡可以把Flink做完的聚合統計結果,直接存入第三方儲存裡(例如Kafka,Redis甚至Mysql等),只要把Sink定義成相應的connector即可。

這裡,我們沒有使用Flink做聚合操作的原因,從Kafka傳來的部分資料,不可避免出現延遲時間問題,甚至有些資料延遲達數個小時以上。Flink可以解決亂序問題,但是對於延時過長的資料,藉助其他大資料元件是更好的選擇。同時也是因為部分資料資料延時過長的原因,我們使用Flink預設的ProcessTime,以Flink處理時間為準。因為不需要Flink做聚合操作,所以也就沒有自定義Window。

最後把資料傳入第三方大資料元件Druid,我們在Druid裡做聚合查詢操作。而Druid使用的是資料的EventTime,通過把資料存入Druid時序資料庫的不同時間Segment,就解決了資料延遲時間參差不齊的問題,實時性和效能都有提高。隨著時間的推移,遲到資料陸續存入不同時間的Segment,準確度越來越高,遲到的資料會不斷更新最後的結果,解決埋點資料上報的延遲問題。

經驗總結

無論是在實時還是離線場景,資料之間難免會有關聯。Flink 提供了靈活豐富的狀態管理,可輕鬆解決資料之間的關聯性。結合自己的應用場景與業務邏輯,選擇合適的狀態管理。

Flink雖然可以處理大多數實時計算場景,但對某些特殊場景,可能並不是特別適合,或者處理起來較複雜。如果可以參考其他大資料元件,與Flink相互結合使用,無論從程式碼開發,還是預期效果上,可能會事半功倍。

參考文獻

Apache Flink官方文件:https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/state.html

作者簡介

代鬆辰,高階大資料開發工程師,現就職於58同鎮演算法技術部。

最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 265分發平臺APP企業簽名系統PHP網站原始碼