首頁>技術>

目錄Semaphore 簡介Semaphore 使用示例Semaphore 實現原理Semaphore 原始碼解析Semaphore 簡介

Semaphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它透過協調各個執行緒,以保證合理的使用公共資源。

使用示例

在一個需求,開啟 30 個執行緒併發讀取檔案,並存儲至資料庫,但資料庫只有 10 個連線,如果 30 個執行緒同時存庫,則會導致無法獲取資料庫連線,這時必須控制只有 10 個執行緒同時獲取資料庫連線進行資料儲存。

public class Main {    private static final int THREAD_COUNT = 30;    private static final int CONNECTION_COUNT = 10;    public static void main(String[] args) {        Semaphore semaphore = new Semaphore(CONNECTION_COUNT);        IntStream.range(0, THREAD_COUNT).forEach(i -> new Thread(() -> {            try {                semaphore.acquire();                System.out.println("Thread["+i+"] execute save");                Thread.sleep(1000L);                semaphore.release();            } catch (InterruptedException e) {                e.printStackTrace();            }        }).start());    }}

雖然有 30 個執行緒在執行,但是隻允許 10 個併發執行,Semaphore 在構建時傳入初始值,表示可用的許可證數量,執行緒呼叫 acquire 方法可以獲取一個許可證,當 10 個執行緒呼叫 acquire 方法,許可證被領完,後續執行緒則會阻塞,領取許可證的執行緒執行 release 方法歸還許可證,此時被阻塞的執行緒就可以繼續獲取許可證。

Semaphore 還提供了一些其它方法

int availablePermits():返回訊號量中當前可用的許可證數量int getQueueLength():返回正在等待獲取許可證的執行緒數boolean hasQueuedThreads():是否有執行緒正在等待獲取許可證void reducePermits(int reduction):減少 reduction 個許可證Collection getQueuedThreads():返回所有等待獲取許可證的執行緒集合實現原理

Semaphore 內部是透過 AQS 同步器來實現的,多個執行緒同時執行,可以使用 AQS 的共享鎖機制,控制併發執行緒數量,可以透過共享狀態值進行控制。

第一步:初始化可頒發許可證數量,後臺處理為:建立 Semaphore 物件,將 AQS 同步狀態設定為可頒發許可證數量。第二步:頒發許可證,多個執行緒呼叫 acquire() 方法領取許可證,後臺處理為:執行加共享鎖,每次加鎖同步狀態減 1,如果同步狀態大於等於 0,加鎖成功,即成功頒發許可證,如果同步狀態小於 0,則加鎖失敗,執行緒進入 AQS 等待佇列阻塞等待,所以同步狀態最小為 0。第三步:歸還許可證,獲取許可證的執行緒執行完邏輯後需要呼叫 release() 方法歸還許可證,後臺處理為:執行釋放共享鎖,釋放成功,同步狀態加 1,同時會喚醒 AQS 等待佇列中的隊首執行緒。

Semaphore 提供了公平鎖與非公平鎖兩種模式,所謂公平與非公平指的是領取許可證時,需要判斷等待佇列中是否有執行緒在等待,如果有執行緒在等待,那麼公平鎖會直接進入等待佇列隊尾,而非公平鎖則與等待佇列中的執行緒競爭這個許可證。通俗點講就是,有一隊人在排隊掛號,非公平鎖機制就是,先去視窗與第一個人擠一下,看能不能插個隊,如果插不上隊,就去後面排隊,而公平鎖機制就是老實人機制,直接去後面排隊。雖然生活中做個老實人比較好一點,但在計算機中,更鼓勵不要太老實。

公平鎖與非公平鎖的機制 ReentrantLock 也有提供,可以參考本系列的 JUC原始碼系列之ReentrantLock原始碼解析

原始碼解析

透過上面的實現原理分析,已經大概瞭解了基本實現思路,以非公平鎖為例,看一下內部實現程式碼。

首先檢視構造方法,Semaphore 有兩個構造方法,permits 引數用於設定可頒發許可證數量,fair 引數用於設定內部使用公平鎖還是非公平鎖,預設使用非公平鎖。

