1、JUC的由來
synchronized 關鍵字是JDK官方人員用C++程式碼寫的,在JDK6以前是重量級鎖。Java大牛Doug Lea對 synchronized 在併發程式設計條件下的效能表現不滿意就自己寫了個JUC,以此來提升併發效能,本文要講的就是JUC併發包下的AbstractQueuedSynchronizer。
在JUC中 CountDownLatch、ReentrantLock、ThreadPoolExecutor、ReentrantReadWriteLock 等底層用的都是AQS,AQS幾乎佔據了JUC併發包裡的半壁江山,如果想要獲取鎖可以被中斷、超時獲取鎖、嘗試獲取鎖那就用AQS吧。
DougLea傑作: HashMap、JUC、ConcurrentHashMap等。
溫馨提醒:
2、AQS前置知識點2.1、模板方法AbstractQueuedSynchronizer是個抽象類,所有用到方法的類都要繼承此類的若干方法,對應的設計模式就是模版模式。
模版模式定義:一個抽象類公開定義了執行它的方法的方式/模板。它的子類可以按需要重寫方法實現,但呼叫將以抽象類中定義的方式進行。這種型別的設計模式屬於行為型模式。
抽象類:
public abstract class SendCustom { public abstract void to(); public abstract void from(); public void date() { System.out.println(new Date()); } public abstract void send(); // 注意此處 框架方法-模板方法 public void sendMessage() { to(); from(); date(); send(); }}
模板方法派生類:
public class SendSms extends SendCustom { @Override public void to() { System.out.println("sowhat"); } @Override public void from() { System.out.println("xiaomai"); } @Override public void send() { System.out.println("Send message"); } public static void main(String[] args) { SendCustom sendC = new SendSms(); sendC.sendMessage(); }}
2.2、LookSupport
LockSupport 是一個執行緒阻塞工具類,所有的方法都是靜態方法,可以讓執行緒在任意位置阻塞,當然阻塞之後肯定得有喚醒的方法。常用方法如下:
public static void park(Object blocker); // 暫停當前執行緒public static void parkNanos(Object blocker, long nanos); // 暫停當前執行緒,不過有超時時間的限制public static void parkUntil(Object blocker, long deadline); // 暫停當前執行緒,直到某個時間public static void park(); // 無期限暫停當前執行緒public static void parkNanos(long nanos); // 暫停當前執行緒,不過有超時時間的限制public static void parkUntil(long deadline); // 暫停當前執行緒,直到某個時間public static void unpark(Thread thread); // 恢復當前執行緒public static Object getBlocker(Thread t);
叫park是因為park英文意思為停車。我們如果把Thread看成一輛車的話,park就是讓車停下,unpark就是讓車啟動然後跑起來。
與Object類的wait/notify機制相比,park/unpark有兩個優點:
以thread為操作物件更符合阻塞執行緒的直觀定義
操作更精準,可以準確地喚醒某一個執行緒(notify隨機喚醒一個執行緒,notifyAll 喚醒所有等待的執行緒),增加了靈活性。
park/unpark呼叫的是 Unsafe(提供CAS操作) 中的 native程式碼。
park/unpark 功能在Linux系統下是用的Posix執行緒庫pthread中的mutex(互斥量),condition(條件變數)來實現的。mutex和condition保護了一個 _counter 的變數,當 park 時,這個變數被設定為0。當unpark時,這個變數被設定為1。
2.3、CASCAS 是 CPU指令級別實現了原子性的比較和交換(Conmpare And Swap)操作,注意CAS不是鎖只是CPU提供的一個原子性操作指令。
CAS在語言層面不進行任何處理,直接將原則操作實現在硬體級別實現,之所以可以實現硬體級別的操作核心是因為CAS操作類中有個核心類UnSafe類。
關於CAS引發的ABA問題、效能開銷問題、只能保證一個共享變數之間的原則性操作問題,以前CAS中寫過,在此不再重複講解。
注意:並不是說 CAS 一定比SYN好,如果高併發執行時間久 ,用SYN好, 因為SYN底層用了wait() 阻塞後是不消耗CPU資源的。如果鎖競爭不激烈說明自旋不嚴重,此時用CAS。
3、AQS重要方法模版方法分為獨佔式跟共享式,子類根據需要不同調用不同的模版方法(講解有點多,想看底層可直接下滑到第四章節)。
3.1 模板方法3.1.1 獨佔式獲取3.1.1.1 accquire不可中斷獲取鎖accquire是獲取獨佔鎖方法,acquire嘗試獲取資源,成功則直接返回,不成功則進入等待佇列,這個過程不會被執行緒中斷,被外部中斷也不響應,獲取資源後才再進行自我中斷selfInterrupt()。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
acquire(arg) tryAcquire(arg) 顧名思義,它就是嘗試獲取鎖,需要我們自己實現具體細節,一般要求是:
如果該鎖沒有被另一個執行緒保持,則獲取該鎖並立即返回,將鎖的保持計數設定為 1。
如果當前執行緒已經保持該鎖,則將保持計數加 1,並且該方法立即返回。
如果該鎖被另一個執行緒保持,則出於執行緒排程的目的,禁用當前執行緒,並且在獲得鎖之前,該執行緒將一直處於休眠狀態,此時鎖保持計數被設定為 1。
addWaiter(Node.EXCLUSIVE)主要功能是 一旦嘗試獲取鎖未成功,就要使用該方法將其加入同步佇列尾部,由於可能有多個執行緒併發加入隊尾產生競爭,因此採用compareAndSetTail鎖方法來保證同步
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)一旦加入同步佇列,就需要使用該方法,自旋阻塞 喚醒來不斷的嘗試獲取鎖,直到被中斷或獲取到鎖。
3.1.1.2 acquireInterruptibly可中斷獲取鎖acquireInterruptibly相比於acquire支援響應中斷。
1、如果當前執行緒未被中斷,則嘗試獲取鎖。
2、如果鎖空閒則獲鎖並立即返回,state = 1。
3、如果當前執行緒已持此鎖,state + 1,並且該方法立即返回。
4、如果鎖被另一個執行緒保持,出於執行緒排程目的,禁用當前執行緒,執行緒休眠ing,除非鎖由當前執行緒獲得或者當前執行緒被中斷了,中斷後會丟擲InterruptedException,並且清除當前執行緒的已中斷狀態。
5、此方法是一個顯式中斷點,所以要優先考慮響應中斷。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); // acquireInterruptibly 選擇 interrupted = true; // acquire 的選擇
3.1.1.3 tryAcquireNanos該方法可以被中斷,增加了超時則失敗的功能。可以說該方法的實現與上述兩方法沒有任何區別。時間功能上就是用的標準超時功能,如果剩餘時間小於0那麼acquire失敗,如果該時間大於一次自旋鎖時間(spinForTimeoutThreshold = 1000),並且可以被阻塞,那麼呼叫LockSupport.parkNanos方法阻塞執行緒。
doAcquireNanos內部:
nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException();
該方法一般會有以下幾種情況產生:
在指定時間內,執行緒獲取到鎖,返回true。
當前執行緒在超時時間內被中斷,拋中斷異常後,執行緒退出。
到截止時間後執行緒仍未獲取到鎖,此時執行緒獲得鎖失敗,不再等待直接返回false。
3.1.2 共享式獲取3.1.2.1 acquireShared public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
該模版方法的工作:
呼叫tryAcquireShared(arg) 嘗試獲得資源,返回值代表如下含義:負數表示失敗。
0 表示成功,但沒有剩餘可用資源。
正數表示成功,且有剩餘資源。
doAcquireShared作用:
建立節點然後加入到佇列中去,這一塊和獨佔模式下的 addWaiter 程式碼差不多,不同的是結點的模式是SHARED,在獨佔模式 EXCLUSIVE。
3.1.2.2 acquireSharedInterruptibly無非就是可中斷性的共享方法
public final void acquireSharedInterruptibly(long arg) throws InterruptedException { if (Thread.interrupted()) // 如果執行緒被中斷,則丟擲異常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) // 如果tryAcquireShared()方法獲取失敗,則呼叫如下的方法 doAcquireSharedInterruptibly(arg);}
3.1.2.3. tryAcquireSharedNanos嘗試以共享模式獲取,如果被中斷則中止,如果超過給定超時期則失敗。實現此方法首先要檢查中斷狀態,然後至少呼叫一次 tryacquireshared(long),並在成功時返回。否則,在成功、執行緒中斷或超過超時期之前,執行緒將加入佇列,可能反覆處於阻塞或未阻塞狀態,並一直呼叫 tryacquireshared(long)。
public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
3.1.3 獨佔式釋放獨佔鎖的釋放呼叫unlock方法,而該方法實際呼叫了AQS的release方法,這段程式碼邏輯比較簡單,如果同步狀態釋放成功(tryRelease返回true)則會執行if塊中的程式碼,當head指向的頭結點不為null,並且該節點的狀態值不為0的話才會執行unparkSuccessor()方法。
public final boolean release(long arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
3.1.4 共享式釋放releaseShared首先去嘗試釋放資源tryReleaseShared(arg),如果釋放成功了,就代表有資源空閒出來,那麼就用doReleaseShared()去喚醒後續結點。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}
比如CountDownLatch的countDown()具體實現:
public void countDown() { sync.releaseShared(1); }
3.2 子類需實現方法子類要實現父類方法也分為獨佔式跟共享式。
3.2.1 獨佔式獲取tryAcquire 顧名思義,就是嘗試獲取鎖,AQS在這裡沒有對其進行功能的實現,只有一個丟擲異常的語句,我們需要自己對其進行實現,可以對其重寫實現公平鎖、不公平鎖、可重入鎖、不可重入鎖
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
3.2.2 獨佔式釋放
tryRelease 嘗試釋放 獨佔鎖,需要子類實現。
protected boolean tryRelease(long arg) { throw new UnsupportedOperationException(); }
3.2.3 共享式獲取tryAcquireShared 嘗試進行共享鎖的獲得,需要子類實現。
protected long tryAcquireShared(long arg) { throw new UnsupportedOperationException(); }
3.2.4 共享式釋放
tryReleaseShared嘗試進行共享鎖的釋放,需要子類實現。
protected boolean tryReleaseShared(long arg) { throw new UnsupportedOperationException(); }
3.3 狀態標誌位
state因為用 volatile修飾 保證了我們操作的可見性,所以任何執行緒透過getState()獲得狀態都是可以得到最新值,但是setState()無法保證原子性,因此AQS給我們提供了compareAndSetState方法利用底層UnSafe的CAS功能來實現原子性。
private volatile long state; protected final long getState() { return state; } protected final void setState(long newState) { state = newState; } protected final boolean compareAndSetState(long expect, long update) { return unsafe.compareAndSwapLong(this, stateOffset, expect, update); }
3.4 查詢是否獨佔模式isHeldExclusively 該函式的功能是查詢當前的工作模式是否是獨佔模式。需要子類實現。
protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }
3.5 自定義實現鎖
這裡需要重點說明一點,JUC中一般是用一個子類繼承自Lock,然後在子類中定義一個內部類來實現AQS的繼承跟使用。
public class SowhatLock implements Lock{ private Sync sync = new Sync(); @Override public void lock() { sync.acquire(1); } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1,unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } @Override public void lockInterruptibly() throws InterruptedException { } private class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { assert arg == 1; if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { assert arg == 1; if (!isHeldExclusively()) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } @Override protected boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } Condition newCondition() { return new ConditionObject(); } }}
自定義實現類: