一、業務背景
網易遊戲 ETL 服務概況
網易遊戲的基礎資料主要日誌方式採集,這些日誌通常是非結構化或半結構化資料,需要經過資料整合 ETL 才可以入庫至實時或離線的資料倉庫。此後,業務使用者才可以方便地用 SQL 完成大部分資料計算,包括實時的 Flink SQL 和離線的 Hive 或 Spark。
網易遊戲資料整合的資料流與大多數公司大同小異,主要有遊戲客戶端日誌、遊戲服務端日誌和其他周邊基礎的日誌,比如 Nginx access log、資料庫日誌等等。這些日誌會被採集到統一的 Kafka 資料管道,然後經由 ETL 入庫服務寫入到 Hive 離線資料倉庫或者 Kafka 實時資料倉庫。
這是很常見的架構,但在我們在需求方面是有一些比較特殊的情況。
網易遊戲流式 ETL 需求特點
首先,不同於網際網路、金融等行業基本常用 MySQL、Postgres 等的關係型資料庫,遊戲行業常常使用 MongoDB 這類 schema-free 的文件型資料庫。這給我們 ETL 服務帶來的問題是並沒有一個線上業務的準確的 schema 可以依賴,在實際資料處理中,多欄位或少欄位,甚至一個欄位因為玩法迭代變更為完全不同的格式,這樣的情況都是可能發生的。這樣的資料異構問題給我們 ETL 的資料清洗帶來了比較高的成本。
其次,也是由於資料庫選型的原因,大部分業務的資料庫模式都遵循了反正規化設計,會刻意以複雜內嵌的欄位來避免表間的 join。這種情況給我們帶來的一個好處是,在資料整合階段我們不需要去實時地去 join 多個數據流,壞處則是資料結構可能會非常複雜,多層巢狀十分常見。
然後,由於近年來實時數倉的流行,我們也同樣在逐步建設實時資料倉庫,所以複用現有的 ETL 管道,提取轉換一次,載入到實時離線兩個資料倉庫,成為一個很自然的發展方向。
最後,我們的日誌型別多且變更頻繁,比如一個玩法複雜的遊戲,可能有 1,000 個以上的日誌型別,每兩週可能就會有一次發版。在這樣的背景下 ETL 出現異常資料是不可避免的。因此我們需要提供完善的異常處理,讓業務可以及時得知資料異常和透過流程修復資料。
日誌分類及特點
為了更好地針對不同業務使用模式最佳化,我們對不同日誌型別的業務提供了不同的服務。我們的日誌通常分為三個型別:運營日誌、業務日誌和程式日誌。
運營日誌記錄的是玩家行為事件,比如登入帳號、領取禮包等。這類日誌是最為重要日誌,有固定的格式,也就是特定 header + json 的文字格式。資料的主要用途是做資料報表、資料分析還有遊戲內的推薦,比如玩家的組隊匹配推薦。
業務日誌記錄的是玩家行為以外的業務事件,這個就比較廣泛,比如 Nginx access log、CDN 下載日誌等等,這些完全沒有固定格式,可能是二進位制也可能是文字。主要用途類似於運營日誌,但更加豐富和定製化。
程式日誌記錄是程式的執行情況,也就是平時我們透過日誌框架打的 INFO、ERROR 這類日誌。程式日誌主要用途是檢索定位執行問題,通常是寫入 ES,但有時數量過大或者需要提取指標分析時,也會寫入資料倉庫。
網易遊戲 ETL 服務剖析
針對這些日誌分類,我們具體提供了三類 ETL 入庫的服務。首先是運營日誌專用的 ETL,這會根據運營日誌的模式進行定製化。然後是通用的面向文字日誌的 EntryX ETL 服務,它會服務於運營日誌以外的所有日誌。最後是 EntryX 無法支援的特殊 ETL 需求,比如有加密或者需要進行特殊轉換的資料,這種情況下我們就會針對性地開發 ad-hoc 作業來處理。
二、 運營日誌專用 ETL
運營日誌 ETL 發展歷程
運營日誌 ETL 服務有著一個比較久的歷史。大概在 2013 年,網易遊戲就建立了基於 Hadoop Streaming + Python 預處理/後處理的第一版離線 ETL 框架。這套框架是平穩運行了多年。
在 2017 年的時候,隨著 Spark Streaming 的嶄露頭角,我們開發了基於 Spark Streaming 的第二個版本,相當於一個 POC,但因為微批調優困難且小檔案多等問題沒有上線應用。
時間來到 2018 年,當時 Flink 已經比較成熟,我們也決定將業務遷移到 Flink 上,所以我們很自然地開發了基於 Flink DataStream 的第三版運營日誌 ETL 服務。這裡面比較特殊的一點就是,因為長久以來我們業務方積累了很多 Python 的 ETL 指令碼,然後新版最重要的一點就是要支援這些 Python UDF 的無縫遷移。
運營日誌 ETL 架構
接下來看下兩個版本的架構對比。
在早期 Hadoop Streaming 的版本里面,資料首先會被 dump 到 HDFS 上,然後 Hadoop Streaming 啟動 Mapper 來讀取資料並透過標準輸入的方式傳遞給 Python 指令碼。Python 腳本里面會分為三個模組:首先預處理 UDF,這裡通常會進行基於字串的替換,一般用作規範化資料,比如有些海外合作廠商的時間格式可能跟我們不同,那麼就可以在這裡進行統一。預處理完的資料會進入通用的解析/轉換模組,這裡我們會根據運營日誌的格式來解析資料,並進行通用轉換,比如濾掉測試服資料。通用模組之後,最後還有一個後處理模組進行針對欄位的轉換,比如常見的匯率轉換。之後資料會透過標準輸出返回給 Mapper,然後 Mapper 再將資料批次寫到 Hive 目錄中。
我們用 Flink 重構後,資料來源就由 HDFS 改為直接對接 Kafka,而 IO 模組則用 Flink 的 Source/Sink Operator 來代替原本的 Mapper,然後中間通用模組可以直接重寫為 Java,剩餘的預處理和後處理則是我們需要支援 Python UDF 的地方。
Python UDF 實現
在具體實現上,我們在 Flink ProcessFunction 之上加入了 Runner 層,Runner 層負責跨語言的執行。技術選型上是選了 Jython,而沒有選擇 Py4j,主要因為 Jython 可以直接在 JVM 裡面去完成計算,不需要額外啟動 Python 程序,這樣開發和運維管理成本都比較低。而 Jython 帶來的限制,比如不支援 pandas 等基於 c 的庫,這些對於我們的 Python UDF 來說都是可接受的。
整個呼叫鏈是,ProcessFunction 在 TaskManager 被呼叫時會在 open 函式延遲初始化 Runner,這是因為 Jython 是不可序列化的。Runner 初始化時會負責資源準備,包括將依賴的模組加入 PYTHONPATH,然後根據配置反射呼叫 UDF 函式。
呼叫時,對於預處理 UDF Runner 會把字串轉化為 Jython 的 PyUnicode 型別,而對於後處理 UDF 則會把解析後的 Map 物件轉為 Jython 的 PyDcitionary,分別作為兩者的輸入。UDF 可以呼叫其他模組進行計算,最終返回 PyObject,然後 Runner 再將其轉換成 Java String 或者 Map,返回給 ProcessFunction 輸出。
運營日誌 ETL 執行時
剛剛是 UDF 模組的區域性檢視,我們再來看下整體的 ETL 作業檢視。首先在我們提供了通用的 Flink jar,當我們生成並提交 ETL 作業到作業平臺時,排程器會執行通用的 main 函式構建 Flink JobGraph。這時會從我們的配置中心,也就是 ConfigServer,拉取 ETL 配置。ETL 配置中包含使用到的 Python 模組,後端服務會掃描其中引用到的其他模組,把它們統一作為資原始檔透過 YARN 分發功能上傳到 HDFS 上。在 Flink JobManager 和 TaskManager 啟動時,這些 Python 資源會被 YARN 自動同步到工作目錄上備用。這就是整個作業初始化的過程。
然後因為 ETL 規則的小變更是很頻繁的,比如新增一個欄位或者變更一下過濾條件,如果我們每次變更都需要重啟作業,那麼作業重啟帶來的不可用時間會對我們的下游使用者造成比較糟糕的體驗。因此,我們對變更進行了分類,對於一些不影響 Flink JobGraph 的輕量級變更支援熱更新。實現的方式是每個 TaskManager 啟動一個熱更新執行緒,定時輪詢配置中心同步配置。
三. EntryX 通用 ETL
接下來介紹我們的通用 ETL 服務 EntryX。這裡的通用可以分為兩層意義,首先是資料格式上的通用,支援非結構化到結構化的各種文字資料,其次是使用者群體的通用,目標使用者覆蓋資料分析、資料開發等傳統使用者,和業務程式、策劃這些資料背景較弱的使用者。
EntryX 基本概念
先介紹 EntryX 的三個基本概念,Source、StreamingTable 和 Sink。使用者需要分別配置這個三個模組,系統會根據這些自動生成 ETL 作業。
Source 是 ETL 作業的輸入源,通常是從業務端採集而來的原始日誌 topic,或者是經過分發過濾後的 topic。這些 topic 可能只包含一種日誌,但更多情況下會包含多種異構日誌。
接下來 StreamingTable,一個比較通俗的名稱就是流表。流表定義了 ETL 管道的主要元資料,包括如何轉換資料,還有根據轉換好的資料定義的流表 schema,將資料 schema 化。流表 schema 是最為關鍵的概念,它相當於 Table DDL,主要包括欄位名、欄位資料型別、欄位約束和表屬性等。為了更方便對接上下游,流表 schema 使用的是自研的 SQL-Like 的型別系統,裡面會支援我們一些拓展的資料型別,比如 JSON 型別。
最後 Sink 負責流表到目標儲存的物理表的對映,比如對映到目標 Hive 表。這裡主要需要 schema 的對映關係,比如流表哪個欄位對映到目標表哪個欄位,流表哪個欄位用作目標 Hive 表分割槽欄位。在底層,系統會自動根據 schema 對映關係來提取欄位,並將資料轉換為目標表的儲存格式,載入到目標表。
EntryX ETL 管道
再來看下 EntryX ETL 管道的具體實現。藍色部分是外部儲存系統,而綠色部分則是 EnrtyX 的內部模組。
資料首先從對接採集的原始資料 Topic 流入,經過 Source 攝入到 Filter。Filter 負責根據關鍵詞過濾資料,通常來說我們要求過濾完的資料是有相同 schema 的。經過這兩步資料完成 Extract,來到 Transform 階段。
Transform 第一步是解析資料,也就是這裡的 Parser。Parser 支援 JSON/Regex/Csv 三種解析,基本可以覆蓋所有案例。第二步是對資料進行轉換,這是由 Extender 負責的。Extender 透過內建函式或 UDF 計算衍生欄位,最常見的是將 JSON 物件拉平展開,提取出內嵌欄位。最後是 Formatter,Formatter 會根據之前使用者定義的欄位邏輯型別,將欄位的值轉為對應的物理型別。比如一個邏輯型別為 BIGINT 的欄位,我們在這裡會統一轉為 Java long 的物理型別。
資料完成 Transform 之後來到最後的 Load 階段。Load 第一步是決定資料應該載入到哪個表。Splitter 模組會根據每個表的入庫條件(也就是一個表示式)來分流資料,然後再到第二步的 Loader 來負責將資料寫到具體的外部儲存系統。目前我們支援 Hive/Kafka 兩種儲存,Hive 支援 Text/Parquet/JSON 三種格式,而 Kafka 支援 JSON 和 Avro 兩種格式。
實時離線統一 Schema
在 Entryx 的設計裡資料可以被寫入實時和離線兩個資料倉庫,也就是說同一份資料,但在不同的儲存系統中以不同格式表示。從 Flink SQL 的角度來說是 schema 部分相同,但 connector 和 format 不同的兩個表。而 schema 部分經常會隨業務變更,而 connector 和 format(也就是儲存系統和儲存格式)是相對穩定的。那麼一個很自然的想法就是,能不能將 schema 部分提取出來獨立維護?實際上,這個抽象的 schema 已經存在了,就是我們在 ETL 提取的流表 schema。
在 EntryX 裡面,流表 schema 是與序列化器、儲存系統無關的 schema,作為 Single Source of Truth。基於流表 schema,加上儲存系統資訊和儲存格式資訊,我們就可以衍生出具體的物理表的 DDL。目前我們主要是支援 Hive/Kafka,如果之後要拓展至支援 ES/HBase 表也是非常方便。
實時資料倉庫整合
EntryX 一個重要的定位是作為實時倉庫的統一入口。剛剛其實已經多次提到 Kafka 表,但還沒有說實時數倉是怎麼做的。實時數倉的常見問題是 Kafka 並沒有原生支援 schema 元資料的持久化。目前社群的主流解決方案是基於 Hive MetaStore 來儲存 Kafka 表的元資料,並複用 HiveCatalog 來直接對接到 Flink SQL。
但這對於我們來說使用 Hive MetaStore 主要有幾個問題:一是在實時作業裡引入 Hive 依賴並與 Hive 耦合,這是很重的依賴,導致定義的表很難被其他元件複用,包括 Flink DataStream 使用者;二是我們已經有 Kafka SaaS 平臺 Avatar 來管理物理 schema,比如 Avro schema,如果再引入 Hive MetaStore 會導致元資料的割裂。因此,我們是拓展了 Avatar 平臺的 schema 註冊中心,同時支援邏輯 schema 和物理 schema。
那麼實時數倉和 EntryX 的整合關係是:首先我們有 EntryX 的流表 schema,在新建 Sink 的時候呼叫 Avatar 的 schema 介面,根據對映關係生成邏輯 schema,而 Avatar 再根據 Flink SQL 型別與物理型別的對映關係生成 topic 的物理 schema。
與 Avatar schema 註冊中心配套的還有我們自研的 KafkaCatalog,它負責讀取 topic 的邏輯和物理 schema 來生成 Flink SQL 的 TableSource 或 TableSink。而對於一些 Flink SQL 以外的使用者,比如 Flink DataStream API 的使用者,他們也可以直接讀取物理 schema 來享受到資料倉庫的便利。
EntryX 執行時
和運營日誌 ETL 類似,在 EntryX 執行時,系統會基於通用的 jar 和配置生成 Flink 作業,但這裡有兩種情況需要特別處理。
首先是一個 Kafka topic 往往有幾十甚至上千種日誌,那麼對應其實有也幾十甚至上千的流表,如果每個流表都單獨執行在一個作業裡,那麼一個 topic 會可能會被讀上千遍,這是非常大的浪費。因此,在作業執行時提供一個最佳化策略,可以將同個 source 的不同流表合併到一個作業裡跑。比如圖中,某個手游上傳了 3 種日誌到 Kafka,使用者分別配置了玩家註冊、玩家登入、領取禮包三個流表,那麼我們可以這三個流表合併起來到一個作業,共享同一個 Kafka Source。
另外的一個最佳化是,一般情況下我們可以按照之前“提取轉換一次,載入一次”的思路來將資料同時寫到 Hive 和 Kafka,但是由於 Hive 或者說 HDFS 畢竟是離線系統,實時性比較差,寫入在一些負載比較高的 HDFS 老叢集經常會出現反壓,同時阻塞上游,導致 Kafka 的寫入也受到影響。在這種情況下,我們通常要分離載入到實時和離線的 ETL 管道,具體會取決於業務的 SLA 還有 HDFS 的效能。
四、調優實踐
接下來給大家分享下我們在 ETL 建設中的調優實踐經驗。
HDFS 寫入調優
首先是 HDFS 寫入的調優。流式寫入 HDFS 場景中老生常談的一個問題便是小檔案過多。通常來說小檔案和實時性是魚與熊掌不可兼得。如果要延遲低,那麼我們需要頻繁地滾動檔案來提交資料,必然導致小檔案過多。
小檔案過多主要造成兩個問題:一從 HDFS 叢集管理角度看,小檔案會佔用大量的檔案數和 block 數,浪費 NameNode 記憶體;二是從使用者角度看,讀寫效率都會降低,因為寫的時候要更頻繁地呼叫 RPC 和 flush 資料,造成更多的阻塞,有時甚至造成 checkpoint 超時,而讀時則需要開啟更多的檔案才能讀完資料。
HDFS 寫入調優 - 資料流預分割槽
我們在最佳化小檔案問題時做的一點調優是對資料流先做一遍預分割槽,具體來說,便是在 Flink 作業內部先基於目標 Hive 表進行一次 keyby 分割槽,讓同一個表的資料儘量集中在少數的幾個 subtask 上。
舉個例子,假設 Flink 作業並行度為 n,而目標 Hive 分割槽數為 m 個。因為每個 subtask 都有可能讀到任意分割槽的資料,在預設的各 subtask 完全並行的情況下,每個 subtask 都會寫所有分割槽,造成總體的寫入檔案數是 n * m。假設 n 是 100,m 是 1000,按 10 分鐘滾一次檔案算,每天會造成 14,400,000 個檔案,這對於很多老叢集來說是非常大的壓力。
如果經過資料流分割槽的最佳化之後,我們就可以限制住 Flink 並行度帶來的增長。比如我們 keyby hive 表字段,並加入範圍為 0-s 整數的鹽來避免資料傾斜,那麼分割槽最多會被 s 個 subtask 讀寫。假設 s 是 5,比起原先 n 是 100,那麼我們就將原本的檔案數降低為原來 20 分之一。
基於 OperatorState 的 SLA 統計
第二個我想分享的是我們的 SLA 統計工具。背景是我們的使用者經常會透過 Web UI 來進行除錯和問題的排查,比如不同 subtask 的輸入輸出數目,但這些 metric 會因為作業重啟或者 failover 而重置,因此我們開發了基於 OperatorState 的 SLA-Utils 工具來統計資料的輸入和分類輸出。這個工具設計得非常輕量級,可以很容易整合到我們自己的服務或者使用者的作業裡面。
在 SLA-Utils 裡面,我們支援了三種 metric。首先是標準的 metric,有 recordsIn/recordsOut/recordsDropped/recordsErrored,分別對應輸入記錄數/正常輸出記錄數/被過濾掉的記錄數/處理異常的記錄數。通常來說 recordsIn 就等於後面三者的總和。第二種使用者可以自定義的 metric,通常可以用於記錄更詳細的原因,比如是 recordsEventTimeDropped 代表資料是因為 event time 被過濾的。
那麼上述兩種 metric 靜態的,也就是說 metric key 在作業執行前就要確定,此外 SLA-Utils 還支援在執行時動態註冊的 TTL metric。這種 metric 通常有動態生成的日期作為字首,在經過 TTL 的時間之後被自動清理。TTL metric 主要可以用於做天級別時間視窗的統計。這裡比較特別的一點是,因為 OperatorState 是不支援 TTL 的,SLA-Utils 是在每次進行 checkpoint 快照的時候進行一次過濾,剔除掉過期的 metric,以實現 TTL 的效果。
那麼在 State 儲存了 SLA 指標之後要做的就是暴露給使用者。我們目前的做法是透過 Accumulater 的方式來暴露,優點是 Web UI 有支援,開箱即用,同時 Flink 可以自動合併不同的 subtask 的 metric。缺點在於沒有辦法利用 metric reporter 來 push 到監控系統,同時因為 Acuumulater 是不能在執行時動態登出的,所以使用 TTL metric 會有記憶體洩漏的風險。因此,在未來我們也考慮支援 metric group 來避免這些問題。
資料容錯及恢復
最後再分享下我們在資料容錯和恢復上的實踐。
與很多最佳實踐相似,我們用 SideOutput 來收集 ETL 各環節中出錯的資料,彙總到一個統一的錯誤流。錯誤記錄中包含我們預設的錯誤碼、原始輸入資料以及錯誤類和錯誤資訊。一般情況下,錯誤資料會被分類寫入 HDFS,使用者透過監控 HDFS 目錄可以得知資料是否正常。
那麼儲存好異常資料後,下一步就是要恢復資料。這通常有兩種情況。一是資料格式異常,比如日誌被截斷導致不完整或者時間戳不符合約定格式,這種情況下我們一般透過離線批作業來修復資料,重新回填到原有的資料管道。二是 ETL 管道異常,比如資料實際的 schema 有變更但流表配置沒有更新,可能會導致某個欄位都是空值,這時我們的處理辦法是:首先更新線上的流表配置為最新,保證不再產生更多異常資料,這時 hive 裡面仍有部分分割槽是異常的。然後,我們釋出一個獨立的補數作業來專門修復異常的資料,輸出的資料會寫到一個臨時的目錄,並在 hive metastore 上切換 partition 分割槽的 location 來替換掉原來的異常目錄。因此這樣的一個補數流程對離線查詢的使用者來說是透明的。最後我們再在合適的時間替換掉異常分割槽的資料並恢復 location。
五、未來規劃
最後介紹下我們的未來規劃。
第一個是資料湖的支援。目前我們的日誌絕大多數都是 append 型別,不過隨著 CDC 和 Flink SQL 業務的完善,我們可能會有更多的 update、delete 的需求,因此資料湖是一個很好的選擇。
第二個會提供更加豐富的附加功能,比如實時的資料去重和小檔案的自動合併。這兩個都是對業務方非常實用的功能。
最後是一個支援 PyFlink。目前我們的 Python 支援只覆蓋到資料整合階段,後續資料倉庫的 Python 支援我們是希望透過 PyFlink 來實現。
作者丨Paul Lin
來源丨http://www.whitewood.me/2020/12/20/%E7%BD%91%E6%98%93%E6%B8%B8%E6%88%8F%E5%9F%BA%E4%BA%8E-Flink-%E7%9A%84%E6%B5%81%E5%BC%8F-ETL-%E5%BB%BA%E8%AE%BE/