首頁>技術>

flink設定watermark以及事件時間欄位原始碼分析背景

1.1、提取時間戳欄位,用於事件事件語義處理資料

1.2、設定水位線(水印)watermark

TimestampAssigner 核心介面介紹
TimestampAssigner 時間分配器介面 實現類關係圖:

提取時間戳欄位方法

TimestampAssigner 時間戳分配器, 提取資料流中的時間戳欄位,

AssignerWithPeriodicWatermarks  //週期性的生成水印AssignerWithPunctuatedWatermarks //打斷式的生成,也就是可以每一條資料都生成BoundedOutOfOrdernessTimestampExtractor //亂序資料週期性生成AscendingTimestampExtractor //升序資料週期性生成IngestionTimeExtractor  //進入flink系統時間分配器

TimestampAssigner 實現類

AssignerWithPeriodicWatermarks  //週期性的生成水印AssignerWithPunctuatedWatermarks //打斷式的生成,也就是可以每一條資料都生成BoundedOutOfOrdernessTimestampExtractor //亂序資料週期性生成AscendingTimestampExtractor //升序資料週期性生成IngestionTimeExtractor  //進入flink系統時間分配器
設定時間戳、水印方法

DataStream類設定時間戳的方法:assignTimestampsAndWatermarks,指定watermark

    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {        // match parallelism to input, otherwise dop=1 sources could lead to some strange        // behaviour: the watermark will creep along very slowly because the elements        // from the source go to each extraction operator round robin.        final int inputParallelism = getTransformation().getParallelism();        final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);        TimestampsAndPeriodicWatermarksOperator<T> operator =                new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)                .setParallelism(inputParallelism);    }
1、AssignerWithPeriodicWatermarks介面:
public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> {    @Nullable    Watermark getCurrentWatermark();}    

官方實現類:

BoundedOutOfOrdernessTimestampExtractor //亂序資料週期性生成 AscendingTimestampExtractor //升序資料週期性生成 IngestionTimeExtractor //進入flink系統時間分配器

因此我們一般選擇使用實現類即可

2、AscendingTimestampExtractor 週期性生成watermark,升序資料

//實現類提取時間戳欄位方法,呼叫者實現

  public abstract long extractAscendingTimestamp(T element);

//根據資料流時間戳,計算watermark的時間戳 --升序處理資料

@Override    public final long extractTimestamp(T element, long elementPrevTimestamp) {    //資料流中獲取的時間戳        final long newTimestamp = extractAscendingTimestamp(element);        //如果當前資料的時俱戳大於當前已知的時間戳中的,則更新watermark中的時間戳        if (newTimestamp >= this.currentTimestamp) {            this.currentTimestamp = newTimestamp;            return newTimestamp;        } else {        //否則列印日誌            violationHandler.handleViolation(newTimestamp, this.currentTimestamp);            return newTimestamp;        }    }

列印日誌處理方法:

public static final class LoggingHandler implements MonotonyViolationHandler {        private static final long serialVersionUID = 1L;        private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class);        @Override        public void handleViolation(long elementTimestamp, long lastTimestamp) {            LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp);        }    }

