一、什麼是 checkpoint
上次發文,提到了 Flink 可以非常高效的進行有狀態流的計算,透過使用 Flink 內建的 Keyed State 和 Operator State,儲存每個運算元的狀態。
預設情況下,狀態是儲存在 JVM 的堆記憶體中,如果系統中某個環節發生了錯誤,宕機,這個時候所有的狀態都會丟失,並且無法恢復,會導致整個系統的資料計算發生錯誤。
此時就需要 Checkpoint 來保障系統的容錯。Checkpoint 過程,就是把運算元的狀態週期性持久化的過程。
在系統出錯後恢復時,就可以從 checkpoint 中恢復每個運算元的狀態,從上次消費的地方重新開始消費和計算。從而可以做到在高效進行計算的同時還可以保證資料不丟失,只計算一次。
二、Checkpoint 必要的兩個條件答案是否,需要滿足以下兩個條件才能做 Checkpoint:
需要支援重放一定時間範圍內資料的資料來源,比如:kafka 。因為容錯機制就是在任務失敗後自動從最近一次成功的 checkpoint 處恢復任務,此時需要把任務失敗前消費的資料再消費一遍。假設資料來源不支援重放,那麼資料還未寫到儲存中就丟了,任務恢復後,就再也無法重新消費這部分丟了的資料了。需要一個儲存來儲存持久化的狀態,如:Hdfs,本地檔案。可以在任務失敗後,從儲存中恢復 checkpoint 資料。三、Checkpoint 引數詳解StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每 60s 做一次 checkpointenv.enableCheckpointing(60000);// 高階配置:// checkpoint 語義設定為 EXACTLY_ONCE,這是預設語義env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 兩次 checkpoint 的間隔時間至少為 1 s,預設是 0,立即進行下一次 checkpointenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);// checkpoint 必須在 60s 內結束,否則被丟棄,預設是 10 分鐘env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一時間只能允許有一個 checkpointenv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 最多允許 checkpoint 失敗 3 次env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);// 當 Flink 任務取消時,保留外部儲存的 checkpoint 資訊env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 當有較新的 Savepoint 時,作業也會從 Checkpoint 處恢復env.getCheckpointConfig().setPreferCheckpointForRecovery(true);// 允許實驗性的功能:非對齊的 checkpoint,以提升效能env.getCheckpointConfig().enableUnalignedCheckpoints();
相關引數的文字描述:
env.enableCheckpointing(60000),1 分鐘觸發一次 checkpoint;setCheckpointTimeout,checkpoint 超時時間,預設是 10 分鐘超時,超過了超時時間就會被丟棄;setCheckpointingMode,設定 checkpoint 語義,可以設定為 EXACTLY_ONCE,表示既不重複消費也不丟資料;AT_LEAST_ONCE,表示至少消費一次,可能會重複消費;setMinPauseBetweenCheckpoints,兩次 checkpoint 之間的間隔時間。假如設定每分鐘進行一次 checkpoint,兩次 checkpoint 間隔時間為 30s。假設某一次 checkpoint 耗時 40s,那麼理論上20s 後就要進行一次 checkpoint,但是設定了兩次 checkpoint 之間的間隔時間為 30s,所以是 30s 之後才會進行 checkpoint。另外,如果配置了該引數,那麼同時進行的 checkpoint 數量只能為 1;enableExternalizedCheckpoints,Flink 任務取消後,外部 checkpoint 資訊是否被清理。DELETE_ON_CANCELLATION,任務取消後,所有的 checkpoint 都將會被清理。只有在任務失敗後,才會被保留;RETAIN_ON_CANCELLATION,任務取消後,所有的 checkpoint 都將會被保留,需要手工清理。setPreferCheckpointForRecovery,恢復任務時,是否從最近一個比較新的 savepoint 處恢復,預設是 false;enableUnalignedCheckpoints,是否開啟試驗性的非對齊的 checkpoint,可以在反壓情況下極大減少 checkpoint 的次數;四、Checkpoint 如何實現的Flink 的 checkpoint 是基於 Chandy-Lamport 演算法,實現了一個分散式一致性的儲存快照演算法。
這裡我們假設一個簡單的場景來描述 checkpoint 具體過程是怎樣的。
場景是:假如現在 kafka 只有一個分割槽,資料是每個 app 發過來的日誌,我們統計每個 app 的 PV。
Flink 的 checkpoint coordinator (JobManager 的一部分)會週期性的在流事件中插入一個 barrier 事件(柵欄),用來隔離不同批次的事件,如下圖紅色的部分。
下圖中有兩個 barrier ,checkpoint barrier n-1 處的 barrier 是指 Job 從開始處理到 barrier n -1 所有的狀態資料,checkpoint barrier n 處的 barrier 是指 Job 從開始處理到 barrier n 所有的狀態資料。
回到剛剛計算 PV 的場景,當 Source Task 接受到 JobManager 的編號為 chk-100 的 Checkpoint 觸發請求後,發現自己恰好接收到了 offset 為(0,1000)【表示分割槽0,offset 為1000】處的資料,所以會往 offset 為(0,1000)資料之後,(0,1001)資料之前安插一個 barrier,然後自己開始做快照,把 offset (0,1000)儲存到狀態後端中,向 CheckpointCoordinator報告自己快照製作情況,同時向自身所有下游運算元廣播該barrier。如下圖:
當下遊計算的運算元收到 barrier 後,會看是否收到了所有輸入流的 barrier,我們現在只有一個分割槽,Source 運算元只有一個例項,barrier 到了就是收到了所有的輸入流的 barrier。
開始把本次的計算結果(app1,1000),(app2,5000)寫到狀態儲存之中,向 CheckpointCoordinator 報告自己快照製作情況,同時向自身所有下游運算元廣播該barrier。
當 Operator 2 收到柵欄後,會觸發自身進行快照,把自己當時的狀態儲存下來,向 CheckpointCoordinator 報告 自己快照製作情況。因為這是一個 sink ,狀態儲存成功後,意味著本次 checkpoint 也成功了。
Barrier 對齊上面我們舉的例子是 Source Task 例項只有一個的情況,在輸入流的運算元有多個例項的情況下,會有一個概念叫 Barrier 對齊。
可以看上面的第一張圖,有兩個輸入流,一個是上面的數字流,一個是下面的字母流。
數字流的 barrier 在 1 後面,字母流的 barrier 在 e 後面。當上面的 barrier 到達 operator 之後,必須要等待下面的數字流的 barrier 也到達,此時數字流後面過來的資料會被快取到緩衝區。這就是 barrier 對齊的過程。
看上面的第二張圖,當數字流的 barrier 到達後,意味著輸入流的所有例項的 barrier 都到達了,此時開始處理 到第三張圖的時候,處理完畢,自身做快照,然後把緩衝區的 pending 資料都發出去,把 checkpoint barrier n 繼續往下發送。
五、Flink 1.11 對 Checkpoint 的最佳化從上圖的對齊過程,我們可以發現,在進行對齊的過程中,運算元是不會再接著處理資料了,一定要等到對齊動作完成之後,才能繼續對齊。也就是上圖中的數字流的 barrier 到達之後,需要去等待字母流的 barrier 事件。
這其中會有一個阻塞的過程。在大多數情況下執行良好,然而當作業出現反壓時,阻塞式的 Barrier 對齊反而會加劇作業的反壓,甚至導致作業不穩定。
首先, Chandy-Lamport 分散式快照的結束依賴於 Marker 的流動,而反壓則會限制 Marker 的流動,導致快照的完成時間變長甚至超時。無論是哪種情況,都會導致 Checkpoint 的時間點落後於實際資料流較多。
這時作業的計算進度是沒有被持久化的,處於一個比較脆弱的狀態,如果作業出於異常被動重啟或者被使用者主動重啟,作業會回滾丟失一定的進度。如果 Checkpoint 連續超時且沒有很好的監控,回滾丟失的進度可能高達一天以上,對於實時業務這通常是不可接受的。更糟糕的是,回滾後的作業落後的 Lag 更大,通常帶來更大的反壓,形成一個惡性迴圈。
所以在 Flink 1.11 版本中,引入了一個 Unaligned Checkpointing 的模組,主要功能是,在 barrier 到達之後,不必等待所有的輸入流的 barrier,而是繼續處理資料。
然後把第一次到達的 barrier 之後的所有資料也放到 checkpoint 裡面,在下一次計算的時候,會合並上次儲存的資料以及流入的資料後再計算。這樣會大大加快 Barrier 流動的速度,降低 checkpoint 整體的時長。
六、總結 Checkpoint 的原理JobManager 端的 CheckPointCoordinator 會定期向所有 SourceTask 傳送 CheckPointTrigger,Source Task 會在資料流中安插 Checkpoint barrier;當 task 收到上游所有例項的 barrier 後,向自己的下游繼續傳遞 barrier,然後自身同步進行快照,並將自己的狀態非同步寫入到持久化儲存中如果是增量 Checkpoint,則只是把最新的一部分更新寫入到外部持久化儲存中為了下游儘快進行 Checkpoint,所以 task 會先發送 barrier 到下游,自身再同步進行快照;當 task 將狀態資訊完成備份後,會將備份資料的地址(state handle)通知給 JobManager 的CheckPointCoordinator,如果 Checkpoint 的持續時長超過了 Checkpoint 設定的超時時間CheckPointCoordinator 還沒有收集完所有的 State Handle,CheckPointCoordinator 就會認為本次 Checkpoint 失敗,會把這次 Checkpoint 產生的所有狀態資料全部刪除;如果 CheckPointCoordinator 收集完所有運算元的 State Handle,CheckPointCoordinator 會把整個 StateHandle 封裝成 completed Checkpoint Meta,寫入到外部儲存中,Checkpoint 結束;作者:wangkai
出處:https://mp.weixin.qq.com/s/1u8XLMCIGhw-gWyO4KfhQQ