目錄CountDownLatch 簡介CountDownLatch 使用示例CountDownLatch 優缺點CountDownLatch 原始碼解析擴充套件CountDownLatch 簡介
CountDownLatch 是一個同步工具類,用來協調多個執行緒之間的同步,達到執行緒間通訊的效果。CountDownLatch 允許一個或多個執行緒一直等待,直到其他執行緒執行完成後再執行。
使用示例讓多個執行緒等待public class Test { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(2); for (int i = 0; i < 5; i++) { new Thread(() -> { try { System.out.println("運動員["+Thread.currentThread().getName()+"]準備完畢,等待號令..."); countDownLatch.await(); System.out.println("運動員["+Thread.currentThread().getName()+"]開跑..."); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } System.out.println("裁判準備發令..."); Thread.sleep(2000); countDownLatch.countDown(); System.out.println("執行發令..."); countDownLatch.countDown(); }}
輸出結果:
裁判準備發令...運動員[Thread-0]準備完畢,等待號令...運動員[Thread-1]準備完畢,等待號令...運動員[Thread-2]準備完畢,等待號令...運動員[Thread-3]準備完畢,等待號令...運動員[Thread-4]準備完畢,等待號令...執行發令...運動員[Thread-0]開跑...運動員[Thread-2]開跑...運動員[Thread-4]開跑...運動員[Thread-3]開跑...運動員[Thread-1]開跑...
讓一個執行緒等待
public class Test { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { new Thread(() -> { System.out.println("運動員["+Thread.currentThread().getName()+"]到達終點..."); countDownLatch.countDown(); }).start(); } countDownLatch.await(); System.out.println("工作人員開始統計排名..."); }}
輸出結果:
運動員[Thread-0]到達終點...運動員[Thread-1]到達終點...運動員[Thread-2]到達終點...運動員[Thread-3]到達終點...運動員[Thread-4]到達終點...工作人員開始統計排名...
優缺點
CountDownLatch 是一次性的,計算器的值只能在構造方法中初始化一次,之後沒有任何機制再次對其設定值,當 CountDownLatch 使用完畢後,它不能再次被使用。
原始碼解析CountDownLatch 是基於 AQS 同步器的共享鎖實現的內部邏輯,透過程式碼可以看到,CountDownLatch 類主要的幾個方法都是透過 Sync 類實現的,而 Sync 類就是 AQS 的實現類。
public class CountDownLatch { // AQS 同步器的實現類 private final Sync sync; // 構造方法,初始化計數器 public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } // 阻塞加共享鎖,可響應中斷 public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } // 阻塞加共享鎖,可響應中斷,並設定超時時間 public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } // 計數器減 1,呼叫釋放共享鎖方法 public void countDown() { sync.releaseShared(1); }}
Sync 繼承了 AQS 並實現了共享鎖相關的兩個方法 tryAcquireShared 及 tryReleaseShared
private static final class Sync extends AbstractQueuedSynchronizer { // 初始化同步狀態值 Sync(int count) { setState(count); } // 加共享鎖 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 釋放共享鎖 protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }}
大體結構基本清楚了,現在開始理流程。首先需要建立 CountDownLatch 物件,並設定一個初始計數器值,最終是呼叫了 AQS 的 setState 方法將該值設定到了 AQS 的 state 同步狀態值上。
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count);}Sync(int count) { setState(count);}
然後主執行緒會執行 await() 方法進行等待,這個方法最終呼叫了 AQS 的 acquireSharedInterruptibly 方法執行加共享鎖,acquireSharedInterruptibly 方法首先會呼叫 tryAcquireShared 方法嘗試加鎖,如果嘗試失敗,則進入等待佇列並掛起執行緒。因為上一步初始化了 state,很明顯這裡 state != 0,所以加鎖失敗,執行緒進入等待佇列。
// 阻塞加共享鎖,可響應中斷public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1);}// 加共享鎖public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg);}// 加共享鎖protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1;}
接著子執行緒會執行 countDown() 方法,這個方法呼叫 AQS 的 releaseShared 方法釋放共享鎖,其實就是讓 AQS 的同步狀態減 1,當同步狀態值減到 0 時,需要喚醒上一步在等待佇列中掛起的執行緒,此時這些執行緒就可以加鎖成功,從而執行後續的操作。
// 計數器減 1,呼叫釋放共享鎖方法public void countDown() { sync.releaseShared(1);}// 釋放共享鎖public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { // 當同步狀態減至 0 時,喚醒等待佇列中的執行緒 doReleaseShared(); return true; } return false;}// 釋放共享鎖protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) // 透過 cas 讓同步狀態減 1 return nextc == 0; // 如果減 1 後的狀態為 0 返回 true }}
擴充套件CountDownLatch 與 Thread.join
CountDownLatch 的作用就是允許一個或多個執行緒等待其他執行緒完成操作,看起來有點類似 join() 方法。CountDownLatch 可以手動控制在 n 個執行緒裡呼叫 n 次 countDown() 方法使計數器進行減一操作,也可以在一個執行緒裡呼叫 n 次執行減一操作。而 join() 的實現原理是不停檢查 join 執行緒是否存活,如果 join 執行緒存活則讓當前執行緒永遠等待。所以 CountDownLatch 使用起來較為靈活。
CountDownLatch 與 CyclicBarrier
兩者都能夠實現執行緒之間的等待,只不過它們側重點不同:
CountDownLatch 一般用於一個或多個執行緒,等待其他執行緒執行完任務後,再才執行。CyclicBarrier 一般用於一組執行緒互相等待至某個狀態,然後這一組執行緒再同時執行。CountDownLatch是減計數,計數減為0後不能重用;而CyclicBarrier是加計數,可置0後複用。CyclicBarrier 的原始碼分析請看下篇