首頁>技術>

1、JUC的由來

synchronized 關鍵字是JDK官方人員用C++程式碼寫的,在JDK6以前是重量級鎖。Java大牛Doug Lea對 synchronized 在併發程式設計條件下的效能表現不滿意就自己寫了個JUC,以此來提升併發效能,本文要講的就是JUC併發包下的AbstractQueuedSynchronizer

在JUC中 CountDownLatch、ReentrantLock、ThreadPoolExecutor、ReentrantReadWriteLock 等底層用的都是AQS,AQS幾乎佔據了JUC併發包裡的半壁江山,如果想要獲取鎖可以被中斷、超時獲取鎖、嘗試獲取鎖那就用AQS吧

DougLea傑作: HashMap、JUC、ConcurrentHashMap等。

溫馨提醒

2、AQS前置知識點2.1、模板方法

AbstractQueuedSynchronizer是個抽象類,所有用到方法的類都要繼承此類的若干方法,對應的設計模式就是模版模式

模版模式定義:一個抽象類公開定義了執行它的方法的方式/模板。它的子類可以按需要重寫方法實現,但呼叫將以抽象類中定義的方式進行。這種型別的設計模式屬於行為型模式。

抽象類:

public abstract class SendCustom { public abstract void to(); public abstract void from(); public void date() {  System.out.println(new Date()); } public abstract void send(); // 注意此處 框架方法-模板方法 public void sendMessage() {  to();  from();  date();  send(); }}

模板方法派生類:

public class SendSms extends SendCustom { @Override public void to() {  System.out.println("sowhat"); } @Override public void from() {  System.out.println("xiaomai"); } @Override public void send() {  System.out.println("Send message"); }  public static void main(String[] args) {  SendCustom sendC = new SendSms();  sendC.sendMessage(); }}
2.2、LookSupport

LockSupport 是一個執行緒阻塞工具類,所有的方法都是靜態方法,可以讓執行緒在任意位置阻塞,當然阻塞之後肯定得有喚醒的方法。常用方法如下:

public static void park(Object blocker); // 暫停當前執行緒public static void parkNanos(Object blocker, long nanos); // 暫停當前執行緒,不過有超時時間的限制public static void parkUntil(Object blocker, long deadline); // 暫停當前執行緒,直到某個時間public static void park(); // 無期限暫停當前執行緒public static void parkNanos(long nanos); // 暫停當前執行緒,不過有超時時間的限制public static void parkUntil(long deadline); // 暫停當前執行緒,直到某個時間public static void unpark(Thread thread); // 恢復當前執行緒public static Object getBlocker(Thread t);

park是因為park英文意思為停車。我們如果把Thread看成一輛車的話,park就是讓車停下,unpark就是讓車啟動然後跑起來。

與Object類的wait/notify機制相比,park/unpark有兩個優點:

thread為操作物件更符合阻塞執行緒的直觀定義

操作更精準,可以準確地喚醒某一個執行緒(notify隨機喚醒一個執行緒,notifyAll 喚醒所有等待的執行緒),增加了靈活性。

park/unpark呼叫的是 Unsafe(提供CAS操作) 中的 native程式碼。

park/unpark 功能在Linux系統下是用的Posix執行緒庫pthread中的mutex(互斥量),condition(條件變數)來實現的。mutexcondition保護了一個 _counter 的變數,當 park 時,這個變數被設定為0。當unpark時,這個變數被設定為1。

2.3、CAS

CAS 是 CPU指令級別實現了原子性的比較和交換(Conmpare And Swap)操作,注意CAS不是鎖只是CPU提供的一個原子性操作指令。

CAS在語言層面不進行任何處理,直接將原則操作實現在硬體級別實現,之所以可以實現硬體級別的操作核心是因為CAS操作類中有個核心類UnSafe類。

關於CAS引發的ABA問題、效能開銷問題、只能保證一個共享變數之間的原則性操作問題,以前CAS中寫過,在此不再重複講解。

注意:並不是說 CAS 一定比SYN好,如果高併發執行時間久 ,用SYN好, 因為SYN底層用了wait() 阻塞後是不消耗CPU資源的。如果鎖競爭不激烈說明自旋不嚴重,此時用CAS。

3、AQS重要方法

模版方法分為獨佔式跟共享式,子類根據需要不同調用不同的模版方法(講解有點多,想看底層可直接下滑到第四章節)。

3.1 模板方法3.1.1 獨佔式獲取3.1.1.1 accquire

不可中斷獲取鎖accquire是獲取獨佔鎖方法,acquire嘗試獲取資源,成功則直接返回,不成功則進入等待佇列,這個過程不會被執行緒中斷,被外部中斷也不響應,獲取資源後才再進行自我中斷selfInterrupt()。

public final void acquire(int arg) {        if (!tryAcquire(arg) &&            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            selfInterrupt();}
acquire(arg) tryAcquire(arg) 顧名思義,它就是嘗試獲取鎖,需要我們自己實現具體細節,一般要求是:

如果該鎖沒有被另一個執行緒保持,則獲取該鎖並立即返回,將鎖的保持計數設定為 1。

如果當前執行緒已經保持該鎖,則將保持計數加 1,並且該方法立即返回。

如果該鎖被另一個執行緒保持,則出於執行緒排程的目的,禁用當前執行緒,並且在獲得鎖之前,該執行緒將一直處於休眠狀態,此時鎖保持計數被設定為 1。

addWaiter(Node.EXCLUSIVE)

主要功能是 一旦嘗試獲取鎖未成功,就要使用該方法將其加入同步佇列尾部,由於可能有多個執行緒併發加入隊尾產生競爭,因此採用compareAndSetTail鎖方法來保證同步

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

一旦加入同步佇列,就需要使用該方法,自旋阻塞 喚醒來不斷的嘗試獲取鎖,直到被中斷或獲取到鎖。

3.1.1.2 acquireInterruptibly

可中斷獲取鎖acquireInterruptibly相比於acquire支援響應中斷。

1、如果當前執行緒未被中斷,則嘗試獲取鎖。

2、如果鎖空閒則獲鎖並立即返回,state = 1。

3、如果當前執行緒已持此鎖,state + 1,並且該方法立即返回。

4、如果鎖被另一個執行緒保持,出於執行緒排程目的,禁用當前執行緒,執行緒休眠ing,除非鎖由當前執行緒獲得或者當前執行緒被中斷了,中斷後會丟擲InterruptedException,並且清除當前執行緒的已中斷狀態。

5、此方法是一個顯式中斷點,所以要優先考慮響應中斷。

 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())     throw new InterruptedException(); // acquireInterruptibly 選擇      interrupted = true; // acquire 的選擇
3.1.1.3 tryAcquireNanos

該方法可以被中斷,增加了超時則失敗的功能。可以說該方法的實現與上述兩方法沒有任何區別。時間功能上就是用的標準超時功能,如果剩餘時間小於0那麼acquire失敗,如果該時間大於一次自旋鎖時間(spinForTimeoutThreshold = 1000),並且可以被阻塞,那麼呼叫LockSupport.parkNanos方法阻塞執行緒。

doAcquireNanos內部:

