Flink timers: how ctx.timerService() works

Background

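The usual pattern is to call process() with a class that extends KeyedProcessFunction. Inside processElement, after handling each record, you register a timer as needed.

Event time: onTimer() fires when Flink's internal watermark reaches or passes the timestamp set on the timer.

ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);

Processing time: onTimer() fires when the system clock reaches the timestamp set on the timer.

ctx.timerService().registerProcessingTimeTimer(value.getWindowEnd() + 1);

The KeyedProcessFunction abstract class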
@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    // Logic applied to each element
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    // Logic run when a timer fires
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

    // Context
    public abstract class Context {

        public abstract Long timestamp();

        /**
         * A {@link TimerService} for querying time and registering timers.
         */
        public abstract TimerService timerService();

        public abstract <X> void output(OutputTag<X> outputTag, X value);

        /**
         * Get key of the element being processed.
         */
        public abstract K getCurrentKey();
    }

    /**
     * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
     */
    public abstract class OnTimerContext extends Context {

        /**
         * The {@link TimeDomain} of the firing timer.
         */
        public abstract TimeDomain timeDomain();

        /**
         * Get key of the firing timer.
         */
        @Override
        public abstract K getCurrentKey();
    }
}

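Context implementations for KeyedProcessFunction:

The context classes involved are ContextImpl, OnTimerContextImpl, and the abstract OnTimerContext.

The class hierarchy shows that the Context of KeyedProcessFunction is implemented by ContextImpl, an inner class of KeyedProcessOperator.

The KeyedProcessOperator class

The implementation of ContextImpl: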

    private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {

        private final TimerService timerService;

        private StreamRecord<IN> element;

        ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
            function.super();
            this.timerService = checkNotNull(timerService);
        }

        @Override
        public Long timestamp() {
            checkState(element != null);

            if (element.hasTimestamp()) {
                return element.getTimestamp();
            } else {
                return null;
            }
        }

        @Override
        public TimerService timerService() {
            return timerService;
        }

        @Override
        public <X> void output(OutputTag<X> outputTag, X value) {
            if (outputTag == null) {
                throw new IllegalArgumentException("OutputTag must not be null.");
            }

            output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
        }

        @Override
        @SuppressWarnings("unchecked")
        public K getCurrentKey() {
            return (K) KeyedProcessOperator.this.getCurrentKey();
        }
    }

Tracing timerService(): the field private final TimerService timerService; is supplied from outside the class.

The ContextImpl constructor receives timerService as a parameter:

ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
    function.super();
    this.timerService = checkNotNull(timerService);
}

The ContextImpl constructor is called from open():

public void open() throws Exception {
    super.open();
    collector = new TimestampedCollector<>(output);

    InternalTimerService<VoidNamespace> internalTimerService =
            getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);

    TimerService timerService = new SimpleTimerService(internalTimerService);

    context = new ContextImpl(userFunction, timerService);
    onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}

From the line TimerService timerService = new SimpleTimerService(internalTimerService); we can infer that the TimerService implementation in use is SimpleTimerService.

So ctx.timerService() returns a SimpleTimerService.

See the class hierarchy of KeyedProcessOperator.

The SimpleTimerService class
@Internal
public class SimpleTimerService implements TimerService {

    private final InternalTimerService<VoidNamespace> internalTimerService;

    public SimpleTimerService(InternalTimerService<VoidNamespace> internalTimerService) {
        this.internalTimerService = internalTimerService;
    }

    @Override
    public long currentProcessingTime() {
        return internalTimerService.currentProcessingTime();
    }

    @Override
    public long currentWatermark() {
        return internalTimerService.currentWatermark();
    }

    @Override
    public void registerProcessingTimeTimer(long time) {
        internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void registerEventTimeTimer(long time) {
        internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void deleteProcessingTimeTimer(long time) {
        internalTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void deleteEventTimeTimer(long time) {
        internalTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, time);
    }
}
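SimpleTimerService is a thin wrapper: every call is forwarded to the internal service with a fixed namespace. The following is a minimal plain-Java sketch of that delegation pattern, with no Flink dependency. All type names in it (NamespacedTimers, NoNamespace, SimpleTimers, RecordingTimers) are hypothetical stand-ins invented for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

class NamespaceDelegationSketch {

    // Stand-in for a namespace-aware timer service (hypothetical, not Flink's interface).
    interface NamespacedTimers<N> {
        void registerEventTimeTimer(N namespace, long time);
    }

    // Plays the role of VoidNamespace: one shared instance meaning "no namespace".
    static final class NoNamespace {
        static final NoNamespace INSTANCE = new NoNamespace();

        private NoNamespace() {}
    }

    // Plays the role of SimpleTimerService: hides the namespace from the caller.
    static final class SimpleTimers {
        private final NamespacedTimers<NoNamespace> delegate;

        SimpleTimers(NamespacedTimers<NoNamespace> delegate) {
            this.delegate = delegate;
        }

        void registerEventTimeTimer(long time) {
            // Forward the call, always with the same fixed namespace.
            delegate.registerEventTimeTimer(NoNamespace.INSTANCE, time);
        }
    }

    // Records every forwarded call so the delegation can be observed.
    static final class RecordingTimers implements NamespacedTimers<NoNamespace> {
        final List<Long> times = new ArrayList<>();
        final List<NoNamespace> namespaces = new ArrayList<>();

        @Override
        public void registerEventTimeTimer(NoNamespace namespace, long time) {
            namespaces.add(namespace);
            times.add(time);
        }
    }

    public static void main(String[] args) {
        RecordingTimers internal = new RecordingTimers();
        SimpleTimers userFacing = new SimpleTimers(internal);
        userFacing.registerEventTimeTimer(1_000L);
        System.out.println(internal.times); // prints [1000]
    }
}
```

The user-facing API stays small because the caller never sees the namespace parameter, while the internal service remains general enough for callers that do need namespaces.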

Tracing further:

@Override
public void registerEventTimeTimer(long time) {
    internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
}

internalTimerService is passed in through the constructor:

private final InternalTimerService<VoidNamespace> internalTimerService;

The InternalTimerService interface

In the code traced here, InternalTimerService has a single implementation, InternalTimerServiceImpl, which contains the timer registration logic.

The registerProcessingTimeTimer method:

    @Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        // Peek at the current head of the timer queue (it is not removed)
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        // add() returns true when the new timer became the head of the queue
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            // Timestamp of the previous head, or Long.MAX_VALUE if the queue was empty
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            // check if we need to re-schedule our timer to earlier,
            // i.e. the new timer is earlier than the previous head
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    // Cancel the callback that was already scheduled
                    nextTimer.cancel(false);
                }
                // Schedule a callback for the new, earlier timestamp
                nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
            }
        }
    }
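The rescheduling rule above can be modelled in a few lines of plain Java. This is a simplified sketch under stated assumptions, not Flink code: a TreeSet of timestamps stands in for the timer queue, duplicate registrations are ignored, and log entries stand in for nextTimer.cancel(...) and processingTimeService.registerTimer(...). The class name and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class ProcessingTimeRescheduleSketch {

    // Sorted set of pending timestamps; first() is the head of the "queue".
    private final TreeSet<Long> timers = new TreeSet<>();

    // Timestamp of the single callback currently scheduled, or null if none.
    Long scheduledFor = null;

    // Records the scheduling decisions instead of touching a real scheduler.
    final List<String> log = new ArrayList<>();

    void register(long time) {
        Long oldHead = timers.isEmpty() ? null : timers.first();

        // True only if the timestamp is new AND it is now the earliest one.
        boolean headChanged = timers.add(time) && timers.first() == time;

        if (headChanged) {
            long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
            if (time < nextTriggerTime) {
                if (scheduledFor != null) {
                    log.add("cancel " + scheduledFor);
                }
                scheduledFor = time;
                log.add("schedule " + time);
            }
        }
    }

    public static void main(String[] args) {
        ProcessingTimeRescheduleSketch model = new ProcessingTimeRescheduleSketch();
        model.register(100L); // first timer: scheduled
        model.register(200L); // later than the head: nothing changes
        model.register(50L);  // earlier than the head: cancel and reschedule
        model.register(50L);  // duplicate: ignored
        System.out.println(model.log); // prints [schedule 100, cancel 100, schedule 50]
    }
}
```

Only one callback is outstanding at any time, however many timers are queued. Later timers wait in the queue until the head has fired.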

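The add method

add inserts the element into the sub-queue for its key group. It returns true only when the insertion changed that sub-queue's head and the new element is now the head of the whole queue. Otherwise it returns false. The return value therefore means "the head changed", not merely "the insert succeeded".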

    @Nullable
    @Override
    public T peek() {
        return heapOfKeyGroupedHeaps.peek().peek();
    }

    @Override
    public boolean add(@Nonnull T toAdd) {
        final PQ list = getKeyGroupSubHeapForElement(toAdd);

        // the branch checks if the head element has (potentially) changed.
        if (list.add(toAdd)) {
            heapOfKeyGroupedHeaps.adjustModifiedElement(list);
            // could we have a new head?
            return toAdd.equals(peek());
        } else {
            // head unchanged
            return false;
        }
    }
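To make the return value of add concrete, here is a rough plain-Java model of a queue partitioned into per-key-group sub-queues. It is a sketch under simplifying assumptions and not Flink's implementation: the Timer class is hypothetical, the key group is derived from the key's hash code, each sub-queue is a TreeSet, and the global head is found by a linear scan rather than a heap of heaps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;

class KeyGroupedQueueSketch {

    // Hypothetical timer: ordered by timestamp, then key; identity is (key, timestamp).
    static final class Timer implements Comparable<Timer> {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public int compareTo(Timer other) {
            int byTime = Long.compare(timestamp, other.timestamp);
            return byTime != 0 ? byTime : key.compareTo(other.key);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Timer
                    && ((Timer) o).key.equals(key)
                    && ((Timer) o).timestamp == timestamp;
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp);
        }
    }

    private final List<TreeSet<Timer>> subQueues = new ArrayList<>();

    KeyGroupedQueueSketch(int keyGroups) {
        for (int i = 0; i < keyGroups; i++) {
            subQueues.add(new TreeSet<>());
        }
    }

    // Global head: the smallest head across all sub-queues, or null if all are empty.
    Timer peek() {
        Timer best = null;
        for (TreeSet<Timer> sub : subQueues) {
            if (!sub.isEmpty() && (best == null || sub.first().compareTo(best) < 0)) {
                best = sub.first();
            }
        }
        return best;
    }

    // Returns true only if the new timer is now the global head.
    boolean add(Timer timer) {
        int group = Math.floorMod(timer.key.hashCode(), subQueues.size());
        if (!subQueues.get(group).add(timer)) {
            return false; // duplicate: nothing changed
        }
        return timer.equals(peek());
    }

    public static void main(String[] args) {
        KeyGroupedQueueSketch queue = new KeyGroupedQueueSketch(4);
        System.out.println(queue.add(new Timer("a", 100L))); // prints true
        System.out.println(queue.add(new Timer("b", 200L))); // prints false
        System.out.println(queue.add(new Timer("c", 50L)));  // prints true
    }
}
```

A caller such as registerProcessingTimeTimer can use the boolean to decide whether the scheduled callback needs to move, without inspecting the queue a second time.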

The registerEventTimeTimer method

The new timer is simply added to the event-time timer queue. Nothing is scheduled at this point:

@Override
public void registerEventTimeTimer(N namespace, long time) {
    eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
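Because registration only enqueues the timer, an event-time timer can fire only when the watermark advances. The plain-Java model below sketches that behavior under stated assumptions: it has no Flink dependency, ignores keys and namespaces, and uses hypothetical names. On each watermark it drains every queued timer whose timestamp is at or below the watermark, which matches the firing rule described at the start of this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class EventTimeFiringSketch {

    // Min-heap of pending event-time timestamps.
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();

    // Timestamps in the order their timers fired.
    final List<Long> fired = new ArrayList<>();

    // Registration only enqueues; it never fires anything by itself.
    void registerEventTimeTimer(long time) {
        eventTimeTimers.add(time);
    }

    // Fire every timer whose timestamp is <= the new watermark, earliest first.
    void advanceWatermark(long watermark) {
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= watermark) {
            fired.add(eventTimeTimers.poll());
        }
    }

    public static void main(String[] args) {
        EventTimeFiringSketch model = new EventTimeFiringSketch();
        model.registerEventTimeTimer(300L);
        model.registerEventTimeTimer(100L);
        model.registerEventTimeTimer(200L);

        model.advanceWatermark(150L);
        System.out.println(model.fired); // prints [100]

        model.advanceWatermark(300L);
        System.out.println(model.fired); // prints [100, 200, 300]
    }
}
```

In this model a timer registered with a timestamp already behind the watermark is not lost. It stays in the queue and fires on the next call to advanceWatermark.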

The InternalTimerServiceImpl constructor is called by InternalTimeServiceManager:

InternalTimerServiceImpl(
    KeyGroupRange localKeyGroupRange,
    KeyContext keyContext,
    ProcessingTimeService processingTimeService,
    KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
    KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {

InternalTimeServiceManager manages the individual InternalTimerService instances.

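Summary

1. Timer registration is built on a priority queue plus a scheduled callback.

2. There are two kinds of timers: processing-time timers and event-time timers.

3. The processing-time path compares timestamps at registration. When a new timer is earlier than the current head, the pending callback is cancelled and rescheduled for the earlier time.

4. A timer whose timestamp is already in the past still fires.

This is only my own understanding from reading the code. If anything is wrong, corrections are welcome!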
