集團關於Blink的相關使用文件已經十分齊全,這裡不準備再過多贅述。這篇文章準備對Blink所基於的Apache社群開源產品--Flink的架構做一些淺顯分析。
一:Flink歷史、基本架構及分散式部署歷史
Flink專案最早開始於2010年由柏林技術大學、柏林洪堡大學、哈索普拉特納研究所共同合作研發的"Stratosphere: Information Management on the Cloud"(平流層:雲上的資訊管理) 專案,Flink最開始是作為該專案一個分散式執行引擎的Fork,到2014年成為Apache基金會下的一個專案,2014年底成為Apache頂級專案。每年一次的Flink Forward是關於Apache Flink最盛大的年度會議。
基本架構
Flink是原生的流處理系統,提供high level的API。Flink也提供 API來像Spark一樣進行批處理,但兩者處理的基礎是完全不同的。Flink把批處理當作流處理中的一種特殊情況。在Flink中,所有的資料都看作流,是一種很好的抽象,因為這更接近於現實世界。
Flink的基本架構圖
Flink 的主要架構與Spark接近,都基於Master-Slave 的主從模式,從執行順序上講:
1:叢集啟動,啟動JobManager 和多個TaskManager;
2:Flink Program程式提交程式碼,經由最佳化器/任務圖生成器,生成實際需執行的Job,傳遞至Client;
3:Client將submit提交任務(本質上是傳送包含了任務資訊的資料流)至JobManager;
4:JobManager分發任務到各個真正執行計算任務的Worker----TaskManager;
5:TaskManager開始執行計算任務,並且定時彙報心跳資訊和統計資訊給JobManager,TaskManager之間則以流的形式進行資料傳輸;
在以上步驟中,步驟2與Flink叢集之間可以不存在歸屬關係,即我們可以在任何機器上提交作業,只要它與JobManager相通。Job提交之後,Client甚至可以直接結束程序,都不會影響任務在分散式叢集的執行。
Client:
當用戶提交一個Flink程式時,會首先建立一個Client,該Client首先會對使用者提交的Flink程式進行預處理,並提交到Flink叢集中處理,所以Client需要從使用者提交的Flink程式配置中獲取JobManager的地址,並建立到JobManager的連線,將Flink Job提交給JobManager。Client會將使用者提交的Flink程式組裝一個JobGraph, 並且是以JobGraph的形式提交的。一個JobGraph是一個Flink Dataflow,它由多個JobVertex組成的DAG。所以,一個JobGraph包含了一個Flink程式的如下資訊:JobID、Job名稱、配置資訊、一組JobVertex(實際的任務operators)等。
JobManager:
JobManager是Flink系統的協調者,它負責接收Flink Job,排程組成Job的多個Task的執行。同時,JobManager還負責收集Job的狀態資訊,並管理Flink叢集中從節點TaskManager。主要包括:
RegisterTaskManager——在Flink叢集啟動的時候,TaskManager會向JobManager註冊,如果註冊成功,則JobManager會向TaskManager回覆訊息AcknowledgeRegistration;
SubmitJob——Flink程式內部透過Client向JobManager提交Flink Job,其中在訊息SubmitJob中以JobGraph形式描述了Job的基本資訊;
CancelJob——請求取消一個Flink Job的執行,CancelJob訊息中包含了Job的ID,如果成功則返回訊息CancellationSuccess,失敗則返回訊息CancellationFailure;
UpdateTaskExecutionState——TaskManager會向JobManager請求更新ExecutionGraph中的ExecutionVertex的狀態資訊,即向JobManager彙報operator具體的執行狀態,更新成功則返回true;
其他還包括RequestNextInputSplit、JobStatusChanged;
TaskManager:
TaskManager也是一個Actor(掌管者),它是實際負責執行計算的Worker,在其上執行Flink Job的一組Task。它在啟動的時候就設定好了槽位數(Slot),每個 slot 能啟動一個 Task,Task 為執行緒。TaskManager從 JobManager 處接收需要部署的 Task,部署啟動後,與自己的上游(任務上存在依賴關係的上游處理節點)建立 Netty 連線,接收資料並處理。每個TaskManager負責管理其所在節點上的資源資訊,如記憶體、磁碟、網路,在啟動的時候將資源的狀態向JobManager彙報。
TaskManager端可以分成兩個階段:
註冊階段——TaskManager會向JobManager註冊,傳送RegisterTaskManager訊息,等待JobManager返回AcknowledgeRegistration,然後TaskManager就可以進行初始化過程;
可操作階段——該階段TaskManager可以接收並處理與Task有關的訊息,如SubmitTask、CancelTask、FailTask。如果TaskManager無法連線到JobManager,這是TaskManager就失去了與JobManager的聯絡,會自動進入“註冊階段”,只有完成註冊才能繼續處理Task相關的訊息。
基於Yarn層面的結構
1: Clinet 客戶端上傳包含Flink和HDFS配置的jars至HDFS,因為YARN客戶端需要訪問Hadoop的配置以連線YARN資源管理器和HDFS;2: Clinet客戶端請求一個YARN容器作為資源管理器-Resource Manager,作用是啟動ApplicationMaster;
3: RM分配第一個container去執行AM--AppplicationMaster;
4: AM啟動,開始負責資源的監督和管理;
5: Job Manager和AM執行在同一個容器裡,都成功啟動後,AM知道job管理器(它擁有的主機)的地址;
6: Job Manager為Task Manager生成一個新的Flink配置, 這樣task可連線Job Manager;
7: AM容器可以作為Flink的web介面服務,YARN程式碼的所有埠是分配的臨時埠, 這可讓使用者並行執行多個yarn會話;
8: AM啟動分配到的容器,這些容器作為Flink的Task Manager,將會從HDFS下載jar和更新配置,叢集Run,可接收Job;
Flink叢集的HA方案:
在Flink的基本架構圖中,我們發現這一Master-Slave模式存在單點問題,即:JobManager這個點萬一down掉,整個叢集也就全完了。Flink一共提供了三種部署模式:Local、Standalone、YARN,除第一種為本地單機模式外,後兩者都為叢集模式。對於Standalone和YARN,Flink提供了HA機制避免上述單點失敗問題,使得叢集能夠從失敗中恢復。
YARN模式:
上段中介紹到Yarn層面的機構,注意到Flink的JobManager與YARN的Application Master(簡稱AM)是在同一個程序下的。YARN的ResourceManager對AM有監控,當AM異常時,YARN會將AM重新啟動,啟動後,所有JobManager的元資料從HDFS恢復。但恢復期間,舊的業務不能執行,新的業務不能提交。ZooKeeper(Apache ZooKeeper™)上還是存有JobManager的元資料,比如執行Job的資訊,會提供給新的JobManager使用。對於TaskManager的失敗,由JobManager上Akka的DeathWatch機制監聽處理。當TaskManager失敗後,重新向YARN申請容器,建立TaskManager。
Standalone模式:
對於Standalone模式的叢集,可以啟動多個JobManager,然後透過ZooKeeper選舉出leader作為實際使用的JobManager。該模式下可以配置一個主JobManager(Leader JobManager)和多個備JobManager(Standby JobManager),這能夠保證當主JobManager失敗後,備的某個JobManager可以承擔主的職責。下圖為主備JobManager的恢復過程。
二:Flink的流式計算架構分層棧
Deployment層:
本地、叢集,以及商用的雲模式,不再贅述;
runtime層:
Runtime層提供了支援Flink計算的全部核心實現,比如:支援分散式Stream處理、JobGraph到ExecutionGraph的對映、排程等等,為上層API層提供基礎服務;
API層:
API層主要實現了面向無界Stream的流處理和麵向Batch的批處理API,其中面向流處理對應DataStream API,面向批處理對應DataSet API. 簡單來說,DataSet和DataStream都是包含了重複項資料的immutable集合,不同的是,在DataSet裡,資料是有限的,而對於DataStream,元素的數量可以是無限的。對程式而言,最初的資料集合來源是Flink program 中的源資料,如雙11支付資料大屏的線上實時資料來源;然後透過filter、map、flatmap等API,可以對它們進行轉換,從而由初始資料集合派生出新集合。注意,集合是immutable的,只可派生出新的,不能修改原有的;
Libraries層:
Flink應用框架層,根據API層的劃分,在API層之上構建的滿足特定應用的實現計算框架,也分別對應於面向流處理和麵向批處理兩類。面向流處理支援:CEP(複雜事件處理)、基於SQL-like的操作(基於Table的關係操作);面向批處理支援:FlinkML(機器學習庫)、Gelly(圖處理)。
三:特性分析高吞吐&低延遲
簡單來說,Flink在流式計算上相比於Spark Streaming & Storm,突出的優勢主要是高吞吐&低延遲,如下圖所示:
支援 Event Time 和亂序事件
Event time :Event time是事件發生的時間,經常以時間戳表示,並和資料一起傳送。帶時間戳的資料流有,Web服務日誌、監控agent的日誌、移動端日誌等;Processing time :Processing time是處理事件資料的伺服器時間,一般是執行流處理應用的伺服器時鐘。許多流處理場景中,事件發生的時間和事件到達待處理的訊息佇列時間有各種延遲:
各種網路延遲;資料流消費者導致的佇列阻塞和反壓影響;資料流毛刺,即,資料波動;事件生產者(移動裝置、感測器等)離線;上述諸多原因會導致佇列中的訊息頻繁亂序。事件發生的時間和事件到達待處理的訊息佇列時間的不同隨著時間在不斷變化,這常被稱為時間偏移(event time skew),表示成:“processing time – event time”。
對大部分應用來講,基於事件的建立時間分析資料比基於事件的處理時間分析資料要更有意義。Flink允許使用者定義基於事件時間(event time)的視窗,而不是處理時間。
Flink使用事件時間 clock來跟蹤事件時間,其是以watermarks來實現的。watermarks是Flink 源流基於事件時間點生成的特殊事件。 T 時間點的watermarks意味著,小於 T 的時間戳的事件不會再到達。Flink的所有操作都基於watermarks來跟蹤事件時間。
狀態計算的exactly-once和容錯機制
流程式可以在計算過程中維護自定義狀態。
Apache Flink 提供了可以恢復資料流應用到一致狀態的容錯機制。確保在發生故障時,程式的每條記錄只會作用於狀態一次(exactly-once),不過也可以降級為至少一次(at-least-once)。這一容錯機制透過持續建立分散式資料流的快照來實現。對於狀態佔用空間小的流應用,這些快照非常輕量,可以高頻率建立而對效能影響很小。流計算應用的狀態儲存在一個可配置的環境,如:master 節點或者 HDFS上。
在遇到程式故障時(如機器、網路、軟體等故障),Flink 停止分散式資料流。系統重啟所有 operator ,重置其到最近成功的 checkpoint。輸入重置到相應的狀態快照位置。保證被重啟的並行資料流中處理的任何一個 record 都不是 checkpoint 狀態之前的一部分。
為了能保證容錯機制生效,資料來源(例如訊息佇列或者broker)需要能重放資料流。Apache Kafka 有這個特性,Flink 中 Kafka 的 connector 利用了這個功能。集團的TT系統也有同樣功能。
Flink 分散式快照的核心概念之一就是資料柵欄(barrier)。如上圖所示,這些 barrier 被插入到資料流中,作為資料流的一部分和資料一起向下流動。Barrier 不會干擾正常資料,資料流嚴格有序。一個 barrier 把資料流分割成兩部分:一部分進入到當前快照,另一部分進入下一個快照。每一個 barrier 都帶有快照 ID,並且 barrier 之前的資料都進入了此快照。Barrier 不會干擾資料流處理,所以非常輕量。多個不同快照的多個 barrier 會在流中同時出現,即多個快照可能同時建立。
Barrier 在資料來源端插入,當 snapshot N 的 barrier 插入後,系統會記錄當前 snapshot 位置值N (用Sn表示)。例如,在 Apache Kafka 中,這個變量表示某個分割槽中最後一條資料的偏移量。這個位置值 Sn 會被髮送到一個稱為 Checkpoint Coordinator 的模組(即 Flink 的 JobManager).
然後 barrier 繼續往下流動,當一個 operator 從其輸入流接收到所有標識 snapshot N 的 barrier 時,它會向其所有輸出流插入一個標識 snapshot N 的 barrier。當 sink operator(DAG 流的終點)從其輸入流接收到所有 barrier N 時,它向Checkpoint Coordinator 確認 snapshot N 已完成。當所有 sink 都確認了這個快照,快照就被標識為完成。
高度靈活的流式視窗Window
Flink支援在時間視窗,統計視窗,session 視窗,以及資料驅動的視窗,視窗(Window)可以透過靈活的觸發條件來定製,以支援複雜的流計算模式。
來自雲邪的描述 ——:“在流處理應用中,資料是連續不斷的,因此我們不可能等到所有資料都到了才開始處理。當然我們可以每來一個訊息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少使用者點選了我們的網頁。在這種情況下,我們必須定義一個視窗,用來收集最近一分鐘內的資料,並對這個視窗內的資料進行計算。”
視窗可以是時間驅動的(Time Window,例如:每30秒鐘),也可以是資料驅動的(Count Window,例如:每一百個元素)。一種經典的視窗分類可以分成:翻滾視窗(Tumbling Window),滾動視窗(Sliding Window),和會話視窗(Session Window)。
帶反壓(BackPressure)的連續流模型
資料流應用執行的是不間斷的(常駐)operators。
Flink streaming 在執行時有著天然的流控:慢的資料 sink 節點會反壓(backpressure)快的資料來源(sources)。
反壓通常產生於這樣的場景:短時負載高峰導致系統接收資料的速率遠高於它處理資料的速率。許多日常問題都會導致反壓,例如,垃圾回收停頓可能會導致流入的資料快速堆積,或者遇到大促或秒殺活動導致流量陡增。反壓如果不能得到正確的處理,可能會導致資源耗盡甚至系統崩潰。
Flink的反壓:
如果你看到一個task的back pressure告警(比如,high),這意味著生產資料比下游操作運算元消費的速度快。Record的在你工作流的傳輸方向是向下遊,比如從source到sink,而back pressure正好是沿著反方向,往上游傳播。
舉個簡單的例子,一個工作流,只有source到sink兩個步驟。假如你看到source端有個告警,這意味著sink消費資料速率慢於生產者的生產資料速率。Sink正在向上遊進行back pressure。
絕妙的是,在Spark Streaming和Storm是棘手問題的BackPressure,在Flink中並不成問題。簡單來說,Flink無需進行反壓,因為系統接收資料的速率和處理資料的速率是自然匹配的。系統接收資料的前提是接收資料的Task必須有空閒可用的Buffer,該資料被繼續處理的前提是下游Task也有空閒可用的Buffer。因此,不存在系統接受了過多的資料,導致超過了系統處理的能力。這有點像Java執行緒中的通用阻塞佇列: 一個較慢的接受者會降低傳送者的傳送速率,因為一旦佇列滿了(有界佇列)傳送者會被阻塞。