  nanosTimeout = deadline - System.nanoTime();  if (nanosTimeout <= 0L)      return false;  if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)      LockSupport.parkNanos(this, nanosTimeout);  if (Thread.interrupted())      throw new InterruptedException();

該方法一般會有以下幾種情況產生:

在指定時間內,執行緒獲取到鎖,返回true。

當前執行緒在超時時間內被中斷,拋中斷異常後,執行緒退出。

到截止時間後執行緒仍未獲取到鎖,此時執行緒獲得鎖失敗,不再等待直接返回false。

3.1.2 共享式獲取3.1.2.1 acquireShared
    public final void acquireShared(int arg) {        if (tryAcquireShared(arg) < 0)            doAcquireShared(arg);    }

該模版方法的工作:

呼叫tryAcquireShared(arg) 嘗試獲得資源,返回值代表如下含義:

負數表示失敗。

0 表示成功,但沒有剩餘可用資源。

正數表示成功,且有剩餘資源。

doAcquireShared作用:

建立節點然後加入到佇列中去,這一塊和獨佔模式下的 addWaiter 程式碼差不多,不同的是結點的模式是SHARED,在獨佔模式 EXCLUSIVE。

3.1.2.2 acquireSharedInterruptibly

無非就是可中斷性的共享方法

public final void acquireSharedInterruptibly(long arg)  throws InterruptedException {    if (Thread.interrupted()) // 如果執行緒被中斷,則丟擲異常        throw new InterruptedException();    if (tryAcquireShared(arg) < 0)          // 如果tryAcquireShared()方法獲取失敗,則呼叫如下的方法        doAcquireSharedInterruptibly(arg);}
3.1.2.3. tryAcquireSharedNanos

嘗試以共享模式獲取,如果被中斷則中止,如果超過給定超時期則失敗。實現此方法首先要檢查中斷狀態,然後至少呼叫一次 tryacquireshared(long),並在成功時返回。否則,在成功、執行緒中斷或超過超時期之前,執行緒將加入佇列,可能反覆處於阻塞或未阻塞狀態,並一直呼叫 tryacquireshared(long)。

    public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)            throws InterruptedException {        if (Thread.interrupted())            throw new InterruptedException();        return tryAcquireShared(arg) >= 0 ||            doAcquireSharedNanos(arg, nanosTimeout);    }
