一般是呼叫process(), 實現類繼承KeyedProcessFunction,在processElement方法裡對每條資料處理之後,根據需求註冊定時器
事件時間: onTimer()在Flink內部水印達到或超過Timer設定的時間戳時觸發。
ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
系統處理時間 :onTimer()在系統時間戳達到Timer設定的時間戳時觸發。
ctx.timerService().registerProcessingTimeTimer(value.getWindowEnd() + 1);
KeyedProcessFunction抽象類
@PublicEvolvingpublic abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction { private static final long serialVersionUID = 1L; //每個元素處理邏輯 public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception; //觸發定時器邏輯 public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {} //上下文 public abstract class Context { public abstract Long timestamp(); /** * A {@link TimerService} for querying time and registering timers. */ public abstract TimerService timerService(); public abstract <X> void output(OutputTag<X> outputTag, X value); /** * Get key of the element being processed. */ public abstract K getCurrentKey(); } /** * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}. */ public abstract class OnTimerContext extends Context { /** * The {@link TimeDomain} of the firing timer. */ public abstract TimeDomain timeDomain(); /** * Get key of the firing timer. */ @Override public abstract K getCurrentKey(); }
KeyedProcessFunction的上下文實現:
可以上下文的實現類為:OnTimerContextImpl ContextImpl,OnTimerContext
透過檢視類關係可以:KeyedProcessFunction的上下文的實現類是KeyedProcessOperator類的ContextImpl
KeyedProcessOperator類檢視ContextImpl的實現
private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context { private final TimerService timerService; private StreamRecord<IN> element; ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) { function.super(); this.timerService = checkNotNull(timerService); } @Override public Long timestamp() { checkState(element != null); if (element.hasTimestamp()) { return element.getTimestamp(); } else { return null; } } @Override public TimerService timerService() { return timerService; } @Override public <X> void output(OutputTag<X> outputTag, X value) { if (outputTag == null) { throw new IllegalArgumentException("OutputTag must not be null."); } output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp())); } @Override @SuppressWarnings("unchecked") public K getCurrentKey() { return (K) KeyedProcessOperator.this.getCurrentKey(); } }
跟蹤timerService方法,private final TimerService timerService; 是外部傳入的,
檢視ContextImp方法,timerService傳入
ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) { function.super(); this.timerService = checkNotNull(timerService); }
ContextImp方法由open方法呼叫
public void open() throws Exception { super.open(); collector = new TimestampedCollector<>(output); InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); TimerService timerService = new SimpleTimerService(internalTimerService); context = new ContextImpl(userFunction, timerService); onTimerContext = new OnTimerContextImpl(userFunction, timerService);}
TimerService timerService = new SimpleTimerService(internalTimerService); 透過這句程式碼可推斷--->TimerService的實現類為SimpleTimerService
--->ctx.timerService() 返回的實現類為SimpleTimerService
檢視KeyedProcessOperator的類關係
SimpleTimerService類@Internalpublic class SimpleTimerService implements TimerService { private final InternalTimerService<VoidNamespace> internalTimerService; public SimpleTimerService(InternalTimerService<VoidNamespace> internalTimerService) { this.internalTimerService = internalTimerService; } @Override public long currentProcessingTime() { return internalTimerService.currentProcessingTime(); } @Override public long currentWatermark() { return internalTimerService.currentWatermark(); } @Override public void registerProcessingTimeTimer(long time) { internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time); } @Override public void registerEventTimeTimer(long time) { internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time); } @Override public void deleteProcessingTimeTimer(long time) { internalTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, time); } @Override public void deleteEventTimeTimer(long time) { internalTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, time); }}
繼續跟蹤:
@Override
public void registerEventTimeTimer(long time) {
internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
}
internalTimerService由建構函式傳入
private final InternalTimerService<VoidNamespace> internalTimerService;
介面InternalTimerService
InternalTimerService只有一個實現類InternalTimerServiceImpl,在InternalTimerServiceImpl 這個類,實現了註冊定時器的邏輯
registerProcessingTimeTimer方法:
@Override public void registerProcessingTimeTimer(N namespace, long time) { //定時器佇列取出第一個元素 InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek(); //如果當前物件加入佇列成功 if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) { //檢視佇列中的第一個元素的時間,為null則為Long的最大時間,否則為資料時間戳 long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE; // check if we need to re-schedule our timer to earlier // 檢查是否需要將計時器重新安排到更早的時間,如果當前元素的時間比首個元素的時間小 if (time < nextTriggerTime) { if (nextTimer != null) { //取消已經提交的任務 nextTimer.cancel(false); } //定時器提交執行當前任務 nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime); } } }
add方法:
將物件加入佇列,加入且取出的首個元素等於當前元素,則代表加入成功,否則返回false
@Nullable @Override public T peek() { return heapOfKeyGroupedHeaps.peek().peek(); } @Override public boolean add(@Nonnull T toAdd) { final PQ list = getKeyGroupSubHeapForElement(toAdd); // the branch checks if the head element has (potentially) changed. if (list.add(toAdd)) { heapOfKeyGroupedHeaps.adjustModifiedElement(list); // could we have a new head? return toAdd.equals(peek()); } else { // head unchanged return false; } }
registerEventTimeTimer方法:
當前物件新增到定時器執行佇列中
@Overridepublic void registerEventTimeTimer(N namespace, long time) { eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));}
InternalTimerServiceImpl構造方法呼叫者InternalTimeServiceManager
InternalTimerServiceImpl( KeyGroupRange localKeyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue) {
InternalTimeServiceManager用於管理各個InternalTimeService。
總結
1、註冊定時器原理是採用佇列+定時器實現
2、主要為系統時間定時器和事件時間定時處理器
3、系統時間的佇列會對時間進行判斷,時間小的先提交到定時器執行
4、如果時間為歷史時間,也會執行
以上僅為個人學習時的理解,如果不正確,麻煩大佬指正!