本文講述 Flink 在 Shopee 新加坡資料組 ( Shopee Singapore Data Team ) 的應用實踐,主要內容包括:
實時數倉建設背景Flink 在實時資料數倉建設中結合 Druid、Hive 的應用場景實時任務監控Streaming SQL 平臺化Streaming Job 管理未來規劃優化方向建設背景Shopee 是東南亞與臺灣領航電商平臺,覆蓋新加坡、馬來西亞、菲律賓、臺灣、印度尼西亞、泰國及越南七大市場,同時在中國深圳、上海和香港設立跨境業務辦公室。
Shopee 在2020年第一季的總訂單量高達4.298億,同比增長111.2%。根據 App Annie, Shopee 在2020年第一季強勢躋身全球購物類 App 下載量前三名。同時斬獲東南亞及臺灣市場購物類 App 年度總下載量、平均月活數、安卓使用總時長三項冠軍,並領跑東南亞兩大頭部市場,拿下印尼及越南年度購物類 App 下月活量雙冠王。其中包括訂單商品、物流,支付,數字產品等各方面的業務。為了支援這些網際網路化產品,應對越來的越多的業務挑戰,於是我們進行了資料倉庫的設計和架構建設。
資料倉庫挑戰
當前隨著業務發展,資料規模的膨脹和商務智慧團隊對實時需求的不斷增長,業務挑戰越來越大:
業務維度 而言,業務需求越來越複雜,有需要明細資料查詢,又有實時各種維度聚合報表,實時標籤培訓和查詢需求。同時大量業務共享了一些業務邏輯,造成大量業務耦合度高,重複開發。平臺架構 而言,當前任務越來越多,管理排程,資源管理,資料品質異常監控等也越來越重要。實時化也越來急迫,目前大量業務還是離線任務形式,導致凌晨服務負載壓力巨大,同時基於 T+1(天、小時級)架構業務無法滿足精細化、實時化運營需要。技術實現 而言,現在實時業務大量採用 Spark Structured Streaming 實現,嚴重依賴 HBase 做 Stateful 需求,開發複雜;在異常故障事故,Task 失敗,缺乏 Exactly Once 特性支援,資料易丟失、重複。為了解決上述問題,於是開始了 Flink 實時數倉的探索。
資料倉庫架構
為了支援這些網際網路化產品不斷增長的的資料和複雜的業務,Shopee 構建如下圖資料倉庫架構,從下到上層來看:
目前在 Shopee Data Team 主要從資料分庫 Binlog 以及 Tracking Service 同步到 Kafka 叢集中,通過 Flink/Spark 計算,包含實時訂單商品銷量優惠活動分析,訂單物流分析、產品使用者標新、使用者印象行為分析,電商活動遊戲運營分析等。最後的結果存到 Druid、 HBase、 HDFS 等,後面接入一些資料應用產品。目前已經有不少核心作業從 Spark Structured Streaming 遷移到 Flink Streaming 實現。
Flink 與 Druid 結合的實時數倉應用
在實時訂單銷量分析產品中,通過 Flink 處理訂單流,將處理後的明細資料實時注入Druid,達到公司實時運營活動分析等用途。我們使用 T-1(天)的 Lambda 架構來實時和歷史訂單資料產品分析,Flink 只處理實時今天的訂單資料,每日會定時將昨日的資料通過離線任務索引到 Druid 中覆蓋修正實時資料的微小誤差。整體的 Flink 實時處理流程如下圖,從上中下看共三條流水線:
第一條流水線,通過 Kafka 接入 訂單 Binlog 事件。
首先,解析反序列化訂單事件,通過訂單時間過濾無效訂單,只保留今日訂單。通過訂單主鍵 KeyBy 進入ProcessWindowFunction,因為上游資料是 Binlog 會有重複訂單事件,所以會通過 ValueState 來對訂單進行去重。然後,通過查詢 HBase (Phoenix 表)進行Enrichment 維度欄位,從 Phoenix 表中獲取訂單商品資訊,分類,使用者資訊等。最後,通過判斷是否所有欄位成功關聯,如果所有欄位都關聯成功將會把訊息打入下游kafka,並實時注入到 Druid;如果有欄位關聯失敗將會把訂單事件通過 Side Output 進入另一個 Slow Kafka Topic,以便處理異常訂單。第二條流水線比較複雜,通過多個實時任務將各分表 Slave Binlog 同步到 Hbase Phoenix 表,以便做成實時訂單流的維度表。目前遇到比較多問題還是經常 Binlog 延遲等問題,以及資料熱點問題。
第三條流基本與第一條類似,類似訊息佇列中的 dead message 異常處理。因為大量維度表依賴,不能保證 Phoenix 都在訂單被處理前就被同步到 Phoenix 表,比如新訂單商品,新使用者,新店鋪,新分類,新商品等。所以我們引入一條實時 backfill 處理流將會對第一條主流,處理失敗的訂單重複處理,直到所有欄位都關聯成功才會進入下游 Druid。
另外為了避免一些過期訊息進入死迴圈,同樣有個事件過濾視窗,保證只保留今日的訂單事件在流水線中被處理。不同的是,因為需要區分付款訂單和未付款訂單事件型別(可能一個訂單有兩個狀態事件,當用戶下單時,會有一個下單事件,當用戶完成支付會有一個支付完成事件),所以將訂單是否被處理狀態放在enrichment之後標記重複成功。
因為上游資料來源是 Binlog,所以隨著訂單狀態的更新,會有大量的訂單重複事件。通過使用 Flink State 功能儲存在記憶體中(FsSateBackend),以 ValueState 來標記訂單是否被處理,通過設定 TTL,保證訂單狀態儲存24小時過期,現在活動高峰期大概2G State,平均每個TaskManager大約100M State。Checkpoint interval 設定為10秒一次,HDFS 負載並不高。同時因為流使用了視窗和自定義 Trigger,導致 State 需要緩衝少量視窗資料。視窗的使用將會在 Enrihcment 流程優化部分詳細說明。
在 Enrichment 步驟, 業務邏輯複雜,存在大量 IO,我們做了大量改進優化。
首先 ,從 HBase 表關聯欄位,通過增加 Local RLU Memeory Cache 層,減少 Hbase 的訪問量,加速關聯;對 HBase Row Key Salt Bucket 避免訂單商品表訪問熱點問題。第二 ,HBase 表直接訪問層(Service)通過 Google Guice 管理依賴方便配置管理,記憶體 Cache 關聯等。第三 ,由於商品表和訂單商品同步到 HBase 有一定延遲,導致大量的訂單事件進入 Slow Kafka topic,所以通過設定視窗和自定義 Trigger 保證訂單數量到一定數量或者視窗超時才觸發視窗資料的處理,優化後能保證98%的訂單在主流被成功處理。最後 ,在訂單關聯訂單商品時,考慮過使用 Interval Join 來做,但是由於一個訂單有多條訂單商品資訊,加上上游是 Binlog 事件,以及其他維度表資料延遲問題,導致業務邏輯複雜,而且計算產出資料儲存在 Druid 只能支援增量更新。所以選擇了使用 HBase 儲存來關聯訂單商品資訊,附加慢訊息處理流來解決資料延遲問題。■ 資料品質保障和監控
目前將 Checkpoint 設定為 exactly once 模式,並開啟了Kafka exactly once 生產者模式,通過 Two Phase Commit 功能保證資料的一致性,避免 task 失敗,job 重啟時導致資料丟失。監控方面,通過監控 Upstream Kafka Topic,以及 HBase 表寫入更新狀態,結合下游 Druid 資料延遲監控,做到 end-to-end 的 lag 指標監控。通過 Flink Metric Report 彙報 Hbase 訪問效能指標,快取大小,延遲訂單數量等來對 Flink job 具體步驟效能分析。
Flink 與 Hive 結合的實時數倉應用
在訂單物流實時分析業務,接入 Binlog event 實現支援點更新的物流分析,使用 Flink Retract Stream 功能來支援每當訂單和物流有最新狀態變化事件就觸發下游資料更新。通過 Interval Join 訂單流和物流流,並使用 Rocksdb State 與 Incremental Checkpoint 來維護最近七天的狀態資料,從 Hbase 來增加使用者維度資訊等,維度欄位 enrihcment 通過 Local LRU Memory Cache 層來優化查詢,最後定時從 Hbase 匯出到 HDFS。
現在將 Flink 任務產生的訂單物流事件儲存 HBase 來支援記錄級別的點更新,每小時從 HBase 匯出到 HDFS 結果,通過 Presto 接入來做實時分析。HBase 匯出到HDFS,通過對 Hbase Row Key Salt Bucket 避免熱點問題,優化減小 Region Size(預設10G)來減少匯出時間。但是資料現在延遲還是比較嚴重,在一個半小時左右,而且鏈路繁瑣。將來考慮加入 Apache Hudi 組建接入 Presto,將延遲降到半小時內。
Streaming SQL 應用與管理目前 Shopee 有大量的實時需求通過 SQL 實現,應用場景主要是應用層實時彙總資料報表、維度表更新等。業務通過 SDK 和一站式網站管理兩種方式實現。一是以 SDK 形式提供支援,使用者可以通過引入 JAR 依賴進行二次專案開發。二是製作了相關網站,通過以任務形式,使用者建立任務編輯儲存 SQL 來實現業務需求,目前支援如下:
任務列表、分組管理,支援重啟,停止,禁用任務功能。任務支援 crontab 規則定時執行排程模式和 Streaming 模式。JAR 資源管理,任務自定義 JAR 引用,以便重複使用 UDF 等。通用 SQL 資源管理,任務引入共享 SQL 檔案,避免重複 SQL 邏輯、重複定義 View 以及環境配置等。使用者分組許可權管理。整合 Garafna 做任務延遲報警。下面是部分任務組織 UI 化形式:
當前平臺只支援 Spark SQL 實現 Stream SQL,使用 Hive 儲存元資料,通過關聯維度表 JOIN Apache Phoenix 等外部表和外部服務實現 enrichment 等功能。通過對比 Flink SQL 與 Spark SQL,發現 Spark SQL 不少缺點:
Spark SQL 視窗函式種類少,沒有 Flink 的支援靈活,導致大量聚合任務無法通過平臺 SQL 化。Spark Stateful 狀態控制差,沒有 Flink Rocksdb State 增量狀態支援。Spark 關聯維度表時,以前在每次 micro-batch 中都需要載入全量維度表,現在已經改為 GET 方式,Lookup 效能方面已經有提升不少,但還是沒有像 Flink 非同步 Lookup 那樣的非同步功能,提高效能。沒有 Flink Snapshot 和 Two Phase Commit 功能的支援,導致任務重啟,失敗恢復會出現資料不一致,失去準確性。Spark SQL 支援還是有很多侷限性,目前正在做 Flink SQL 需求匯入評估階段,並計劃在 Stream SQL Platform 接入 Flink SQL 的支援。來滿足公司越來越複雜使用者畫像標籤標註和簡單實時業務 SQL 化,減少業務開發成本。同時需要引入更好的 UDF 管理方式,整合元資料服務簡化開發。
Streaming Job 管理Shopee Data Team 擁有大量的實時任務是通過 Jar 包釋出的,目前在 Job 管理上通過網站頁面化,來減少 Job 維護成本。目前支援環境管理,任務管理,任務應用配置管理,和任務監控報警。
環境管理
目前可以配置 Flink / Spark Bin 路徑來支援不同的 Flink/Spark 版本,來支援 Flink 升級帶來的多版本問題,並支援一些顏色高亮來區分不同環境。
任務管理
現在支援實時任務的環境檢索,狀態檢索,名字檢索等。支援重啟,禁用,配置任務引數等。任務支援從 checkpoint/savepoint 恢復,停止任務自動儲存 savepoint,從 kafka timestamp 啟動。
任務配置管理
同時實時任務也支援配置記憶體,CPU 等 Flink Job 執行引數、JAR 依賴配置等。目前支援預覽,編輯更新等,通過 Jekins CICD 整合與人工干預結果,來完成 Job 的部署升級。
任務應用配置管理
任務應用配置是使用 HOCON 配置格式支援,目前支援共享配置整合,並通過配置名約定將 Checkpoint 路徑自動繫結到配置中。網站支援預覽模式,編輯模式,配置高亮等,將來會整合配置版本回滾等功能。
任務監控報警
對於任務監控方面,現在支援任務異常處理報警。異常處理支援自動掛起失敗的任務,並從上次最新 checkpoint 恢復;通過 Flink REST API 檢測 Flink Job 狀態,來避免 Flink Job 異常造成的假活狀態。出現任務重啟,異常情況會通過郵件等方式給任務負責人發報警,未來打算在網站整合 Grafana/Promethus 等監控工具來完成任務監控自動化等。
未來規劃總體而言,Flink 在 Shopee 從 2019 年底開始調研,到專案落地不到半年時間,已經完成業務大量需求匯入評估,對 Exactly Once,Kafka Exactly Once Semantics,Two Phase Commit,Interval Join,Rocksdb/FS State 一系列的功能進行了驗證。在未來規劃上:
首先,會嘗試更多的實時任務 Flink SQL 化,進一步實現流批統一; 其次,會對目前大量 Spark structured Streaming Job 遷移到 Flink 實現,並對新業務進行 Flink 探索。 在 Streaming SQL Platform 也會加入 Flink SQL 支援,來解決當前平臺遇到一些效能瓶頸和業務支援侷限性。作者簡介:
黃良輝,2019 年加入 Shopee,在 Shoppe Data Team 負責實時資料業務和資料產品開發。