public class Semaphore implements java.io.Serializable {    private final Sync sync;    // 預設建立非公平鎖    public Semaphore(int permits) {        sync = new NonfairSync(permits);    }    // 根據入參決定建立公平鎖還是非公平鎖    public Semaphore(int permits, boolean fair) {        sync = fair ? new FairSync(permits) : new NonfairSync(permits);    }}

以非公平鎖為例,繼續檢視原始碼,公平鎖與非公平鎖的差別無非是加鎖時的操作有點差異,在文章後面會解釋其間差異。

非公平鎖的實現非常簡單,只有兩個方法,構造方法直接呼叫父類的構造方法,tryAcquireShared 方法執行加鎖時會被呼叫,也是執行父類中的已有實現,所以主體實現其實都在 Sync 類中。

static final class NonfairSync extends Sync {    NonfairSync(int permits) {        super(permits);    }    protected int tryAcquireShared(int acquires) {        return nonfairTryAcquireShared(acquires);    }}

Sync 類繼承了 AQS 同步器,現在看的是構造方法,構造方法直接呼叫 AQS 提供的 setState 方法,設定同步狀態的值,將其設定為構造方法傳入的可頒發許可證數量。

abstract static class Sync extends AbstractQueuedSynchronizer {    Sync(int permits) {        setState(permits);    }    //... 省略其它內容}

到此處,可以知曉 Semaphore 內部實現是委託 AQS 同步器(Sync)實現,構造方法完成兩件事

確定同步器實現,公平鎖(FairSync)與非公平鎖(NonfairSync)初始化同步狀態

接著檢視頒發許可證的原始碼實現,acquire 方法其實提供了很多個,原理都是一樣的,這裡列舉出幾個。可以看出方法體都是一行程式碼,意思就是委託給 AQS 同步器。

// 方法1,阻塞式加鎖,可響應中斷public void acquire() throws InterruptedException {    sync.acquireSharedInterruptibly(1);}// 方法2,阻塞式加鎖,不可響應中斷public void acquireUninterruptibly() {    sync.acquireShared(1);}// 方法3,快速失敗式加鎖public boolean tryAcquire() {    return sync.nonfairTryAcquireShared(1) >= 0;}

上面的方法1、方法2呼叫的是 AQS 原始提供的加鎖方法,方法3呼叫的是 AQS 實現類 Sync 自實現的方法。AQS 提供的加鎖方法最終還是會呼叫 Sync 自實現的方法執行加鎖,只是 AQS 還提供了一套完整的等待佇列機制,就是說加鎖不成功就會進入等待佇列阻塞式等待,所以具體能否加鎖成功還是得看 Sync 是怎麼實現的。

非公平鎖的加鎖邏輯也比較簡單,就是透過死迴圈不斷地使用 cas 操作同步狀態,操作成功則加鎖成功,直到同步狀態小於 0,加鎖失敗,此時方法3返回加鎖失敗,由執行緒自行決定如何操作,而執行方法1、2的執行緒則會自動進入 AQS 等待佇列。

final int nonfairTryAcquireShared(int acquires) {    for (;;) {        int available = getState(); // 獲取同步狀態值        int remaining = available - acquires; // 減掉 acquires        if (remaining < 0 || // 減掉後必須大於等於 0            compareAndSetState(available, remaining)) // cas 同步狀態            return remaining;    }}

最後檢視歸還許可證的原始碼實現,直接呼叫了 AQS 的 releaseShared 方法,這個方法會呼叫 tryReleaseShared 方法執行歸還操作。

public void release() {    sync.releaseShared(1);}// Sync 類的方法protected final boolean tryReleaseShared(int releases) {    for (;;) {        int current = getState(); // 獲取同步狀態值        int next = current + releases; // 加上 releases        if (next < current) // 處理 int 越界異常            throw new Error("Maximum permit count exceeded");        if (compareAndSetState(current, next)) // cas 同步狀態            return true;    }}

AQS 本身提供了一整套的阻塞等待佇列機制,加鎖機制只需要透過控制同步狀態即可,所以從程式碼實現來看是非常簡單的。

前文中提到了公平鎖與非公平鎖的區別在於加鎖時是否加入競爭,所以公平鎖加鎖時首先呼叫 hasQueuedPredecessors() 方法判斷等待佇列是否為空,如果不為空,則加鎖失敗,執行緒自動進入 AQS 等待佇列。hasQueuedPredecessors() 方法也是 AQS 提供的方法,可以看出 AQS 已經把所有的事情都做完了。

static final class FairSync extends Sync {    FairSync(int permits) {        super(permits);    }    protected int tryAcquireShared(int acquires) {        for (;;) {            if (hasQueuedPredecessors()) // 首先判斷等待佇列是否為空,如果為空則加鎖失敗,執行緒進入等待佇列。                return -1;            int available = getState();            int remaining = available - acquires;            if (remaining < 0 ||                compareAndSetState(available, remaining))                return remaining;        }    }}

25
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • Matlab入門教程(1)