首頁>技術>

目錄CountDownLatch 簡介CountDownLatch 使用示例CountDownLatch 優缺點CountDownLatch 原始碼解析擴充套件CountDownLatch 簡介

CountDownLatch 是一個同步工具類,用來協調多個執行緒之間的同步,達到執行緒間通訊的效果。CountDownLatch 允許一個或多個執行緒一直等待,直到其他執行緒執行完成後再執行。

使用示例讓多個執行緒等待
public class Test {    public static void main(String[] args) throws InterruptedException {        CountDownLatch countDownLatch = new CountDownLatch(2);        for (int i = 0; i < 5; i++) {            new Thread(() -> {                try {                    System.out.println("運動員["+Thread.currentThread().getName()+"]準備完畢,等待號令...");                    countDownLatch.await();                    System.out.println("運動員["+Thread.currentThread().getName()+"]開跑...");                } catch (InterruptedException e) {                    e.printStackTrace();                }            }).start();        }        System.out.println("裁判準備發令...");        Thread.sleep(2000);        countDownLatch.countDown();        System.out.println("執行發令...");        countDownLatch.countDown();    }}

輸出結果:

裁判準備發令...運動員[Thread-0]準備完畢,等待號令...運動員[Thread-1]準備完畢,等待號令...運動員[Thread-2]準備完畢,等待號令...運動員[Thread-3]準備完畢,等待號令...運動員[Thread-4]準備完畢,等待號令...執行發令...運動員[Thread-0]開跑...運動員[Thread-2]開跑...運動員[Thread-4]開跑...運動員[Thread-3]開跑...運動員[Thread-1]開跑...
讓一個執行緒等待
public class Test {    public static void main(String[] args) throws InterruptedException {        CountDownLatch countDownLatch = new CountDownLatch(5);        for (int i = 0; i < 5; i++) {            new Thread(() -> {                System.out.println("運動員["+Thread.currentThread().getName()+"]到達終點...");                countDownLatch.countDown();            }).start();        }        countDownLatch.await();        System.out.println("工作人員開始統計排名...");    }}

輸出結果:

運動員[Thread-0]到達終點...運動員[Thread-1]到達終點...運動員[Thread-2]到達終點...運動員[Thread-3]到達終點...運動員[Thread-4]到達終點...工作人員開始統計排名...
優缺點

CountDownLatch 是一次性的,計算器的值只能在構造方法中初始化一次,之後沒有任何機制再次對其設定值,當 CountDownLatch 使用完畢後,它不能再次被使用。

原始碼解析

CountDownLatch 是基於 AQS 同步器的共享鎖實現的內部邏輯,透過程式碼可以看到,CountDownLatch 類主要的幾個方法都是透過 Sync 類實現的,而 Sync 類就是 AQS 的實現類。

public class CountDownLatch {    // AQS 同步器的實現類    private final Sync sync;    // 構造方法,初始化計數器    public CountDownLatch(int count) {        if (count < 0) throw new IllegalArgumentException("count < 0");        this.sync = new Sync(count);    }    // 阻塞加共享鎖,可響應中斷    public void await() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    }    // 阻塞加共享鎖,可響應中斷,並設定超時時間    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));    }    // 計數器減 1,呼叫釋放共享鎖方法    public void countDown() {        sync.releaseShared(1);    }}

Sync 繼承了 AQS 並實現了共享鎖相關的兩個方法 tryAcquireShared 及 tryReleaseShared

private static final class Sync extends AbstractQueuedSynchronizer {    // 初始化同步狀態值    Sync(int count) {        setState(count);    }    // 加共享鎖    protected int tryAcquireShared(int acquires) {        return (getState() == 0) ? 1 : -1;    }    // 釋放共享鎖    protected boolean tryReleaseShared(int releases) {        for (;;) {            int c = getState();            if (c == 0)                return false;            int nextc = c-1;            if (compareAndSetState(c, nextc))                return nextc == 0;        }    }}

大體結構基本清楚了,現在開始理流程。首先需要建立 CountDownLatch 物件,並設定一個初始計數器值,最終是呼叫了 AQS 的 setState 方法將該值設定到了 AQS 的 state 同步狀態值上。

public CountDownLatch(int count) {    if (count < 0) throw new IllegalArgumentException("count < 0");    this.sync = new Sync(count);}Sync(int count) {    setState(count);}

然後主執行緒會執行 await() 方法進行等待,這個方法最終呼叫了 AQS 的 acquireSharedInterruptibly 方法執行加共享鎖,acquireSharedInterruptibly 方法首先會呼叫 tryAcquireShared 方法嘗試加鎖,如果嘗試失敗,則進入等待佇列並掛起執行緒。因為上一步初始化了 state,很明顯這裡 state != 0,所以加鎖失敗,執行緒進入等待佇列。

// 阻塞加共享鎖,可響應中斷public void await() throws InterruptedException {    sync.acquireSharedInterruptibly(1);}// 加共享鎖public final void acquireSharedInterruptibly(int arg) throws InterruptedException {    if (Thread.interrupted())        throw new InterruptedException();    if (tryAcquireShared(arg) < 0)        doAcquireSharedInterruptibly(arg);}// 加共享鎖protected int tryAcquireShared(int acquires) {    return (getState() == 0) ? 1 : -1;}

接著子執行緒會執行 countDown() 方法,這個方法呼叫 AQS 的 releaseShared 方法釋放共享鎖,其實就是讓 AQS 的同步狀態減 1,當同步狀態值減到 0 時,需要喚醒上一步在等待佇列中掛起的執行緒,此時這些執行緒就可以加鎖成功,從而執行後續的操作。

// 計數器減 1,呼叫釋放共享鎖方法public void countDown() {    sync.releaseShared(1);}// 釋放共享鎖public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) { // 當同步狀態減至 0 時,喚醒等待佇列中的執行緒        doReleaseShared();        return true;    }    return false;}// 釋放共享鎖protected boolean tryReleaseShared(int releases) {    for (;;) {        int c = getState();        if (c == 0)            return false;        int nextc = c-1;        if (compareAndSetState(c, nextc)) // 透過 cas 讓同步狀態減 1            return nextc == 0; // 如果減 1 後的狀態為 0 返回 true    }}
擴充套件

CountDownLatch 與 Thread.join

CountDownLatch 的作用就是允許一個或多個執行緒等待其他執行緒完成操作,看起來有點類似 join() 方法。CountDownLatch 可以手動控制在 n 個執行緒裡呼叫 n 次 countDown() 方法使計數器進行減一操作,也可以在一個執行緒裡呼叫 n 次執行減一操作。而 join() 的實現原理是不停檢查 join 執行緒是否存活,如果 join 執行緒存活則讓當前執行緒永遠等待。所以 CountDownLatch 使用起來較為靈活。

CountDownLatch 與 CyclicBarrier

兩者都能夠實現執行緒之間的等待,只不過它們側重點不同:

CountDownLatch 一般用於一個或多個執行緒,等待其他執行緒執行完任務後,再才執行。CyclicBarrier 一般用於一組執行緒互相等待至某個狀態,然後這一組執行緒再同時執行。CountDownLatch是減計數,計數減為0後不能重用;而CyclicBarrier是加計數,可置0後複用。

CyclicBarrier 的原始碼分析請看下篇

21
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 面試官經常問AOP,那到底啥是AOP?