背景介紹
好未來是一家 2003 年成立教育科技公司,旗下有品牌學而思,現在大家聽說的學而思培優、學而思網校都是該品牌的衍生,2010 年公司在美國納斯達克上市,2013 年更名為好未來。2016 年,公司的業務範圍已經覆蓋負一歲到 24 歲的使用者。目前公司主營業務單元有智慧教育、教育領域的開放平臺、K12 教育以及海外留學等業務。
好未來資料中臺全景圖
上圖為好未來資料中臺的全景圖,主要分為三層:
● 第一層是資料賦能層● 第二層是全域資料層● 第三層是資料開發層
首先,資料賦能層。主要是商業智慧、智慧決策的應用,包括一些資料工具、資料能力以及專題分析體系,資料工具主要包括埋點資料分析工具、AB 測試工具、大屏工具;資料能力分析主要包括未來畫像服務、未來增長服務、未來使用者服務以及新校區的選址服務;專題分析體系主要包企業經營類專題分析等等。
其次,資料全域層。我們期望將全集團所有的事業部的資料進行深入的拉通和融合,打通不同業務線、產品線的使用者池,從而盤活全集團的資料。具體的手段是 IDMapping,將裝置 id、自然人、家庭三個層級的 id 對映關係挖掘出來,將不同產品上的使用者資料關聯起來。這樣就能夠形成一個打的使用者池,方便我們更好的賦能使用者。
最後,資料開發層。資料開發透過一些列的平臺承載了全集團所有的資料開發工程,主要包括資料整合、資料開發、資料質量、資料服務、資料治理等服務。我們今天要分享的實時平臺就是在資料開發中。
好未來 T-Streaming 實時平臺實時平臺構建前的訴求
實時平臺在構建之初,我們梳理了四個重要的訴求。
● 第一個訴求是期望有一套統一的叢集,透過提供多租戶,資源隔離的方式提高資源利用率,解決多個事業部多套叢集的問題。● 第二個訴求是期望透過平臺的方式降低實時資料開發的門檻,從而能夠覆蓋更多的開發者。● 第三個訴求是期望能夠提供通用場景的解決解方案,提高專案的複用性,避免每個事業部都開發相同場景的分析工具。● 第四個訴求是對作業進行全方位的生命週期管理,包括元資料和血緣,一旦有一個作業出現異常,我們可以快速分析和定位影響範圍。
實時平臺功能概述現在我們平臺已經是一個一站式的實時資料分析平臺,包括了資料整合、資料開發、作業保障、資源管理、資料安全等功能。
● 在資料整合方面,我們支援資料庫、埋點資料、服務端日誌資料的整合,為了能夠提高資料整合的效率,我們提供了很多的通用模板作業,使用者只需要配置即可快速實現資料的整合。● 在資料開發方面,我們支援兩種方式的作業開發,一種是 Flink SQL 作業開發、一種是 Flink Jar 包託管,在 Flink SQL 開發上我們內建了很多 UDF 函式,比如可以透過 UDF 函式實現維表 join,也支援使用者自定義 UDF,並且實現了 UDF 的熱載入。除此之外,我們也會記錄使用者在作業開發過程中的元資料資訊,方便血緣系統的建設。● 在作業保障方面,我們支援作業狀態監控、異常告警、作業失敗之後的自動拉起,作業自動拉起我們會自動選擇可用的 checkpoint 版本進行拉起,同時也支援作業在多叢集之間的切換。● 在資源管理方面,我們支援平臺多租戶,每個租戶使用 namespace 進行隔離、實現了不同事業部、不同使用者、不同版本的 Flink 客戶端隔離、實現了計算資源的隔離。● 在資料安全方面,我們支援角色許可權管理、表級別許可權管理、操作審計日誌查詢等功能。
以上就是我們平臺的功能,在賦能業務的同時,我們也還在快速迭代中,期望平臺簡單好用,穩定可信賴。
實時平臺的批流融合接下來說一下平臺建設中的一些實踐,第一個是批流融合。
我們先理清楚批流融合是什麼?
批流融合可以分為兩個概念,一個是 Flink 提出的批流融合,具體的理解就是一個 Flink SQL 既可以作用於流資料、也可以作用於批資料,透過保證計算引擎一致從而減少結果資料的差異,這是一個技術層面上的批流融合。另個一概念是我們內部提出來的,那就是架構層面的批流融合。具體的操作手法就是透過 Flink 作業保證資料倉庫 ODS 層的實時化,然後提供小時級別、分鐘級別的排程,從而提高資料分析的實時化。
為什麼我們會提出架構上的批流融合,主要我們看到行業發展的兩個趨勢。
● 第一個趨勢是資料整合的實時化和元件化,比如 Flink 整合 Hive、Flink CDC 的持續完善和增強,這樣我們做資料整合的時候就會變得非常簡單。● 第二個趨勢是實時 OLAP 引擎越來越成熟,比如 Kudu+impala、阿里雲的 Hologres、湖倉一體的方案。
這兩個趨勢讓使用者開發實時資料會變得越來越簡單,使用者只需要關注 SQL 本身就可以。
如上圖所示,我們有三個型別的實時數倉,一個是基於 Hive 的、一個是基於實時 OLAP 引擎的、一個是基於 Kafka 的。其中,藍色線條就是我們 ODS 層實時化的具體實現。我們提供了一個統一的工具,可以將實時的將資料寫入到 Hive、實時 OLAP 引擎、當然還有 Kafka。這個工具使用起來比較簡單,如果是 MySQL 資料的同步,使用者只需要輸入資料庫名稱和表名就可以了。
透過 ODS 層實時化的工具,我們就可以在 Hive、實時 OLAP 引擎、Kafka 中構建實時數倉。
● 如果是 Hive 實時數倉,我們會使用 Flink 將實時的增量資料寫入到 ODS 層,然後提供一個定時 merge 的指令碼,用來 merge 增量資料和歷史資料,從而保證 ODS 層的資料是最新最全的。配合 airflow 小時級別的排程能力,使用者就可以得到一個小時級別的數倉了。● 如果是類似於 Kudu / Hologres 這樣的實時 OLAP 引擎,我們會先把離線資料從 Hive 中匯入到實時 OLAP 引擎中,然後使用 Flink 將實時的增量資料寫入到 ODS 層,寫入的方式推薦使用 upsert 這樣的特性,這樣使用者就能夠得到一個純實時的數倉了。配合 airflow 分鐘級別的排程能力,使用者就可以得到一個分鐘級別的數倉了。● 基於 Kafka 構建實時數倉,就是非常經典的架構了,開發成本也比較高一些,除了必須要秒級更新的分析場景,我們不太建議使用者使用。當然在 2021 年的時候,我們也會去做 Flink 批流一體解決方案,讓使用者有更多選擇方式的同時,讓整個實時數倉變得更加簡單。
以上就是我們對批流融合的思考和實踐,透過這種架構層面的批流融合,原來需要開發一個月的實時需求,現在 2 天就差不多能完成。大大降低了開發實時資料的門檻,提高了資料分析的效率。
實時平臺 ODS 層實時化說一下 ODS 層實時化我們具體是怎麼做的。
要想把 ODS 層資料實時化,我們需要解決兩個問題,第一個是離線資料的初始化問題,第二個是增量資料如何寫入的問題。離線資料匯入比較好做,如果資料來源是 MySQL,我們可以使用 DataX 或者 Spark 作業的方式將 MySQL 的全量資料匯入到 Hive 中,而實時增量資料的寫入我們需要有兩個步驟,第一個步驟是將 MySQL 的 binlog 採集到 Kafka,第二個步驟是將 Kafka 的資料使用Flink作業匯入到 Hive。這樣算下來,要解決 ODS 層實時化的問題,我們就需要一個離線初始化的作業,一個增量資料採集的作業,一個增量資料寫入的作業,也就是需要 3 個作業。
在我們的平臺上,我們對 ODS 層的 3 個作業進行了封裝和統一排程,使用者只需要輸入一個數據庫名稱和表的名稱就能完成 ODS 層實時化的工作。
以上就是我們批流融合中 ODS 層實時化的實現過程。
實時平臺 Flink SQL 開發流程我們另外一個實踐,就是對 Flink SQL 的作業封裝。先看一下,在我們平臺上進行 Flink SQL 開發的整體流程。
從左往右看,資料來源中的資料會透過 Maxwell、canal 這樣的工具採集到 Kafka,採集到 Kafka 的原始資料格式並不是統一的,所以我們需要將 Kafka 中的資料進行統一格式化處理,我們預設支援埋點資料格式、canal 資料格式、maxwell 資料的解析,也支援使用者自己上傳 Jar 包進行資料解析,解析得到的標準化資料就會再次傳送到 Kafka。
然後我們會使用 Flink SQL 作業來消費 Kafka 的資料,進行 SQL 指令碼的開發。這裡的 SQL 指令碼開發和原生的 Flink SQL 的指令碼開發有一點不一樣,原生的 SQL 指令碼開發使用者需要編寫 Source 資訊、Sink 資訊,在我們平臺上使用者只需要寫具體的 SQL 邏輯就可以了。
那使用者寫完 SQL 之後,會將 SQL 作業資訊提交到我們封裝好的 Flink SQL 執行作業上,最後透過我們封裝的 SQL 引擎將作業提交的 Flink 叢集上去執行。後面將介紹我們是怎麼封裝的。
以上就是在我們平臺上進行 Flink SQL 開發的流程,出了 Flink 作業本身的開發和提交,平臺也會保留與作業有關的各種輸入、輸出的 schema 資訊。比如業務資料庫表的 schema 資訊,經過同意加工之後的 schema 資訊,資料輸出的表的 schema 資訊,透過這些記錄,後期我們排查問題的時候就能夠快速梳理出作業的來龍去脈和影響範圍。
實時平臺 Flink SQL 開發過程在我們平臺上開發 Flink SQL 作業,只需要三個步驟:
● 第一個步驟確認 Kafka 的 Topic 是否已經註冊過了,如果沒有註冊就需要使用者手動註冊下,完成註冊後,我們會把 Topic 的資料解析出來,將欄位資訊儲存起來。● 第二步使使用者編寫 SQL,剛才說過,使用者只需要寫具體的 SQL 邏輯,不需要寫 Source 和 Sink 資訊。● 第三步是使用者指定將資料輸出到哪裡,現在平臺可以支援同時指定多個 Sink 儲存裝置,比如將計算好的資料同時寫入到 Hive、Holo 等儲存。
透過以上三個步驟的配置,使用者就可以提交作業了。
接下來說一下,我們是怎麼做的,我把整個執行過程分為 2 個階段 10 個步驟。第一個階段就是作業準備階段,第二個階段就是 SQL 執行階段。
● 第一步,使用者在頁面資料 SQL 和指定 Sink 資訊。● 第二步,SQL 解析及校驗過程,當用戶提交 SQL 時,我們會對 SQL 進行解析,看看 SQL 中用到的 Source 表和 UDF 是否在平臺中註冊過。● 第三步,推測建表,我們會先運用下使用者的 SQL,然後得到 SQL 的返回結果,根據結果資料生成一些建表語句,最後透過程式自動到目標 Sink 儲存上去建表。● 第四步,拼裝 Flink SQL 的指令碼檔案,得到一個有 Source、SQL、Sink 三要素的指令碼檔案。● 第五步,作業提交,這裡會把 Flink SQL 檔案提交到我們自己執行引擎中。
● 第一步是會初始化 StreamTableAPI,然後使用 connect 方法註冊 Kafka Source,Kafka 的 Source 資訊需要指定資料解析的規則和欄位的 schema 資訊,我們會根據元資料自動生成。● 第二步是使用 StreamTableAPI 註冊 SQL 中使用到的維表和 UDF 函式,UDF 函式包括使用者自己上傳的 UDF 函式。● 第三步是使用 StreamTable API 執行 SQL 語句,如果有檢視也可以執行檢視。● 第四步是一個比較關鍵的步驟,我們會把 StreamTabAPI 轉成 DataStream API。● 第五步就是在 DataStream 的基礎上 addSink 資訊了。
以上是兩個階段的執行過程,透過第二個階段,使用者的 SQL 作業就會真正的執行起來。
實時平臺原生作業與模板任務上面分享了我們的 Flink SQL 作業如何開發和執行,接下來說一下我們平臺對 JAR 包型別作業的支援。
在我們平臺上,我們支援使用者自己上傳 JAR 包作業,然後在我們平臺上進行管理。與此同時,為了提高程式碼通常場景的複用性,我們開發了很多模板作業,比如支援 Maxwell 採集的 binlog 直接寫入到 Hive、Kudu、Holo 等儲存裝置,支援阿里雲 SLS 日誌寫入到各種 OLAP 引擎。
實時平臺混合雲部署方案講一下混合雲部署方案和平臺技術架構。
我們平臺現在支援將作業提交到阿里雲機房、自建機房中,並且作業可以在兩個機房中來回切換。為了要有這個功能呢?
今年年初,隨著疫情的爆發,網際網路線上教育湧入了大量的流量,為了應對暴增的流量,春節期間我們採購了上千臺機器進行緊急的部署和上線,後來疫情穩定住了之後,這些機器的利用率就比較低了,為了解決這個問題,我們平臺就支援了混合雲部署方案,高峰期的時候作業可以遷移到阿里雲上執行,平常就在自己的叢集上執行,既節約了資源又保證了彈性擴容。
實時平臺技術架構接下來說一下平臺的技術架構。
我們是一個前後端分離的專案,前端使用 vue+elmentui、服務端使用 springboot,不同的機房裡面我們會部署一個後端服務的例項。任務提交到不同的機房主要透過轉發層的 nginx+lua 來實現的。平臺上任務的提交、暫停、下線操作,都是透過驅動層來完成的,驅動層主要是一些 shell 指令碼。最後就是客戶端了,在客戶端上我們做了 Namespace/使用者/Flink 版本的隔離。
K12 教育典型分析場景--續報業務介紹我們聊一個具體的案例,案例是 K12 教育行業中典型的分析場景,使用者續報業務。先說下什麼是續報,續報就是重複購買,使用者購買了一年的課程,我們期望使用者購買二年的課程。為了使用者購買課程,我們會有一個集中的時間段用來做續報,每次持續一週左右,一年四次。
因為續報週期比較集中,時間比較短暫,每次做續報業務老師對實時續報資料的需求就特別迫切。
為此我們做了一個通用的續報解決方案,來支援各事業部的續報動作。要做實時續報,有幾個挑戰。
● 第一個挑戰是計算一個使用者的訂單是否是續報,需要依賴這個使用者歷史上所有的訂單,也就是需要歷史資料參與計算。● 第二個挑戰就是一個訂單的變化會影響其它訂單的變化,是一個連鎖效應。比如使用者有 5 個訂單,編號為 345 的訂單都是續報狀態,如果使用者取消了編號為 3 的訂單,訂單 4 和訂單5的續報狀態就需要重新計算。● 第三個挑戰是維度變化很頻繁,比如使用者上午的分校狀態是北京,下午的分校狀態可能就是上海,上午的輔導老師是張三,下午的輔導老師就是李四,頻繁變化的維度給實時彙總資料帶來了挑戰。
依賴歷史資料、訂單改變的連鎖效應、頻繁變化的維度,這些挑戰如果單個看都不算什麼,如果放在一起就會變得比較有意思了。
實時續報解決方案先說下整體架構,我們採用的批流融合方式來做的,分成兩條線,一條線是分鐘級實時續報資料計算,一條是秒級實時續報資料計算。計算好的資料放在 MYSQL 中,用來做大屏和 BI 看板。
先看下藍色的這條線,我們會把 Hive 中的離線資料匯入到 Kudu 中,離線資料都是計算好的訂單寬表。然後會使用 Flink 作業把新增的訂單做成寬表寫入到 Kudu 中,這樣 Kudu 裡面就會有最新最全的資料。配合 4 分鐘的排程,我們就提供了分鐘級的實時續報資料。
在看第一條橙色的線條,這條線上有兩個 Flink 作業,一個是 ETL Job,一個是 Update Job。
ETL job 會負責靜態維度的拼接與續報狀態的計算,靜態維度拼接我們是直接訪問 MySQL,然後快取在 JVM 中。續報狀態的計算需要依賴歷史資料,ETL Job 會將所有的訂單資料載入到 JVM 中,具體的實現方法是我們自定義了一個 partitioncustom 方法,對所有的歷史資料進行了分片,下游的每個 Task 快取一個分片的資料。透過將資料載入到記憶體中,我們大大的加快了 Flink 實時計算的速度。
ETL Job 的計算的資料,會有兩個輸出,一個是輸出到 Kudu,用來保證 Kudu 中的資料最新最全,兩個一個數據是 Kafka,Kafka 中有一個 Topic 記錄的是是當前訂單的變化導致了哪些訂單或者維度變化的資訊。
接在 Kafka 後面的程式就是 Update Job,專門用來處理受影響的訂單或者維度,直接去修改 MySQL 中相關的統計資料。
這樣我們就透過 2 個 Flink 作業實現的實時續報的計算。
最下面的一條線是實時維度的資料變更的處理,維度變更的資料會發送到 Kafka中,然後使用 Flink 進行處理,看看維度的變化影響了哪些資料的統計,最後將受影響的訂單傳送到受影響的 Topic 中,由 Update Job 來重新計算。
以上就是我們實時續報的整體解決方案,如果有教育行業的朋友聽到這個分享,或許可以參考下。
實時續報穩定性保障我們看看這個通用的解決方案上線之後有哪些保障。
● 第一個保障是異地雙活,我們在阿里雲和自建機房都部署了一套續報程式,如果其中一套有異常,我們切換前端介面就可以了。如果兩個機房的程式都掛了,我們重零開始啟動程式,也只需要 10 分鐘。● 第二個保障是作業容錯,我們有兩個 Flink 作業,這兩個作業隨停隨啟,不影響資料的準確性。另外一點就是我們快取了所有訂單資料在 JVM 中,如果資料量暴漲,我們只需要改變 ETL 程式的並行度就可以,不用擔心 JVM 記憶體溢位。● 第三個保障是作業監控,我們支援作業的異常告警和失敗後的自動拉起,也支援消費資料延遲告警。
透過以上保障措施,實時續報程式經過了幾次續報週期,都比較平穩,讓人很省心。
展望與規劃上述內容詳細介紹了好未來當前業務及技術方案,總結而言我們透過多租戶實現各事業部資源隔離、透過批流融合的架構方案解決分析實時化、透過 ODS 層實時化解決資料來源到 OLAP 的資料整合問題、透過 Flink SQL 封裝降低實時資料開發門檻、透過模板任務提供通用場景解決方案、透過混合雲部署方案解決資源的彈性擴容、透過實時續報解決方案覆蓋相同場景的資料分析。
最後,來看一下我們展望和規劃。接下來我們要繼續深化批流融合,強化混合雲部署,提高資料分析的時效性和穩定性。支援演算法平臺的實時化,資料應用的實時化,提高資料決策的時效性。