3.1.3 獨佔式釋放

獨佔鎖的釋放呼叫unlock方法,而該方法實際呼叫了AQS的release方法,這段程式碼邏輯比較簡單,如果同步狀態釋放成功(tryRelease返回true)則會執行if塊中的程式碼,當head指向的頭結點不為null,並且該節點的狀態值不為0的話才會執行unparkSuccessor()方法。

    public final boolean release(long arg) {        if (tryRelease(arg)) {            Node h = head;            if (h != null && h.waitStatus != 0)                unparkSuccessor(h);            return true;        }        return false;    }
3.1.4 共享式釋放

releaseShared首先去嘗試釋放資源tryReleaseShared(arg),如果釋放成功了,就代表有資源空閒出來,那麼就用doReleaseShared()去喚醒後續結點。

public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}

比如CountDownLatch的countDown()具體實現:

    public void countDown() {        sync.releaseShared(1);    }
3.2 子類需實現方法

子類要實現父類方法也分為獨佔式跟共享式。

3.2.1 獨佔式獲取

tryAcquire 顧名思義,就是嘗試獲取鎖,AQS在這裡沒有對其進行功能的實現,只有一個丟擲異常的語句,我們需要自己對其進行實現,可以對其重寫實現公平鎖、不公平鎖、可重入鎖、不可重入鎖

protected boolean tryAcquire(int arg) {        throw new UnsupportedOperationException();}
3.2.2 獨佔式釋放

tryRelease 嘗試釋放 獨佔鎖,需要子類實現。

   protected boolean tryRelease(long arg) {        throw new UnsupportedOperationException();    }
3.2.3 共享式獲取

tryAcquireShared 嘗試進行共享鎖的獲得,需要子類實現。

protected long tryAcquireShared(long arg) {        throw new UnsupportedOperationException();    }
3.2.4 共享式釋放

tryReleaseShared嘗試進行共享鎖的釋放,需要子類實現。

    protected boolean tryReleaseShared(long arg) {        throw new UnsupportedOperationException();    }
3.3 狀態標誌位

state因為用 volatile修飾 保證了我們操作的可見性,所以任何執行緒透過getState()獲得狀態都是可以得到最新值,但是setState()無法保證原子性,因此AQS給我們提供了compareAndSetState方法利用底層UnSafe的CAS功能來實現原子性。

    private volatile long state;    protected final long getState() {        return state;    }    protected final void setState(long newState) {        state = newState;    }   protected final boolean compareAndSetState(long expect, long update) {        return unsafe.compareAndSwapLong(this, stateOffset, expect, update);    }
3.4 查詢是否獨佔模式

isHeldExclusively 該函式的功能是查詢當前的工作模式是否是獨佔模式。需要子類實現。

    protected boolean isHeldExclusively() {        throw new UnsupportedOperationException();    }
3.5 自定義實現鎖

這裡需要重點說明一點,JUC中一般是用一個子類繼承自Lock,然後在子類中定義一個內部類來實現AQS的繼承跟使用

public class SowhatLock implements Lock{ private Sync sync = new Sync(); @Override public void lock() {  sync.acquire(1); } @Override public boolean tryLock() {  return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {  return sync.tryAcquireNanos(1,unit.toNanos(time)); } @Override public void unlock() {  sync.release(1); } @Override public Condition newCondition() {  return sync.newCondition(); } @Override public void lockInterruptibly() throws InterruptedException { } private class Sync extends AbstractQueuedSynchronizer {  @Override  protected boolean tryAcquire(int arg)  {   assert arg == 1;   if (compareAndSetState(0, 1))   {    setExclusiveOwnerThread(Thread.currentThread());    return true;   }   return false;  }  @Override  protected boolean tryRelease(int arg)  {   assert arg == 1;   if (!isHeldExclusively())   {    throw new IllegalMonitorStateException();   }   setExclusiveOwnerThread(null);   setState(0);   return true;  }  @Override  protected boolean isHeldExclusively()  {   return getExclusiveOwnerThread() == Thread.currentThread();  }  Condition newCondition() {   return new ConditionObject();  } }}

自定義實現類:

12
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 由淺入深逐步講解Java併發的半壁江山—AQS(下文)