目錄Semaphore 簡介Semaphore 使用示例Semaphore 實現原理Semaphore 原始碼解析Semaphore 簡介
Semaphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它透過協調各個執行緒,以保證合理的使用公共資源。
使用示例在一個需求,開啟 30 個執行緒併發讀取檔案,並存儲至資料庫,但資料庫只有 10 個連線,如果 30 個執行緒同時存庫,則會導致無法獲取資料庫連線,這時必須控制只有 10 個執行緒同時獲取資料庫連線進行資料儲存。
public class Main { private static final int THREAD_COUNT = 30; private static final int CONNECTION_COUNT = 10; public static void main(String[] args) { Semaphore semaphore = new Semaphore(CONNECTION_COUNT); IntStream.range(0, THREAD_COUNT).forEach(i -> new Thread(() -> { try { semaphore.acquire(); System.out.println("Thread["+i+"] execute save"); Thread.sleep(1000L); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start()); }}
雖然有 30 個執行緒在執行,但是隻允許 10 個併發執行,Semaphore 在構建時傳入初始值,表示可用的許可證數量,執行緒呼叫 acquire 方法可以獲取一個許可證,當 10 個執行緒呼叫 acquire 方法,許可證被領完,後續執行緒則會阻塞,領取許可證的執行緒執行 release 方法歸還許可證,此時被阻塞的執行緒就可以繼續獲取許可證。
Semaphore 還提供了一些其它方法
int availablePermits():返回訊號量中當前可用的許可證數量int getQueueLength():返回正在等待獲取許可證的執行緒數boolean hasQueuedThreads():是否有執行緒正在等待獲取許可證void reducePermits(int reduction):減少 reduction 個許可證Collection getQueuedThreads():返回所有等待獲取許可證的執行緒集合實現原理Semaphore 內部是透過 AQS 同步器來實現的,多個執行緒同時執行,可以使用 AQS 的共享鎖機制,控制併發執行緒數量,可以透過共享狀態值進行控制。
第一步:初始化可頒發許可證數量,後臺處理為:建立 Semaphore 物件,將 AQS 同步狀態設定為可頒發許可證數量。第二步:頒發許可證,多個執行緒呼叫 acquire() 方法領取許可證,後臺處理為:執行加共享鎖,每次加鎖同步狀態減 1,如果同步狀態大於等於 0,加鎖成功,即成功頒發許可證,如果同步狀態小於 0,則加鎖失敗,執行緒進入 AQS 等待佇列阻塞等待,所以同步狀態最小為 0。第三步:歸還許可證,獲取許可證的執行緒執行完邏輯後需要呼叫 release() 方法歸還許可證,後臺處理為:執行釋放共享鎖,釋放成功,同步狀態加 1,同時會喚醒 AQS 等待佇列中的隊首執行緒。Semaphore 提供了公平鎖與非公平鎖兩種模式,所謂公平與非公平指的是領取許可證時,需要判斷等待佇列中是否有執行緒在等待,如果有執行緒在等待,那麼公平鎖會直接進入等待佇列隊尾,而非公平鎖則與等待佇列中的執行緒競爭這個許可證。通俗點講就是,有一隊人在排隊掛號,非公平鎖機制就是,先去視窗與第一個人擠一下,看能不能插個隊,如果插不上隊,就去後面排隊,而公平鎖機制就是老實人機制,直接去後面排隊。雖然生活中做個老實人比較好一點,但在計算機中,更鼓勵不要太老實。
公平鎖與非公平鎖的機制 ReentrantLock 也有提供,可以參考本系列的 JUC原始碼系列之ReentrantLock原始碼解析
原始碼解析透過上面的實現原理分析,已經大概瞭解了基本實現思路,以非公平鎖為例,看一下內部實現程式碼。
首先檢視構造方法,Semaphore 有兩個構造方法,permits 引數用於設定可頒發許可證數量,fair 引數用於設定內部使用公平鎖還是非公平鎖,預設使用非公平鎖。
public class Semaphore implements java.io.Serializable { private final Sync sync; // 預設建立非公平鎖 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 根據入參決定建立公平鎖還是非公平鎖 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }}
以非公平鎖為例,繼續檢視原始碼,公平鎖與非公平鎖的差別無非是加鎖時的操作有點差異,在文章後面會解釋其間差異。
非公平鎖的實現非常簡單,只有兩個方法,構造方法直接呼叫父類的構造方法,tryAcquireShared 方法執行加鎖時會被呼叫,也是執行父類中的已有實現,所以主體實現其實都在 Sync 類中。
static final class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }}
Sync 類繼承了 AQS 同步器,現在看的是構造方法,構造方法直接呼叫 AQS 提供的 setState 方法,設定同步狀態的值,將其設定為構造方法傳入的可頒發許可證數量。
abstract static class Sync extends AbstractQueuedSynchronizer { Sync(int permits) { setState(permits); } //... 省略其它內容}
到此處,可以知曉 Semaphore 內部實現是委託 AQS 同步器(Sync)實現,構造方法完成兩件事
確定同步器實現,公平鎖(FairSync)與非公平鎖(NonfairSync)初始化同步狀態接著檢視頒發許可證的原始碼實現,acquire 方法其實提供了很多個,原理都是一樣的,這裡列舉出幾個。可以看出方法體都是一行程式碼,意思就是委託給 AQS 同步器。
// 方法1,阻塞式加鎖,可響應中斷public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1);}// 方法2,阻塞式加鎖,不可響應中斷public void acquireUninterruptibly() { sync.acquireShared(1);}// 方法3,快速失敗式加鎖public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0;}
上面的方法1、方法2呼叫的是 AQS 原始提供的加鎖方法,方法3呼叫的是 AQS 實現類 Sync 自實現的方法。AQS 提供的加鎖方法最終還是會呼叫 Sync 自實現的方法執行加鎖,只是 AQS 還提供了一套完整的等待佇列機制,就是說加鎖不成功就會進入等待佇列阻塞式等待,所以具體能否加鎖成功還是得看 Sync 是怎麼實現的。
非公平鎖的加鎖邏輯也比較簡單,就是透過死迴圈不斷地使用 cas 操作同步狀態,操作成功則加鎖成功,直到同步狀態小於 0,加鎖失敗,此時方法3返回加鎖失敗,由執行緒自行決定如何操作,而執行方法1、2的執行緒則會自動進入 AQS 等待佇列。
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); // 獲取同步狀態值 int remaining = available - acquires; // 減掉 acquires if (remaining < 0 || // 減掉後必須大於等於 0 compareAndSetState(available, remaining)) // cas 同步狀態 return remaining; }}
最後檢視歸還許可證的原始碼實現,直接呼叫了 AQS 的 releaseShared 方法,這個方法會呼叫 tryReleaseShared 方法執行歸還操作。
public void release() { sync.releaseShared(1);}// Sync 類的方法protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); // 獲取同步狀態值 int next = current + releases; // 加上 releases if (next < current) // 處理 int 越界異常 throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) // cas 同步狀態 return true; }}
AQS 本身提供了一整套的阻塞等待佇列機制,加鎖機制只需要透過控制同步狀態即可,所以從程式碼實現來看是非常簡單的。
前文中提到了公平鎖與非公平鎖的區別在於加鎖時是否加入競爭,所以公平鎖加鎖時首先呼叫 hasQueuedPredecessors() 方法判斷等待佇列是否為空,如果不為空,則加鎖失敗,執行緒自動進入 AQS 等待佇列。hasQueuedPredecessors() 方法也是 AQS 提供的方法,可以看出 AQS 已經把所有的事情都做完了。
static final class FairSync extends Sync { FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) // 首先判斷等待佇列是否為空,如果為空則加鎖失敗,執行緒進入等待佇列。 return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }}