flink設定watermark以及事件時間欄位原始碼分析背景
1.1、提取時間戳欄位,用於事件事件語義處理資料
1.2、設定水位線(水印)watermark
TimestampAssigner 核心介面介紹TimestampAssigner 時間分配器介面 實現類關係圖:
提取時間戳欄位方法:
TimestampAssigner 時間戳分配器, 提取資料流中的時間戳欄位,
AssignerWithPeriodicWatermarks //週期性的生成水印AssignerWithPunctuatedWatermarks //打斷式的生成,也就是可以每一條資料都生成BoundedOutOfOrdernessTimestampExtractor //亂序資料週期性生成AscendingTimestampExtractor //升序資料週期性生成IngestionTimeExtractor //進入flink系統時間分配器
TimestampAssigner 實現類
AssignerWithPeriodicWatermarks //週期性的生成水印AssignerWithPunctuatedWatermarks //打斷式的生成,也就是可以每一條資料都生成BoundedOutOfOrdernessTimestampExtractor //亂序資料週期性生成AscendingTimestampExtractor //升序資料週期性生成IngestionTimeExtractor //進入flink系統時間分配器
設定時間戳、水印方法
DataStream類設定時間戳的方法:assignTimestampsAndWatermarks,指定watermark
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) { // match parallelism to input, otherwise dop=1 sources could lead to some strange // behaviour: the watermark will creep along very slowly because the elements // from the source go to each extraction operator round robin. final int inputParallelism = getTransformation().getParallelism(); final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner); TimestampsAndPeriodicWatermarksOperator<T> operator = new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner); return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator) .setParallelism(inputParallelism); }
1、AssignerWithPeriodicWatermarks介面:public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> { @Nullable Watermark getCurrentWatermark();}
官方實現類:
BoundedOutOfOrdernessTimestampExtractor //亂序資料週期性生成 AscendingTimestampExtractor //升序資料週期性生成 IngestionTimeExtractor //進入flink系統時間分配器
因此我們一般選擇使用實現類即可
2、AscendingTimestampExtractor 週期性生成watermark,升序資料//實現類提取時間戳欄位方法,呼叫者實現
public abstract long extractAscendingTimestamp(T element);
//根據資料流時間戳,計算watermark的時間戳 --升序處理資料
@Override public final long extractTimestamp(T element, long elementPrevTimestamp) { //資料流中獲取的時間戳 final long newTimestamp = extractAscendingTimestamp(element); //如果當前資料的時俱戳大於當前已知的時間戳中的,則更新watermark中的時間戳 if (newTimestamp >= this.currentTimestamp) { this.currentTimestamp = newTimestamp; return newTimestamp; } else { //否則列印日誌 violationHandler.handleViolation(newTimestamp, this.currentTimestamp); return newTimestamp; } }
列印日誌處理方法:
public static final class LoggingHandler implements MonotonyViolationHandler { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class); @Override public void handleViolation(long elementTimestamp, long lastTimestamp) { LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp); } }
獲取當前watermark的方法
@Override public final Watermark getCurrentWatermark() { //預設延遲1毫秒 //如果當前時間戳等於Long.MIN_VALUE 則返回Long.MIN_VALUE,否則返回最大時間戳-1 return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1); }
AscendingTimestampExtractor 繼承 AscendingTimestampExtractor
@PublicEvolving@Deprecatedpublic abstract class AscendingTimestampExtractor<T> extends org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor<T> {}
3、BoundedOutOfOrdernessTimestampExtractor 週期性的亂序資料
1、在建立物件時,預設給了一個最大的時間戳, Long.MIN_VALUE + this.maxOutOfOrderness;
2、來一條資料,判斷當前時間戳和最大時間戳的大小,如果當前時間戳大於最大時間戳,則更新
3、生成watermark,用最大時間戳減去最大延遲,也就是watermark中的時間戳調慢的時間,比如原本是3點結束的視窗,延遲為1分鐘,那麼watermark中的時間應該為2分59秒
4、預設是 Long.MIN_VALUE是防止出現最大的時間戳減去最大延遲為負數,watermark中的時間戳為負數,出現時間倒轉
BoundedOutOfOrdernessTimestampExtractor 有參建構函式:
/** The current maximum timestamp seen so far. */ private long currentMaxTimestamp; //截至目前最大的時間戳 /** The timestamp of the last emitted watermark. */ private long lastEmittedWatermark = Long.MIN_VALUE; //上次watermark中時間戳 /** * The (fixed) interval between the maximum seen timestamp seen in the records * and that of the watermark to be emitted. */ private final long maxOutOfOrderness; //最大延遲時間 //建構函式 maxOutOfOrderness為亂序可容忍的最大程度,單位可以為milliseconds seconds minutes等等 public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) { //如果延遲時間小於0,丟擲異常 if (maxOutOfOrderness.toMilliseconds() < 0) { throw new RuntimeException("Tried to set the maximum allowed " + "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative."); } //最大延遲轉換為毫秒數 this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds(); //計算最大的預設的時間戳 防止資料溢位,這裡要要加上最大延遲 this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness; }
實現類重寫提取時間戳欄位的方法: 呼叫者使用,提取出指定欄位的資料並返回當前時間戳的大小
public abstract long extractTimestamp(T element);
extractTimestamp過載方法,用於更新最大的時間戳,每來一條資料進行判斷
@Override public final long extractTimestamp(T element, long previousElementTimestamp) { //獲取當前資料的時間戳大小 long timestamp = extractTimestamp(element); //如果當前資料的時間戳大小大於目前最大的時間戳,則賦值 if (timestamp > currentMaxTimestamp) { currentMaxTimestamp = timestamp; } //如果當前資料的時間戳小於目前最大的時間戳,則不變 return timestamp; }
獲取watermark中的時間戳:
@Overridepublic final Watermark getCurrentWatermark() { // this guarantees that the watermark never goes backwards. //當前時間的最大時間戳 - 最大延遲時間 =watermark中的時間戳 long potentialWM = currentMaxTimestamp - maxOutOfOrderness; // 如果當前的最大時間戳延遲後的時間戳大於上次的watermark中的時間戳,則更新watermark if (potentialWM >= lastEmittedWatermark) { lastEmittedWatermark = potentialWM; } return new Watermark(lastEmittedWatermark);}
4、介面AssignerWithPunctuatedWatermarks每一條資料都生成watermark的介面
public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> { @Nullable Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);}
沒有實現類,需我們自己實現
public static class Test implements AssignerWithPunctuatedWatermarks { /** * 生成Watermark * * @param lastElement 上一條資料 * @param extractedTimestamp 水印的時間戳 * @return */ @Nullable @Override public Watermark checkAndGetNextWatermark(Object lastElement, long extractedTimestamp) { return null; } //提取時間戳欄位 @Override public long extractTimestamp(Object element, long previousElementTimestamp) { return 0; } }
Watermark類介紹@PublicEvolvingpublic final class Watermark extends StreamElement { /** The watermark that signifies end-of-event-time. */ public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE); // ------------------------------------------------------------------------ /** The timestamp of the watermark in milliseconds. */ private final long timestamp; /** * Creates a new watermark with the given timestamp in milliseconds. */ //建構函式,傳入時間戳 public Watermark(long timestamp) { this.timestamp = timestamp; } /** * Returns the timestamp associated with this {@link Watermark} in milliseconds. */ //獲取當前水印的時間戳大小 public long getTimestamp() { return timestamp; } // ------------------------------------------------------------------------ @Override public boolean equals(Object o) { return this == o || o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp; } @Override public int hashCode() { return (int) (timestamp ^ (timestamp >>> 32)); } @Override public String toString() { return "Watermark @ " + timestamp; }}
總結:
1、Watermark可以理解為一個帶著時間戳的空資料或者帶著時間戳的標誌資料,和其他資料一樣,一條一條的處理
2、Watermark只能一直遞增
3、Watermark計算方式為當前時間戳減去延遲時間 ,實現視窗延遲
4、window的執行由watermark觸發,watermark機制結合window實現
5、升序資料-AscendingTimestampExtractor
亂序資料-BoundedOutOfOrdernessTimestampExtractor
6、BoundedOutOfOrdernessTimestampExtractor比AscendingTimestampExtractor區別就在於,使用了一個最大的時間戳的值,
來對每個資料進行判斷,大於則更新,不大於則不更新。而AscendingTimestampExtractor後面的資料如果小於則會出現預警日誌
以上僅為個人學習時的理解,如果不確定,麻煩大佬指正!