整理: 戴季國(Flink 社群志願者)
校對:苗文婷(Flink 社群志願者)
出處:https://mp.weixin.qq.com/s/9Y6XCZRuZ79ICJ5ylTYRDQ
Flink SQL 現狀視窗功能的擴充套件回撤流的最佳化未來的規劃Tips: 點選文末「 閱讀原文 」即可回顧作者原版分享影片~
一、 背景及現狀
1. 三種模式的分析
Flink 作業目前有三種建立方式:JAR 模式、畫布模式和 SQL 模式。不同的提交作業的方式針對的人群也是不 一樣的。
優點:功能靈活多變,因為它底層的 DataStream/DataSet API 是 Flink 的原生 API,你可以用它們開發任何你想要的運算元功能或者 DAG 圖;效能最佳化方便,可以非常有針對性的去最佳化每一個運算元的效能。缺點:依賴更新繁瑣,無論擴充套件作業邏輯或是 Flink 版本的升級,都要去更新作業的程式碼以及依賴版本;學習門檻較高。■ 畫布模式所謂的畫布模式,一般來講會提供一個視覺化的拖拉拽介面,讓使用者透過介面化的方式去進行拖拉拽操作,以完成 Flink 作業的編輯。它面向一些小白使用者。
優點:操作便捷,畫布上可以很方便地定義 Flink 的作業所包含的各種運算元;功能較全,它基於 Table API 開發,功能覆蓋比較完整;易於理解,DAG 圖比較直觀,使用者能夠非常容易的去理解整個作業的執行流程。缺點:配置複雜:每一個運算元都需要去逐個的去配置,如果整個 DAG 圖非常複雜,相應的配置工作也會非常大;邏輯重用困難:如果作業非常的多,不同的作業之間想去共享 DAG 邏輯的話非常困難。■ SQL 模式
SQL 語言已經存在了很長時間了,它有自己的一套標準,主要面向資料分析人員。只要遵循既有的 SQL 標準,資料分析人員就可以在不同的平臺和計算引擎之間進行切換。
優點:清晰簡潔,易於理解和閱讀;與計算引擎解耦,SQL 與計算引擎及其版本是解耦的,在不同的計算引擎之間遷移業務邏輯不需要或極少需要去更改整段 SQL。同時,如果想升級 Flink 版本,也是不需要去更改 SQL;邏輯重用方便,可以透過 create view 的方式去重用我們的 SQL 邏輯。缺點:語法不統一,比如說流與維表 Join,Flink 1.9 之前使用 Lateral Table Join 語法,但是在 1.9 之後,更改成了 PERIOD FOR SYSTEM_TIME 語法,這種語法遵循了 SQL ANSI 2011 標準。語法的變動使得使用者有一定的學習成本;功能覆蓋不全:Flink SQL 這個模組存在的時間不是很長,導致它的功能的一個覆蓋不是很全。效能調優困難:一段 SQL 的執行效率主要由幾個部分來決定,一個就是 SQL 本身所表達的業務邏輯;另一部分是翻譯 SQL 所產生的執行計劃的一個最佳化;第三部分的話,在產生最優的邏輯執行計劃之後,翻譯成本地的 native code 的時候方案也決定了 SQL 的執行效率;對於使用者來講的,他們所能最佳化的內容可能只侷限於 SQL 所表達的業務邏輯。問題定位困難:SQL 是一個完整的執行流程,如果我們發現某些資料不對,想針對性地去排查到底是哪個運算元出了問題,是比較的困難的。一般來講,我們想定位 Flink SQL 的問題,只能先不斷的精簡我們的整個 SQL 邏輯,然後不斷地去嘗試輸出,這個成本是非常高的。騰訊實時計算平臺後期會針對這個問題,增加 trace 日誌和 metrics 資訊,輸出到產品側以幫助使用者定位 Flink SQL 使用上的問題。2. 騰訊實時計算平臺目前的工作
■ 新增功能新增的一些功能,包括兩個新的 Window 的型別,Incremental Window(增量視窗)和 Ehanced Tumble Window(增強視窗)。實現了 Eventtime Field 與 Table S ource 的解耦,很多時候 Eventtime Field 並不能透過 Table Source 欄位定義出來,比如 Table So urce 是一個子查詢或者某個時間欄位是由函式轉換得出,想要用這些中間生成的時間欄位作為 Eventtime Field 目前是做不到的,我們目前的方案是,讓使用者可以選擇物理表中任意的時間欄位來定義 Window 的時間屬性並輸出 WaterMark。
■ 效能調優回撤流最佳化;內聯 UDF,如果相同的 UDF 既出現在 LogicalProject 中,又出現在 Where 條件中,那麼 UDF 會進行多次呼叫。將邏輯執行計劃中重複呼叫的 UDF 提取出來,將該 UDF 的執行結果進行快取,避免多次呼叫;■ Bucket Join
流表維表 Join 中存在資料冷啟動問題,如果 Flink 任務在啟動時大量載入外部資料,很容易造成反壓。 可以在啟動時利用 State Processor API 等手段將全部資料預載入到記憶體中。 但這種方案存在一種問題,維表資料載入到所有的 subtask 裡面會造成較大的記憶體消耗。 因此我們的解決方案是,在維表的定義中指定一個 bucket 資訊,流與維表進行 Join 的時候會基於 bucket 資訊去載入維表中對應分片的資料,同時在翻譯執行計劃的時候流表拿到 bucket 資訊,以保證流與維表的資料都會基於同一個 bucket 資訊進行 Join。 這種方式能大大減少全量維表資料預載入帶來的記憶體消耗問題。
二、 視窗功能擴充套件
騰訊實時計算平臺基於現有 Flink SQL 語法進行了一些擴充套件,並另外定義了兩種新的 Window 型別。
1. 新的視窗操作
現有如下需求,需要在兩條流上針對某個時間視窗做 Join 操作或者交併差操作。
使用 Flink SQL 基於某個 Window 去做雙流 Join,現有的方案有兩種,第一種方案就是先做 Join 再做 Group By,第二種就是 Interval Join。首先來分析一下第一種方案能否滿足需求。
先 Join 再開窗的邏輯如上圖所示,根據邏輯執行計劃可以看到 Join 節點在 Window Aggregate 節點之下,所以會先進行流與流的 Join,Join 完了之後再去做Window Aggregate。
圖中右側的流程圖也可以看出,首先兩邊的流會做一個 Connect,然後基於 Join Key 做 Keyby 操作,以此保證兩條流中擁有相同 Join Key 的資料能夠 Shuffle 到同一個 task 上。左流會將資料存到自己的狀態中,同時會去右流的狀態中進行 Match,如果能 Match 上會將 Match 後的結果輸出到下游。這種方案存在以下兩個問題:
狀態無法清理 :因為 Join 在開窗之前,Join 裡面並沒有帶 Window 的資訊,即使下游的 Window 觸發並完成計算,上游兩條流的 Join 狀態也無法被清理掉,頂多只能使用基於 TTL 的方式去清理。語義無法滿足需求 :原始的需求是想在兩條流中基於相同的時間視窗去把資料進行切片後再 Join,但是當前方案並不能滿足這樣的需求,因為它先做 Join,使用 Join 後的資料再進行開窗,這種方式不能確保兩條流中參與 Join 的資料是基於同一視窗的。■ 1.2 Interval Join
Interval Join 相對於前面一種寫法,好處就是不存在狀態無法清理的問題,因為在掃描左右兩條流的資料時可以基於某一確定的視窗,過了視窗時間後,狀態是可以被清理掉的。
但是這種方案相對於第一種方案而言,資料準確性可能會更差一點,因為它對於視窗的劃分不是基於一個確定視窗,而是基於資料進行驅動,即當前資料可以 Join 的另一條流上的資料的範圍是基於當前資料所攜帶的 Eventtime 的。這種視窗劃分的語義與我們的需求還是存在一定差距的。
想象一下現有兩條速率不一致的流,以 low 和 upper 兩條邊界來限定左流可以 Join 的右流的資料範圍,在如此死板的範圍約束下,右流總會存在一些有效資料落在時間視窗 [left + low, left + upper] 之外,導致計算不夠準確。因此,最好還是按照視窗對齊的方式來劃分時間視窗,讓兩條流中 Eventtime 相同的資料落在相同的時間視窗。
騰訊擴展出了 Windowing Table-Valued Function 語法,該語法可以滿足“在兩條流上針對某個時間視窗做 Join 操作或者交併差操作”的需求。在 SQL 2016 標準中就有關於這一語法的描述,同時該語法在 Calcite1.23 裡面就已存在。
Windowing Table-Valued Function 語法中的 Source 可以把它整個的語義描述清楚,From 子句裡面包含了 Window 定義所需要的所有資訊,包括 Table Source、Eventtime Field、Window Size 等等。
從上圖的邏輯計劃可以看出,該語法在 LogiclTableScan 上加了一個叫 LogicalTableFunctionScan 的節點。另外,LogicalProject 節點(輸出節點)多了兩個欄位叫作 WindowStart 和 WindowEnd,基於這兩個欄位可以把資料歸納到一個確定的視窗。基於以上原理,Windowing Table-Valued Function 語法可以做到下面這些事情:
在單流上面 ,可以像現有的 Group Window 語法一樣去劃分出一個時間視窗。寫法如上圖,Window 資訊全部放到 From 子句中,然後再進行 Group By。這種寫法應該更符合大眾對於時間視窗的理解,比當前 Flink SQL 中的 Group Window 的寫法更加直觀一點。我們在翻譯單流上的 Windowing Table-Valued Function 語法時做了一個討巧,即在實現這段 SQL 的物理翻譯時,並沒有去翻譯成具體的 DataStream API,而是將其邏輯執行計劃直接變換到現在的 Group Window 的邏輯執行計劃,也就是說共用了底層物理執行計劃的程式碼,只是做了一個邏輯執行計劃的等價。另外,可以對 Window 裡面的資料做一些 Sort 或者 TopN 的一些輸出,因為 Windowing Table-Valued Function 語法已經提前把資料劃分進了一個個確定的視窗。如上圖所示,首先在 From 子句裡面把視窗劃分好,然後 Order By 和 Limit 緊接其後,直接表達了排序和 TopN 語義。在雙流上面 ,可以滿足“在兩條流上針對某個時間視窗做 Join 操作或者交併差操作”的原始需求。語法如上圖,首先把兩個視窗的 Window Table 構造好,然後利用 Join 關鍵字進行 Join 操作即可;交併差操作也一樣,與傳統資料庫 SQL 的交併差操作無二。■ 1.4 實現細節
下面簡單介紹一下我們在實現 Windowing Table-Valued Function 語法時的一些細節。
1.4.1 視窗的傳播原始的邏輯計劃翻譯方式,先基於 LogicalTableScan,然後再翻譯到 Windowing Table-Valued Function,最後再翻譯到 OrderBy Limit 子句。整個過程會儲存很多次狀態,對於效能來講會是比較大的一個消耗,因此做了如下最佳化,把多個 Logical Relnode 合併在一起去翻譯,這樣可以減少中間環節程式碼的產生,從而提高效能。
1.4.2 時間屬性欄位可以看到 Windowing Table-Valued Function 的語法:
SELECT * FROM TABLE(TUMBLE(TABLE <data>, DESCRIPTOR(<timecol>), <size> [, <offset>]))
table<data> 不僅僅可以是一張表,還可以是一個子查詢。所以如果定義 Eventtime Field 的時候,把時間屬性和 Table Source 繫結,且 Table Source 恰好是一個子查詢,此時就無法滿足我們的需求。所以我們在實現語法的時候,把時間屬性欄位跟 Table Source 解耦,反之,使用者使用物理表中的任意一個時間欄位來作為時間屬性,從而產生 watermark。
1.4.3 時間水印Watermark 的使用邏輯與在其他語法中一樣,兩條流的所有的 Input Task 的最小時間水印,決定視窗的時間水印,以此來觸發視窗計算。
1.4.4 使用約束目前 Windowing Table-Valued Function 的使用存在一些約束。首先,兩條流的視窗型別必須是一致的,而且視窗大小也是一樣的。然後,目前還沒有實現 Session Window 相關的功能。
2. 新的視窗型別
接下來的介紹擴展出兩個新的視窗型別。
2.1.1 多次觸發基於 Tumble Window,自定義了 Incremental Trigger。該觸發器確保,不僅僅是在 Windows 結束之後才去觸發視窗計算,而是每個 SQL 中所定義的 Interval 週期都會觸發一次視窗計算。
如上圖中的 SQL 案例,總的視窗大小是一秒,且每 0.2 秒觸發一次,所以在視窗內會觸發 5 次視窗計算。且下一次的輸出結果是基於上一次結果進行累計計算。
2.1.2 Lazy Trigger針對 Incremental Window 做了一個名為 Lazy Trigger 的最佳化。在實際的生產過程中,一個視窗相同 Key 值在多次觸發視窗計算後輸出的結果是一樣的。對於下游來講,對於這種資料是沒必要去重複接收的。因此,如果配置了 Lazy Trigger 的話,且在同一個視窗的同一個 Key 下,下一次輸出的值跟上一次的是一模一樣的,下游就不會接收到這次的更新資料,由此減少下游的儲存壓力和併發壓力。
有如下需求,使用者希望在 Tumble Window 觸發之後,不去丟棄遲到的資料,而是再次觸發視窗計算。如果使用 DataStream API,使用 SideOutput 就可以完成需求。但是對於 SQL,目前是沒辦法做到的。因此,擴充套件了現有的 Tumble Window,把遲到的資料也收集起來,同時遲到的資料並不是每來一條就重新觸發視窗計算並向下遊輸出,而是會重新定義一個 Trigger,Trigger 的時間間隔使用 SQL 中定義的視窗大小,以此減少向下遊傳送資料的頻率。
同時,側輸出流在累計資料的時候也會使用 Window 的邏輯再做一次聚合。這裡需要注意,如果下游是類似於HBase這樣的資料來源,對於相同的 Window 相同的 Key,前一條正常被視窗觸發的資料會被遲到的資料覆蓋掉。理論上,遲到的資料跟正常視窗觸發的資料的重要性是一樣的,不能相互覆蓋。最後,下游會將收到的同一個視窗同一個 Key 下的正常資料和延遲資料再做一次二次聚合。
三、回撤流最佳化接下來介紹一下在回撤流上所做的一些最佳化。
1. 流表二義性
回顧一下關於在 Flink SQL 中關於回撤流的一些概念。
首先介紹一下持續查詢(Continuous Query),相對於批處理一次執行輸出一次結果的特點,流的聚合是上游來一條資料,下游的話就會接收一條更新的資料,即結果是不斷被上游的資料所更新的。因此,對於同一個 Key 下游能夠接收到多條更新結果。
2. 回撤流
以上圖的 SQL 為例,當第二條 Java 到達聚合運算元時,會去更新第一條 Java 所產生的狀態並把結果傳送到下游。如果下游對於多次更新的結果不做任何處理,就會產生錯誤的結果。針對這種場景,Flink SQL 引入了回撤流的概念。
所謂回撤流的話,就是在原始資料前加了一個標識位,以 True/False 進行標識。如果標識位是 False,就表示這是一條回撤訊息,它通知下游對這條資料做 Delete 操作;如果標識位是 True,下游直接會做 Insert 操作。
Aggregate Without Window(不帶 Window 的聚合場景)RankOver WindowLeft/Right/Full Outer Join解釋一下 Outer Join 為什麼會產生回撤。以 Left Outer Join 為例,且假設左流的資料比右流的資料先到,左流的資料會去掃描右流資料的狀態,如果找不到可以 Join 的資料,左流並不知道右流中是確實不存在這條資料還是說右流中的相應資料遲到了。為了滿足 Outer join 的語義的話,左邊流資料還是會產生一條 Join 資料傳送到下游,類似於 MySQL Left Join,左流的欄位以正常的表字段值填充,右流的相應欄位以 Null 填充,然後輸出到下游,如下圖所示:
後期如果右流的相應資料到達,會去掃描左流的狀態再次進行 Join,此時,為了保證語義的正確性,需要把前面已經輸出到下游的這條特殊的資料進行回撤,同時會把最新 Join 上的資料輸出到下游。注意,對於相同的 Key,如果產生了一次回撤,是不會再產生第二次回撤的,因為如果後期再有該 Key 的資料到達,是可以 Join 上另一條流上相應的資料的。
下面介紹 Flink 中處理回撤訊息的邏輯。
對於中間計算節點,透過上圖中的 4 個標誌位來控制,這些標識位表示當前節點是產生 Update 資訊還是產生 Retract 資訊,以及當前節點是否會消費這個 Retract 資訊。這 4 個標識位能夠決定整個關於 Retract 的產生和處理的邏輯。
對於 Sink 節點,目前 Flink 中有三種 sink 型別,AppendStreamTableSink、RetractStreamTableSink 和 UpsertStreamTableSink。AppendStreamTableSink 接收的上游資料是一條 Retract 資訊的話會直接報錯的,因為它只能描述 Append-Only 語義;RetractStreamTableSink 則可以處理 Retract 資訊,如果上游運算元傳送一個 Retract 資訊過來,它會對訊息做 Delete 操作,如果上游運算元傳送的是正常的更新資訊,它會對訊息做 Insert 操作;UpsertStreamTableSink 可以理解為對於RetractStreamTableSink 做了一些效能的最佳化。如果 Sink 資料來源支援冪等操作,或者支援按照某 key 做 Update 操作,UpsertStreamTableSink 會在 SQL 翻譯的時候把上游 Upsert Key 傳到 Table Sink 裡面,然後基於該 Key 去做 Update 操作。
2.3.1 中間節點的最佳化產生回撤資訊最根本的一個原因是不斷地向下遊多次傳送更新結果,因此,為了減少更新的頻率並降低併發,可以把更新結果累計一部分之後再發送出去。如上圖所示:
第一個場景是一個巢狀 AGG 的場景(例如兩次 Count操作),在第一層 Group By 嘗試將更新結果傳送到下游時候會先做一個 Cache,從而減少向下遊傳送資料頻率。當達到了 Cache 的觸發條件時,再把更新結果傳送到下游。第二個場景是 Outer Join,前面提到,Outer Join 產生回撤訊息是因為左右兩邊資料的速率不匹配。以 Left Outer Join 為例,可以把左流的資料進行 Cache。左流資料到達時會去右流的狀態裡面查詢,如果能找到可以與之 Join的資料則不作快取;如果找不到相應資料,則對這條 Key 的資料先做快取,當到達某些觸發條件時,再去右流狀態中查詢一次,如果仍然找不到相應資料,再去向下游傳送一條包含 Null 值的 Join 資料,之後右流相應資料到達就會將 Cache 中該 Key 對應的快取清空,並向下遊傳送一條回撤訊息。以此來減小向下遊傳送回撤訊息的頻率。
2.3.2 Sink 節點的最佳化針對 Sink 節點做了一些最佳化,在 AGG 節點和 Sink 節點之間做了一個 Cache,以此減輕 Sink 節點的壓力。當回撤訊息在 Cache 中再做聚合,當達到 Cache 的觸發條件時,統一將更新後的資料傳送到 Sink 節點。以下圖中的 SQL 為例:
參考最佳化前後的輸出結果可以看到,最佳化後下遊接收到的資料量是有減少的,例如使用者 Sam,當回撤訊息嘗試傳送到下游時,先做一層 Cache,下游接收到的資料量可以減少很多。
四、未來規劃下面介紹一下我們團隊後續的工作規劃:
Cost-Based Optimization :現在 Flink SQL 的邏輯執行計劃的最佳化還是基於RBO(Rule Based Optimization)的方式。我們團隊想基於 CBO 所做一些事,主要的工作還是統計資訊的收集。統計資訊不僅僅來自 Flink SQL 本身,可能還會來自公司內其他產品,例如元資料,不同 Key 所對應的資料分佈,或者其他資料分析結果。透過跟公司內其他產品打通,拿到最準的統計資料,產生最優的執行計劃。More New Features(CEP Syntax etc.) : 基於 Flink SQL 定義一些 CEP 的語法,以滿足使用者關於 CEP 的一些需求。Continuous Performance Optimization(Join Operator etc.) : 我們團隊在做的不僅僅是執行計劃層的最佳化,也在做 Join Operator 或者說資料 Shuffle 的一些細粒度的最佳化。Easier To Debug : 最後是關於 Flink SQL任務的除錯和定位。 目前 Flink SQL在這方面是比較欠缺的,特別是線上關於資料對不齊的問題,排查起來非常的棘手。 我們目前的思路是透過配置的方式,讓 SQL 在執行的過程中吐出一些 Trace 資訊或者一些 Metrics 資訊,然後傳送到其他平臺。 透過這些 Trace 資訊和 Metric 資訊,幫助使用者定位出問題的運算元。作者:杜立@騰訊
出處:https://mp.weixin.qq.com/s/9Y6XCZRuZ79ICJ5ylTYRDQ