獲取當前watermark的方法

    @Override    public final Watermark getCurrentWatermark() {        //預設延遲1毫秒        //如果當前時間戳等於Long.MIN_VALUE 則返回Long.MIN_VALUE,否則返回最大時間戳-1        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);    }

AscendingTimestampExtractor 繼承 AscendingTimestampExtractor

@PublicEvolving@Deprecatedpublic abstract class AscendingTimestampExtractor<T>    extends org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor<T> {}
3、BoundedOutOfOrdernessTimestampExtractor 週期性的亂序資料

1、在建立物件時,預設給了一個最大的時間戳, Long.MIN_VALUE + this.maxOutOfOrderness;

2、來一條資料,判斷當前時間戳和最大時間戳的大小,如果當前時間戳大於最大時間戳,則更新

3、生成watermark,用最大時間戳減去最大延遲,也就是watermark中的時間戳調慢的時間,比如原本是3點結束的視窗,延遲為1分鐘,那麼watermark中的時間應該為2分59秒

4、預設是 Long.MIN_VALUE是防止出現最大的時間戳減去最大延遲為負數,watermark中的時間戳為負數,出現時間倒轉

BoundedOutOfOrdernessTimestampExtractor 有參建構函式:

    /** The current maximum timestamp seen so far. */    private long currentMaxTimestamp; //截至目前最大的時間戳    /** The timestamp of the last emitted watermark. */    private long lastEmittedWatermark = Long.MIN_VALUE; //上次watermark中時間戳    /**     * The (fixed) interval between the maximum seen timestamp seen in the records     * and that of the watermark to be emitted.     */    private final long maxOutOfOrderness; //最大延遲時間    //建構函式  maxOutOfOrderness為亂序可容忍的最大程度,單位可以為milliseconds  seconds minutes等等    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {        //如果延遲時間小於0,丟擲異常        if (maxOutOfOrderness.toMilliseconds() < 0) {            throw new RuntimeException("Tried to set the maximum allowed " +                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");        }        //最大延遲轉換為毫秒數        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();        //計算最大的預設的時間戳 防止資料溢位,這裡要要加上最大延遲        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;    }

實現類重寫提取時間戳欄位的方法: 呼叫者使用,提取出指定欄位的資料並返回當前時間戳的大小

public abstract long extractTimestamp(T element);

extractTimestamp過載方法,用於更新最大的時間戳,每來一條資料進行判斷

@Override    public final long extractTimestamp(T element, long previousElementTimestamp) {        //獲取當前資料的時間戳大小        long timestamp = extractTimestamp(element);        //如果當前資料的時間戳大小大於目前最大的時間戳,則賦值        if (timestamp > currentMaxTimestamp) {            currentMaxTimestamp = timestamp;        }        //如果當前資料的時間戳小於目前最大的時間戳,則不變        return timestamp;    }

獲取watermark中的時間戳:

@Overridepublic final Watermark getCurrentWatermark() {    // this guarantees that the watermark never goes backwards.    //當前時間的最大時間戳 - 最大延遲時間 =watermark中的時間戳    long potentialWM = currentMaxTimestamp - maxOutOfOrderness;    // 如果當前的最大時間戳延遲後的時間戳大於上次的watermark中的時間戳,則更新watermark    if (potentialWM >= lastEmittedWatermark) {        lastEmittedWatermark = potentialWM;    }    return new Watermark(lastEmittedWatermark);}
4、介面AssignerWithPunctuatedWatermarks

每一條資料都生成watermark的介面

    public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> {    @Nullable    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);}

沒有實現類,需我們自己實現

 public static class Test implements AssignerWithPunctuatedWatermarks {        /**         * 生成Watermark         *         * @param lastElement        上一條資料         * @param extractedTimestamp 水印的時間戳         * @return         */        @Nullable        @Override        public Watermark checkAndGetNextWatermark(Object lastElement, long extractedTimestamp) {            return null;        }        //提取時間戳欄位        @Override        public long extractTimestamp(Object element, long previousElementTimestamp) {            return 0;        }    }
Watermark類介紹
@PublicEvolvingpublic final class Watermark extends StreamElement {    /** The watermark that signifies end-of-event-time. */    public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);    // ------------------------------------------------------------------------    /** The timestamp of the watermark in milliseconds. */    private final long timestamp;    /**     * Creates a new watermark with the given timestamp in milliseconds.     */    //建構函式,傳入時間戳    public Watermark(long timestamp) {        this.timestamp = timestamp;    }    /**     * Returns the timestamp associated with this {@link Watermark} in milliseconds.     */    //獲取當前水印的時間戳大小    public long getTimestamp() {        return timestamp;    }    // ------------------------------------------------------------------------    @Override    public boolean equals(Object o) {        return this == o ||                o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp;    }    @Override    public int hashCode() {        return (int) (timestamp ^ (timestamp >>> 32));    }    @Override    public String toString() {        return "Watermark @ " + timestamp;    }}

總結

1、Watermark可以理解為一個帶著時間戳的空資料或者帶著時間戳的標誌資料,和其他資料一樣,一條一條的處理

2、Watermark只能一直遞增

3、Watermark計算方式為當前時間戳減去延遲時間 ,實現視窗延遲

4、window的執行由watermark觸發,watermark機制結合window實現

5、升序資料-AscendingTimestampExtractor

亂序資料-BoundedOutOfOrdernessTimestampExtractor

6、BoundedOutOfOrdernessTimestampExtractor比AscendingTimestampExtractor區別就在於,使用了一個最大的時間戳的值,

來對每個資料進行判斷,大於則更新,不大於則不更新。而AscendingTimestampExtractor後面的資料如果小於則會出現預警日誌

以上僅為個人學習時的理解,如果不確定,麻煩大佬指正!

14
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • SAP許可權入門知識