部門:資料中臺
出處:https://mp.weixin.qq.com/s?__biz=MzAxOTY5MDMxNA==&mid=2455762076&idx=1&sn=dc56d09959289c76808f31a00756a2b7
背景隨著 Flink k8s 化以及實時叢集遷移完成,有贊越來越多的 Flink 實時任務執行在 K8s 叢集上,Flink k8s 化提升了實時叢集在大促時彈性擴縮容能力,更好的降低大促期間機器擴縮容的成本。同時,由於 K8s 在公司內部有專門的團隊進行維護,Flink k8s 化也能夠更好的減低公司的運維成本。
不過當前 Flink k8s 任務資源是使用者在實時平臺端進行配置,使用者本身對於實時任務具體配置多少資源經驗較少,所以存在使用者資源配置較多,但實際使用不到的情形。比如一個 Flink 任務實際上 4 個併發能夠滿足業務處理需求,結果使用者配置了 16 個併發。這種情況會導致實時計算資源的浪費,從而對於實時叢集資源水位以及底層機器成本,都有一定影響。基於這樣的背景,本文從 Flink 任務記憶體以及訊息能力處理方面,對 Flink 任務資源最佳化進行探索與實踐。
一、Flink 計算資源型別與最佳化思路1.1 Flink 計算資源型別一個 Flink 任務的執行,所需要的資源我認為能夠分為 5 類:
記憶體資源本地磁碟(或雲盤)儲存依賴的外部儲存資源。比如 HDFS、S3等(任務狀態/資料),HBase,Mysql,Redis等(資料)CPU 資源網絡卡資源目前 Flink 任務使用最主要的還是記憶體和 CPU 資源,本地磁碟、依賴的外部儲存資源以及網絡卡資源一般都不會是瓶頸,所以本文我們是從 Flink 任務的記憶體和 CPU 資源,兩個方面來對 Flink 實時任務資源進行最佳化。
1.2 Flink 實時任務資源最佳化思路對於 Flink 實時任務資源分析思路,我們認為主要包含兩點:一是從任務記憶體視角,從堆記憶體方面對實時任務進行分析,另一方面則是從實時任務訊息處理能力入手,保證滿足業務方資料處理需求的同時,儘可能合理使用 CPU 資源。之後再結合實時任務記憶體分析所得相關指標、實時任務併發度的合理性,得出一個實時任務資源預設值,在和業務方充分溝通後,調整實時任務資源,最終達到實時任務資源配置合理化的目的,從而更好的降低機器使用成本。
1.2.1 任務記憶體視角
那麼如何分析 Flink 任務的堆記憶體呢?這裡我們是結合 Flink 任務 GC 日誌來進行分析,GC 日誌包含了每次 GC 堆內不同區域記憶體的變化和使用情況。同時根據 GC 日誌,也能夠獲取到一個 Taskmanager 每次 Full GC 後,老年代剩餘空間大小。可以說,獲取實時任務的 GC 日誌,使我們進行實時任務記憶體分析的前提。
GC 日誌內容分析,這裡我們藉助開源的 GC Viewer 工具來進行具體分析,每次分析完,我們能夠獲取到 GC 相關指標,下面是透過 GC Viewer 分析一次 GC 日誌的部分結果:
上面透過 GC 日誌分析出單個 Flink Taskmanager 堆總大小、年輕代、老年代分配的記憶體空間、Full GC 後老年代剩餘大小等,當然還有很多其他指標,相關指標定義可以去 Github 具體檢視。
這裡最重要的還是Full GC 後老年代剩餘大小這個指標,按照《Java 效能最佳化權威指南》這本書 Java 堆大小計算法則,設 Full GC 後老年代剩餘大小空間為 M,那麼堆的大小建議 3 ~ 4倍 M,新生代為 1 ~ 1.5 倍 M,老年代應為 2 ~ 3 倍 M,當然,真實對記憶體配置,你可以按照實際情況,將相應比例再調大些,用以防止流量暴漲情形。
所以透過 Flink 任務的 GC 日誌,我們可以計算出實時任務推薦的堆記憶體總大小,當發現推薦的堆記憶體和實際實時任務的堆記憶體大小相差過大時,我們就認為能夠去降低業務方實時任務的記憶體配置,從而降低機器記憶體資源的使用。
1.2.2 任務訊息處理能力視角
對於 Flink 任務訊息處理能力分析,我們主要是看實時任務消費的資料來源單位時間的輸入,和實時任務各個 Operator / Task 訊息處理能力是否匹配。Operator 是 Flink 任務的一個運算元,Task 則是一個或者多個運算元 Chain 起來後,一起執行的物理載體。
資料來源我們內部一般使用 Kafka,Kafka Topic 的單位時間輸入可以透過呼叫 Kafka Broker JMX 指標介面進行獲取,當然你也可以呼叫 Flink Rest Monitoring 相關 API 獲取實時任務所有 Kafka Source Task 單位時間輸入,然後相加即可。不過由於反壓可能會對 Source 端的輸入有影響,這裡我們是直接使用 Kafka Broker 指標 JMX 介面獲取 Kafka Topic 單位時間輸入。
在獲取到實時任務 Kafka Topic 單位時間輸入後,下面就是判斷實時任務的訊息處理能力是否與資料來源輸入匹配。一個實時任務整體的訊息處理能力,會受到處理最慢的 Operator / Task 的影響。打個比方,Flink 任務消費的 Kafka Topic 輸入為 20000 Record / S,但是有一個 Map 運算元,其併發度為 10 ,Map 運算元中業務方呼叫了 Dubbo,一個 Dubbo 介面從請求到返回為 10 ms,那麼 Map 運算元處理能力 1000 Record / S (1000 ms / 10 ms * 10 ),從而實時任務處理能力會下降為 1000 Record / S。
由於一條訊息記錄的處理會在一個 Task 內部流轉,所以我們試圖找出一個實時任務中,處理最慢的 Task 邏輯。如果 Source 端到 Sink 端全部 Chain 起來的話,我們則是會找出處理最慢的 Operator 的邏輯。在原始碼層,我們針對 Flink Task 以及 Operator 增加了單條記錄處理時間的自定義 Metric,之後該 Metric 可以透過 Flink Rest API 獲取。我們會遍歷一個 Flink 任務中所有的 Task , 查詢處理最慢的 Task 所在的 JobVertex(JobGraph 的點),然後獲取到該 JobVertex 所有 Task 的總輸出,最終會和 Kafka Topic 單位時間輸入進行比對,判斷實時任務訊息處理能力是否合理。
設實時任務 Kafka Topic 單位時間的輸入為 S,處理最慢的 Task 代表的 JobVertex 的併發度為 P,處理最慢的 Task 所在的 JobVertex 單位時間輸出為 O,處理最慢的 Task 的最大訊息處理時間為 T,那麼透過下面邏輯進行分析:
當 O 約等於 S,且 1 second / T * P 遠大於 S 時,會考慮減小任務併發度。當 O 約等於 S,且 1 second / T * P 約等於 S 時,不考慮調整任務併發度。當 O 遠小於 S,且 1 second / T * P 遠小於 S 時,會考慮增加任務併發度。目前主要是 1 這種情況在 CPU 使用方面不合理,當然,由於不同時間段,實時任務的流量不同,所以我們會有一個週期性檢測的的任務,如果檢測到某個實時任務連續多次都符合 1 這種情況時,會自動報警提示平臺管理員進行資源最佳化調整。
下圖是從 Flink 任務的記憶體以及訊息處理能力兩個視角分析資源邏輯圖:
二、從記憶體視角對 Flink 分析實踐2.1 Flink 任務垃圾回收器選擇Flink 任務本質還是一個 Java 任務,所以也就會涉及到垃圾回收器的選擇。選擇垃圾回收器一般需要從兩個角度進行參考:
吞吐量,即單位時間內,任務執行時間 / (任務執行時間 + 垃圾回收時間),當然並不是說降低 GC 停頓時間就能提升吞吐量,因為降低 GC 停頓時間,你的 GC 次數也會上升。延遲。如果你的 Java 程式涉及到與外部互動,延遲會影響外部的請求使用體驗。Flink 任務我認為還是偏重吞吐量的一類 Java 任務,所以會從吞吐量角度進行更多的考量。當然並不是說完全不考慮延遲,畢竟 JobManager、TaskManager、ResourceManager 之間存在心跳,延遲過大,可能會有心跳超時的可能性。
目前我們 JDK 版本為內部 JDK 1.8 版本,新生代垃圾回收器使用 Parallel Scavenge,那麼老年代垃圾回收器只能從 Serial Old 或者 Parallel Old。由於我們 Flink k8s 任務每個 Pod 的 CPU 限制為 0.6 - 1 core ,最大也只能使用 1 個 core,所以老年代的垃圾回收器我們使用的是 Serial Old ,多執行緒垃圾回收在單 Core 之間,可能會有執行緒切換的消耗。
2.2 實時任務 GC 日誌獲取設定完垃圾回收器後,下一步就是獲取 Flink 任務的 GC 日誌。Flink 任務構成一般是單個 JobManager + 多個 TaskManger ,這裡需要獲取到 TaskManager 的 GC 日誌進行分析。那是不是要對所有 TaskManager 進行獲取呢。這裡我們按照 TaskManager 的 Young GC 次數,按照次數大小進行排序,取排名前 16 的 TaskManager 進行分析。YoungGC 次數可以透過 Flink Rest API 進行獲取。
Flink on Yarn 的實時任務,直接點開 TaskManager 的日誌連結就能夠看到,然後透過 HTTP 訪問,就能下載到本地。Flink On k8s 任務的 GC 日誌,會先寫到 Pod 所掛載的雲盤,基於 k8s hostpath volume 進行掛載。我們內部使用 Filebeat 進行日誌檔案變更監聽和採集,最終輸出到下游的 Kafka Topic。我們內部會有自定義日誌服務端,它會消費 Kafka 的日誌記錄,自動進行落盤和管理,同時向外提供日誌下載介面。透過日誌下載的介面,便能夠下載到需要分析的 TaskManager 的 GC 日誌。
2.3 基於 GC Viewer 分析 Flink 任務記憶體GC Viewer 是一個開源的 GC 日誌分析工具。使用 GC Viewer 之前,需要先把 GC Viewer 專案程式碼 clone 到本地,然後進行編譯打包,就可以使用其功能。
在對一個實時任務堆記憶體進行分析時,先把 Flink TaskManager 的日誌下載到本地,然後透過 GC Viewer 對日誌進行。如果你覺得多個 Taskmanager GC 日誌分析較慢時,可以使用多執行緒。上面所有這些操作,可以將其程式碼化,自動化產出分析結果。下面是透過 GC Viewer 分析的命令列,
java -jar gcviewer-1.37-SNAPSHOT.jar gc.log summary.csv
上面引數 gc.log 表示一個 Taskmanager 的 GC 日誌檔名稱,summary.csv 表示日誌分析的結果。下面是我們平臺對於某個實時任務記憶體分析的結果:
下面是上面截圖中,部分引數說明:
RunHours,Flink 任務執行小時數YGSize,一個 TaskManager 新生代堆記憶體最大分配量,單位兆YGUsePC,一個 TaskManager 新生代堆最大使用率OGSize,一個 TaskManager 老年代堆記憶體最大分配量,單位兆OGUsePC,一個 TaskManager 老生代堆最大使用率YGCoun,一個 TaskMnager Young GC 次數YGPerTime,一個 TaskMnager Young GC 每次停頓時間,單位秒FGCount,一個 TaskMnager Full GC 次數FGAllTime,一個 TaskMnager Full GC 總時間,單位秒Throught,Task Manager 吞吐量AVG PT(分析結果 avgPromotion 引數),平均每次 Young GC 晉升到老年代的物件大小Rec Heap,推薦的堆大小RecNewHeap,推薦的新生代堆大小RecOldHeap,推薦的老年代堆大小上述大部分記憶體分析結果,透過 GC Viewer 分析都能得到,不過推薦堆大小、推薦新生代堆大小、推薦老年代堆大小則是根據 1.2.1 小節的記憶體最佳化規則來設定。
三、從訊息處理視角對 Flink 分析實踐3.1 實時任務 Kafka Topic 單位時間輸入獲取想要對 Flink 任務的訊息處理能力進行分析,第一步便是獲取該實時任務的 Kafka 資料來源 Topic,目前如果資料來源不是 Kafka 的話,我們不會進行分析。Flink 任務總體分為兩類:Flink Jar 任務和 Flink SQL 任務。Flink SQL 任務獲取 Kafka 資料來源比較簡單,直接解析 Flink SQL 程式碼,然後獲取到 With 後面的引數,在過濾掉 Sink 表之後,如果 SQLCreateTable 的 Conector 型別為 Kafka,就能夠透過 SQLCreateTable with 後的引數,拿到具體 Kafka Topic。
Flink Jar 任務的 Kafka Topic 資料來源獲取相對繁瑣一些,我們內部有一個實時任務血緣解析服務,透過對 Flink Jar 任務自動構建其 PackagedProgram,PackagedProgram 是 Flink 內部的一個類,然後透過 PackagedProgram ,我們可以獲取一個 Flink Jar 任務的 StreamGraph,StreamGraph 裡面有 Source 和 Sink 的所有 StreamNode,透過反射,我們可以獲取 StreamNode 裡面具體的 Source Function,如果是 Kafka Source Sunction,我們就會獲取其 Kafka Topic。下面是 StreamGraph 類截圖:
獲取到 Flink 任務的 Kafka Topic 資料來源之後,下一步便是獲取該 Topic 單位時間輸入的訊息記錄數,這裡可以透過 Kafka Broker JMX Metric 介面獲取,我們則是透過內部 Kafka 管理平臺提供的外部介面進行獲取。
3.2 自動化檢測 Flink 訊息處理最慢 Task首先,我們在原始碼層增加了 Flink Task 單條記錄處理時間的 Metric,這個 Metric 可以透過 Flink Rest API 獲取。接下來就是藉助 Flink Rest API,遍歷要分析的 Flink 任務的所有的 Task。Flink Rest Api 有這樣一個介面:
base_flink_web_ui_url/jobs/:jobid
這個介面能夠獲取一個任務的所有 Vertexs,一個 Vertex 可以簡單理解為 Flink 任務 JobGraph 裡面的一個 JobVertex。JobVertex 代表著實時任務中一段執行邏輯。
獲取完 Flink 任務所有的 Vertex 之後,接下來就是獲取每個 Vertex 具體 Task 處理單條記錄的 metric,可以使用下面的介面:
需要在上述 Rest API 連結 metrics 之後新增 ?get=(具體meitric ),比如:metrics?get=0.Filter.numRecordsOut,0 表示該 Vertex Task 的 id,Filter.numRecordsOut 則表示具體的指標名稱。我們內部使用 taskOneRecordDealTime 表示Task 處理單條記錄時間 Metric,然後用 0.taskOneRecordDealTime 去獲取某個 Task 的單條記錄處理時間的指標。上面介面支援多個指標查詢,即 get 後面使用逗號隔開即可。
最終自動化檢測 Flink 訊息處理最慢 Task 整體步驟如下:
獲取一個實時任務所有的 Vertexs遍歷每個 Vertex,然後獲取這個 Vertex 所有併發度 Task 的 taskOneRecordDealTime,並且記錄其最大值所有 Vertex 單條記錄處理 Metric 最大值進行對比,找出處理時間最慢的 Vertex。下面是我們實時平臺對於一個 Flink 實時任務分析的結果:
四、有贊 Flink 實時任務資源最佳化實踐既然 Flink 任務的記憶體以及訊息處理能力分析的方式已經有了,那接下來就是在實時平臺端進行具體實踐。我們實時平臺每天會定時掃描所有正在執行的 Flink 任務,在任務記憶體方面,我們能夠結合 實時任務 GC 日誌,同時根據記憶體最佳化規則,計算出 Flink 任務推薦的堆記憶體大小,並與實際分配的 Flink 任務的堆記憶體進行比較,如果兩者相差的倍數過大時,我們認為 Flink 任務的記憶體配置存在浪費的情況,接下來我們會報警提示到平臺管理員進行最佳化。
平臺管理員再收到報警提示後,同時也會判定實時任務訊息能力是否合理,如果訊息處理最慢的 Vertex (某段實時邏輯),其所有 Task 單位時間處理訊息記錄數的總和約等於實時任務消費的 Kafka Topic 單位時間的輸入,但透過 Vertex 的併發度,以及單條訊息處理 Metric ,算出該 Vertex 單位時間處理的訊息記錄數遠大於 Kafka Topic 的單位輸入時,則認為 Flink 任務可以適當調小併發度。具體調整多少,會和業務方溝通之後,在進行調整。整體 Flink 任務資源最佳化操作流程如下:
五、總結目前有贊實時計算平臺對於 Flink 任務資源最佳化探索已經走出第一步。透過自動化發現能夠最佳化的實時任務,然後平臺管理員介入分析,最終判斷是否能夠調整 Flink 任務的資源。在整個實時任務資源最佳化的鏈路中,目前還是不夠自動化,因為在後半段還需要人為因素。未來我們計劃 Flink 任務資源的最佳化全部自動化,會結合實時任務歷史不同時段的資源使用情況,自動化推測和調整實時任務的資源配置,從而達到提升整個實時叢集資源利用率的目的。
同時未來也會和元資料平臺的同學進行合作,一起從更多方面來分析實時任務是否存在資源最佳化的可能性,他們在原來離線任務資源方面積攢了很多最佳化經驗,未來也可以參考和借鑑,應用到實時任務資源的最佳化中。
拓展閱讀:Flink 滑動視窗最佳化有贊埋點實踐有贊埋點質量保障基於時間加權的使用者購買類目意願計算有贊推薦系統關鍵技術有贊資料中臺建設實踐資料資產,贊之治理SparkSQL在有贊大資料的實踐(二)HBase Bulkload 實踐探討作者: 沈磊
出處:https://mp.weixin.qq.com/s?__biz=MzAxOTY5MDMxNA==&mid=2455762076&idx=1&sn=dc56d09959289c76808f31a00756a2b7