一、時間輪介紹
之前公司內部搭建的延遲佇列服務有用到時間輪,但是一直沒有了解過它的實現原理。
最近有個和支付寶對接的專案,支付寶介面有流量控制,一定的時間內只允許 N 次介面呼叫,針對一些業務我們需要頻繁呼叫支付寶開放平臺介面,如果不對請求做限制,很容易觸發流控告警。
為了避免這個問題,我們按照一定延遲規則將任務載入進時間輪內,透過時間輪的排程來實現介面非同步呼叫。
很多開源框架都實現了時間輪演算法,這裡以 Netty 為例,看下 Netty 中時間輪是怎麼實現的。
1.1 快速入門下面是一個 API 使用例子。
public class WheelTimerSamples { private static final HashedWheelTimerInstance INSTANCE = HashedWheelTimerInstance.INSTANCE; public static void main(String[] args) throws IOException { INSTANCE.getWheelTimer().newTimeout(new PrintTimerTask(), 3, TimeUnit.SECONDS); System.in.read(); } static class PrintTimerTask implements TimerTask { @Override public void run(Timeout timeout) { System.out.println("Hello world"); } } enum HashedWheelTimerInstance { INSTANCE; private final HashedWheelTimer wheelTimer; HashedWheelTimerInstance() { wheelTimer = new HashedWheelTimer(r -> { Thread t = new Thread(r); t.setUncaughtExceptionHandler((t1, e) -> System.out.println(t1.getName() + e.getMessage())); t.setName("-HashedTimerWheelInstance-"); return t; }, 100, TimeUnit.MILLISECONDS, 64); } public HashedWheelTimer getWheelTimer() { return wheelTimer; } }}複製程式碼
上面的例子中我們自定義了一個 HashedWheelTimer,然後自定義了一個 TimerTask,將一個任務載入進時間輪,3s 後執行這個任務,怎麼樣是不是很簡單。
在定義時間輪時建議按照業務型別進行區分,將時間輪定義為多個單例物件。
1.2 原理圖解二、原理分析2.1 時間輪狀態時間輪有以下三種狀態:
WORKER_STATE_INIT:初始化狀態,此時時間輪內的工作執行緒還沒有開啟WORKER_STATE_STARTED:執行狀態,時間輪內的工作執行緒已經開啟WORKER_STATE_SHUTDOWN:終止狀態,時間輪停止工作狀態轉換如下,轉換原理會在下面講到:
2.2 建構函式 public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } if (unit == null) { throw new NullPointerException("unit"); } if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } // 初始化時間輪陣列,時間輪大小為大於等於 ticksPerWheel 的第一個 2 的冪,和 HashMap 類似 wheel = createWheel(ticksPerWheel); // 取模用,用來定位陣列中的槽 mask = wheel.length - 1; // 為了保證精度,時間輪內的時間單位為納秒 long duration = unit.toNanos(tickDuration); // 時間輪內的時鐘撥動頻率不宜太大也不宜太小 if (duration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } if (duration < MILLISECOND_NANOS) { logger.warn("Configured tickDuration {} smaller then {}, using 1ms.", tickDuration, MILLISECOND_NANOS); this.tickDuration = MILLISECOND_NANOS; } else { this.tickDuration = duration; } // 建立工作執行緒 workerThread = threadFactory.newThread(worker); // 非守護執行緒且 leakDetection 為 true 時檢測記憶體是否洩漏 leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; // 初始化最大等待任務數 this.maxPendingTimeouts = maxPendingTimeouts; // 如果建立的時間輪例項大於 64,列印日誌,並且這個日誌只會列印一次 if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } }複製程式碼
建構函式中的引數相當重要,當自定義時間輪時,我們應該根據業務的範圍設定合理的引數:
threadFactory:建立時間輪任務執行緒的工廠,透過這個工廠可以給我們的執行緒自定義一些屬性(執行緒名、異常處理等)tickDuration:時鐘多長時間撥動一次,值越小,時間輪精度越高unit:tickDuration 的單位ticksPerWheel:時間輪陣列大小leakDetection:是否檢測記憶體洩漏maxPendingTimeouts:時間輪內最大等待的任務數時間輪的時鐘撥動時長應該根據業務設定恰當的值,如果設定的過大,可能導致任務觸發時間不準確。如果設定的過小,時間輪轉動頻繁,任務少的情況下載入不到任務,屬於一直空轉的狀態,會佔用 CPU 執行緒資源。
為了防止時間輪佔用過多的 CPU 資源,當建立的時間輪物件大於 64 時會以日誌的方式提示。
建構函式中只是初始化了輪執行緒,並沒有開啟,當第一次往時間輪內新增任務時,執行緒才會開啟。
2.3 往時間輪內新增任務 @Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } // 等待的任務數 +1 long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); // 如果時間輪內等待的任務數大於最大值,任務會被拋棄 if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } // 開啟時間輪內的執行緒 start(); // 計算當前新增任務的執行時間 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } // 將任務加入佇列 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }複製程式碼
任務會先儲存在佇列中,當時間輪的時鐘撥動時才會判斷是否將佇列中的任務載入進時間輪。
public void start() { switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: // 這裡存在併發,透過 CAS 操作保證最終只有一個執行緒能開啟時間輪的工作執行緒 if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } while (startTime == 0) { try { // startTimeInitialized 是一個 CountDownLatch,目的是為了保證工作執行緒的 startTime 屬性初始化 startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }複製程式碼
這裡透過 CAS 加鎖的方式保證執行緒安全,避免多次開啟。
工作執行緒開啟後,start() 方法會被阻塞,等工作執行緒的 startTime 屬性初始化完成後才被喚醒。為什麼只有等 startTime 初始化後才能繼續執行呢?因為上面的 newTimeout 方法線上程開啟後,需要計算當前新增進來任務的執行時間,而這個執行時間是根據 startTime 計算的。
2.4 時間輪排程 @Override public void run() { // 初始化 startTime. startTime = System.nanoTime(); if (startTime == 0) { startTime = 1; } // 用來喚醒被阻塞的 HashedWheelTimer#start() 方法,保證 startTime 初始化 startTimeInitialized.countDown(); do { // 時鐘撥動 final long deadline = waitForNextTick(); if (deadline > 0) { int idx = (int) (tick & mask); // 處理過期的任務 processCancelledTasks(); HashedWheelBucket bucket = wheel[idx]; // 將任務載入進時間輪 transferTimeoutsToBuckets(); // 執行當前時間輪槽內的任務 bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // 時間輪關閉,將還未執行的任務以列表的形式儲存到 unprocessedTimeouts 集合中,在 stop 方法中返回出去 // 還未執行的任務可能會在兩個地方,一:時間輪陣列內,二:佇列中 for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } // 處理過期的任務 processCancelledTasks(); }複製程式碼
時間輪每撥動一次 tick 就會 +1,根據這個值與(時間輪陣列長度 - 1)進行 & 運算,可以定位時間輪陣列內的槽。因為 tick 值一直在增加,所以時間輪陣列看起來就像一個不斷迴圈的圓。
先初始化 startTime 值,因為後面任務執行的時間是根據 startTime 計算的時鐘撥動,如果時間未到,則 sleep 一會兒處理過期的任務將任務載入進時間輪執行當前時鐘對應時間輪內的任務時間輪關閉,將所有未執行的任務封裝到 unprocessedTimeouts 集合中,在 stop 方法中返回出去處理過期的任務上面簡單羅列了下 run 方法的大概執行步驟,下面是具體方法的分析。
2.5 時鐘撥動如果時間輪設定的 tickDuration 為 100ms 撥動一次,當時鍾撥動一次後,應該計算下一次時鐘撥動的時間,如果還沒到就 sleep 一會兒,等到撥動時間再醒來。
private long waitForNextTick() { // 計算時鐘下次撥動的相對時間 long deadline = tickDuration * (tick + 1); for (;;) { // 獲取當前時間的相對時間 final long currentTime = System.nanoTime() - startTime; // 計算距離時鐘下次撥動的時間 // 這裡之所以加 999999 後再除 10000000, 是為了保證足夠的 sleep 時間 // 例如:當 deadline - currentTime = 2000002 的時候,如果不加 999999,則只睡了 2ms // 而 2ms 其實是未到達 deadline 時間點的,所以為了使上述情況能 sleep 足夠的時間,加上 999999 後,會多睡 1ms long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; // <=0 說明可以撥動時鐘了 if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } // 這裡是為了相容 Windows 平臺,因為 Windows 平臺的排程最小單位為 10ms,如果不是 10ms 的倍數,可能會引起 sleep 時間不準確 // See https://github.com/Netty/Netty/issues/356 if (PlatformDependent.isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; } try { // sleep 到下次時鐘撥動 Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } }複製程式碼
如果時間不到就 sleep 等待一會兒,為了使任務時鐘準確,可以從上面的程式碼中看出 Netty 做了一些最佳化,比如 sleepTimeMs 的計算,Windows 平臺的處理等。
2.6 將任務從佇列載入進時間輪 private void transferTimeoutsToBuckets() { // 一次最多隻處理佇列中的 100000 個任務 for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } // 過濾已經取消的任務 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { continue; } // 計算當前任務到執行還需要經過幾次時鐘撥動 // 假設時間輪陣列大小是 10,calculated 為 12,需要時間輪轉動一圈加兩次時鐘撥動後後才能執行這個任務,因此還需要計算一下圈數 long calculated = timeout.deadline / tickDuration; // 計算當前任務到執行還需要經過幾圈時鐘撥動 timeout.remainingRounds = (calculated - tick) / wheel.length; // 有的任務可能在佇列裡很長時間,時間過期了也沒有被排程,將這種情況的任務放在當前輪次內執行 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. // 計算任務在時間輪陣列中的槽 int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex]; // 將任務放到時間輪的陣列中,多個任務可能定位時間輪的同一個槽,這些任務透過以連結串列的形式連結 bucket.addTimeout(timeout); } } void addTimeout(HashedWheelTimeout timeout) { assert timeout.bucket == null; // 任務構成雙向連結串列 timeout.bucket = this; if (head == null) { head = tail = timeout; } else { tail.next = timeout; timeout.prev = tail; tail = timeout; } } 複製程式碼
在上面也提到過,任務剛加進來不會立即到時間輪中去,而是暫時儲存到一個佇列中,當時間輪時鐘撥動時,會將任務從佇列中載入進時間輪內。
時間輪每次最大處理 100000 個任務,因為任務的執行時間是使用者自定義的,所以需要計算任務到執行需要經過多少次時鐘撥動,並計算時間輪撥動的圈數。接著將任務載入進時間輪對應的槽內,可能有多個任務經過 hash 計算後定位到同一個槽,這些任務會以雙向連結串列的結構儲存,有點類似 HashMap 處理碰撞的情況。
2.7 執行任務 public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; while (timeout != null) { HashedWheelTimeout next = timeout.next; // 任務執行的圈數 > 0,表示任務還需要經過 remainingRounds 圈時鐘迴圈才能執行 if (timeout.remainingRounds <= 0) { // 從連結串列中移除當前任務,並返回連結串列中下一個任務 next = remove(timeout); if (timeout.deadline <= deadline) { // 執行任務 timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { // 過濾取消的任務 next = remove(timeout); } else { // 圈數 -1 timeout.remainingRounds --; } timeout = next; } } public void expire() { // 任務狀態校驗 if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t); } } }複製程式碼
時間輪槽內的任務以連結串列形式儲存,這些任務執行的時間可能會不一樣,有的在當前時鐘執行,有的在下一圈或者下兩圈對應的時鐘執行。當任務在當前時鐘執行時,需要將這個任務從連結串列中刪除,重新維護連結串列關係。
2.8 終止時間輪 @Override public Set<Timeout> stop() { // 終止時間輪的執行緒不能是時間輪的工作執行緒 if (Thread.currentThread() == workerThread) { throw new IllegalStateException( HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName()); } // 將時間輪的狀態修改為 WORKER_STATE_SHUTDOWN,這裡有兩種情況 // 一:時間輪是 WORKER_STATE_INIT 狀態,表明時間輪從建立到終止一直沒有任務進來 // 二:時間輪是 WORKER_STATE_STARTED 狀態,多個執行緒嘗試終止時間輪,只有一個操作成功 if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { // 程式碼走到這裡,時間輪只能是兩種狀態中的一個,WORKER_STATE_INIT 和 WORKER_STATE_SHUTDOWN // 為 WORKER_STATE_INIT 表示時間輪沒有任務,因此不用返回未處理的任務,但是需要將時間輪例項 -1 // 為 WORKER_STATE_SHUTDOWN 表示是 CAS 操作失敗,什麼都不用做,因為 CAS 成功的執行緒會處理 if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) { // 時間輪例項物件 -1 INSTANCE_COUNTER.decrementAndGet(); if (leak != null) { boolean closed = leak.close(this); assert closed; } } // CAS 操作失敗,或者時間輪沒有處理過任務,返回空的任務列表 return Collections.emptySet(); } try { boolean interrupted = false; while (workerThread.isAlive()) { // 中斷時間輪工作執行緒 workerThread.interrupt(); try { // 終止時間輪的執行緒等待時間輪工作執行緒 100ms,這個過程主要是為了時間輪工作執行緒處理未執行的任務 workerThread.join(100); } catch (InterruptedException ignored) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } } finally { INSTANCE_COUNTER.decrementAndGet(); if (leak != null) { boolean closed = leak.close(this); assert closed; } } // 返回未處理的任務 return worker.unprocessedTimeouts(); }複製程式碼
當終止時間輪時,時間輪狀態有兩種情況:
WORKER_STATE_INIT:時間輪初始化,前面我們說過,當初始化時間輪物件時並不會立即開啟時間輪工作執行緒,而是第一次新增任務時才開啟,為 WORKER_STATE_INIT 表示時間輪沒有處理過任務WORKER_STATE_STARTED:時間輪在工作,這裡也有兩種情況,存在併發與不存在併發,如果多個執行緒都嘗試終止時間輪,肯定只能有一個成功時間輪停止執行後會將未執行的任務返回出去,至於怎麼處理這些任務,由業務方自己定義,這個流程和執行緒池的 shutdownNow 方法是類似的。
如果時間輪在執行,怎麼才能獲取到未執行的任務呢,答案就在上面的 run() 方法中,如果時間輪處於非執行狀態,會把時間輪陣列與佇列中未執行且未取消的任務儲存到 unprocessedTimeouts 集合中。而終止時間輪成功的執行緒只需要等待一會兒即可,這個等待是透過 workerThread.join(100); 實現的。
取消時間輪內的任務相對比較簡單,這裡就不概述了,想要了解的自行檢視即可。
上面就是時間輪執行的基本原理了。
三、總結這裡以問答的形式進行總結,大家也可以看下這些問題,自己能不能很好的回答出來?
3.1 時間輪是不是在初始化完成後就啟動了?不是,初始化完成時間輪的狀態是 WORKER_STATE_INIT,此時時間輪內的工作執行緒還沒有執行,只有第一次往時間輪內新增任務時,才會開啟時間輪內的工作執行緒。時間輪執行緒開啟後會初始化 startTime,任務的執行時間會根據這個欄位計算,而且時間輪中時間的概念是相對的。
3.2 如果時間輪內還有任務未執行,服務重啟了怎麼辦?時間輪內的任務都在記憶體中,服務重啟資料肯定都丟了,所以當服務重啟時需要業務方自己做相容處理。
3.3 如何自定義合適的時間輪引數?自定義時間輪時有兩個比較重要的引數需要我們注意:
tickDuration:時鐘撥動頻率,假設一個任務在 10s 後執行,tickDuration 設定為 3min 那肯定是不行的,tickDuration 值越小,任務觸發的精度越高,但是沒有任務時,工作執行緒會一直自旋嘗試從佇列中拿任務,比較消耗 CPU 資源ticksPerWheel:時間輪陣列大小,假設當時間輪時鐘撥動時,有 10000 個任務處理,但是我們定義時間輪陣列的大小為 8,這時平均一個時間輪槽內有 1250 個任務,如果這 1250 個任務都在當前時鐘執行,任務執行是同步的,由於每個任務執行都會消耗時間,可能會導致後面的任務觸發時間不準確。反之如果陣列長度設定的過大,任務比較少的情況下,時間輪陣列很多槽都是空的所以當使用自定義時間輪時,一定要評估自己的業務後再設定引數。
3.4 Netty 的時間輪有什麼缺陷?Netty 中的時間輪是透過單執行緒實現的,如果在執行任務的過程中出現阻塞,會影響後面任務執行。除此之外,Netty 中的時間輪並不適合建立延遲時間跨度很大的任務,比如往時間輪內丟成百上千個任務並設定 10 天后執行,這樣可能會導致連結串列過長 round 值很大,而且這些任務在執行之前會一直佔用記憶體。
3.5 時間輪要設定成單例的嗎?強烈建議按照業務模組區分,每個模組都建立一個單例的時間輪物件。在上面的程式碼中我們看到了,當時間輪物件大於 64 時會以日誌的形式提示。如果時間輪是非單例物件,那時間輪演算法完全就失去了作用。
3.6 時間輪迴 ScheduledExecutorService 的區別?ScheduledExecutorService 中的任務維護了一個堆,當有大量任務時,需要調整堆結構導致效能下降,而時間輪透過時鐘排程,可以不受任務量的限制。
當任務量比較少時時間輪會一直自旋空轉撥動時鐘,相比 ScheduledExecutorService 會佔用一定 CPU 資源。