首頁>技術>

4、AQS底層4.1 CLH

CLH(Craig、 Landin、 Hagersten locks三個人名字綜合而命名):

是一個自旋鎖,能確保無飢餓性,提供先來先服務的公平性。

CLH 鎖也是一種基於連結串列的可擴充套件、高效能、公平的自旋鎖,申請執行緒只在本地變數上自旋,它不斷輪詢前驅的狀態,如果發現前驅釋放了鎖就結束自旋。

4.2 Node

CLH 佇列由Node物件組成,其中Node是AQS中的內部類。

static final class Node { // 標識共享鎖 static final Node SHARED = new Node(); // 標識獨佔鎖 static final Node EXCLUSIVE = null; // 前驅節點 volatile Node prev; // 後繼節點 volatile Node next; // 獲取鎖失敗的執行緒儲存在Node節點中。 volatile Thread thread; // 當我們呼叫了Condition後他也有一個等待佇列 Node nextWaiter; //在Node節點中一般透過waitStatus獲得下面節點不同的狀態,狀態對應下方。 volatile int waitStatus; static final int CANCELLED =  1; static final int SIGNAL    = -1; static final int CONDITION = -2; static final int PROPAGATE = -3;

waitStatus 有如下5中狀態:

CANCELLED = 1

表示當前結點已取消排程。當超時或被中斷(響應中斷的情況下),會觸發變更為此狀態,進入該狀態後的結點將不會再變化。

SIGNAL = -1

表示後繼結點在等待當前結點喚醒。後繼結點入隊時,會將前繼結點的狀態更新為 SIGNAL。

CONDITION = -2

表示結點等待在 Condition 上,當其他執行緒呼叫了 Condition 的 signal() 方法後,CONDITION狀態的結點將從等待佇列轉移到同步佇列中,等待獲取同步鎖。

PROPAGATE = -3

共享模式下,前繼結點不僅會喚醒其後繼結點,同時也可能會喚醒後繼的後繼結點。

INITIAL = 0

新結點入隊時的預設狀態。

4.3 AQS實現4.3.1 公平鎖和非公平鎖

銀行售票視窗營業中:

公平排隊:每個客戶來了自動在最後面排隊,輪到自己辦理業務的時候拿出身份證等證件取票。

非公平排隊:有個旅客火車馬上開車了,他拿著自己的各種證件著急這想跟視窗工作人員說是否可以加急辦理下,可以的話則直接辦理,不可以的話則去隊尾排隊去。

在JUC中同樣存在公平鎖跟非公平鎖,一般非公平鎖效率好一些。因為非公平鎖狀態下打算搶鎖的執行緒不用排隊掛起了

4.3.2 AQS細節

AQS內部維護著一個FIFO的佇列,即CLH佇列,提供先來先服務的公平性。AQS的同步機制就是依靠CLH佇列實現的。CLH佇列是FIFO的雙端雙向連結串列佇列(方便尾部節點插入)。執行緒透過AQS獲取鎖失敗,就會將執行緒封裝成一個Node節點,透過CAS原子操作插入佇列尾。當有執行緒釋放鎖時,會嘗試讓隊頭的next節點佔用鎖,個人理解AQS具有如下幾個特點:

在AQS 同步佇列中 -1 表示執行緒在睡眠狀態

當前Node節點執行緒會把前一個Node.ws = -1。當前節點把前面節點ws設定為-1,你可以理解為:你自己能知道自己睡著了嗎?只能是別人看到了發現你睡眠了!

持有鎖的執行緒永遠不在佇列中。

在AQS佇列中第二個才是最先排隊的執行緒。

如果是交替型任務或者單執行緒任務,即使用了Lock也不會涉及到AQS 佇列。

不到萬不得已不要輕易park執行緒,很耗時的!所以排隊的頭執行緒會自旋的嘗試幾個獲取鎖。

private Lock lock = new ReentrantLock();public void test(){    lock.lock();    try{        doSomeThing();    }catch (Exception e){      ...    }finally {        lock.unlock();    }}
4.4.1 獨佔式加入同步佇列

同步器AQS中包含兩個節點型別的引用:一個指向頭結點的引用(head),一個指向尾節點的引用(tail),如果加入的節點是OK的則會直接執行該節點,當若干個執行緒搶鎖失敗了那麼就會搶著加入到同步佇列的尾部,因為是搶著加入這個時候用CAS來設定尾部節點。入口程式碼:

public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}
tryAcquire

該方法是需要自我實現的,在上面的demo中可見一斑,就是返回是否獲得了鎖。

 protected final boolean tryAcquire(int acquires) {     final Thread current = Thread.currentThread();     int c = getState();     if (c == 0) {         //  是否需要加入佇列,不需要的話則嘗試CAS獲得鎖,獲得成功後 設定當前鎖的擁有者         if (!hasQueuedPredecessors() &&             compareAndSetState(0, acquires)) {             setExclusiveOwnerThread(current);             return true;         }     }     else if (current == getExclusiveOwnerThread()) {         // 這就是可重入鎖的實現           int nextc = c + acquires;         if (nextc < 0)             throw new Error("Maximum lock count exceeded");         setState(nextc);         return true;     }     return false; }
addWaiter(Node.EXCLUSIVE,arg)
/** * 如果嘗試獲取同步狀態失敗的話,則構造同步節點(獨佔式的Node.EXCLUSIVE),透過addWaiter(Node node,int args)方法將該節點加入到同步佇列的隊尾。 */ private Node addWaiter(Node mode) {     // 用當前執行緒構造一個Node物件,mode是一個表示Node型別的欄位,或者說是這個節點是獨佔的還是共享的     Node node = new Node(Thread.currentThread(), mode);     // 將目前佇列中尾部節點給pred     Node pred = tail;     // 佇列不為空的時候     if (pred != null) {         node.prev = pred;         // 先嚐試透過AQS方式修改尾節點為最新的節點,如果修改失敗,意味著有併發,         if (compareAndSetTail(pred, node)) {             pred.next = node;             return node;         }     }     //第一次嘗試新增尾部失敗說明有併發,此時進入自旋     enq(node);     return node; }
自旋enq 方法將併發新增節點的請求透過CAS跟自旋將尾節點的新增變得序列化起來。說白了就是讓節點放到正確的隊尾位置。
/*** 這裡進行了迴圈,如果此時存在了tail就執行同上一步驟的新增隊尾操作,如果依然不存在,* 就把當前執行緒作為head結點。插入節點後,呼叫acquireQueued()進行阻塞*/private Node enq(final Node node) {   for (;;) {       Node t = tail;       if (t == null) { // Must initialize           if (compareAndSetHead(new Node()))               tail = head;       } else {           node.prev = t;           if (compareAndSetTail(t, node)) {               t.next = node;               return t;           }       }   }}
acquireQueued 是當前Node節點執行緒在死迴圈中獲取同步狀態,而只有前驅節點是頭節點才能嘗試獲取鎖,原因是:

頭結點是成功獲取同步狀態(鎖)的節點,而頭節點的執行緒釋放了同步狀態以後,將會喚醒其後繼節點,後繼節點的執行緒被喚醒後要檢查自己的前驅節點是否為頭結點。

維護同步佇列的FIFO原則,節點進入同步佇列之後,會嘗試自旋幾次。

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        // 自旋檢查當前節點的前驅節點是否為頭結點,才能獲取鎖        for (;;) {            // 獲取節點的前驅節點            final Node p = node.predecessor();            if (p == head && tryAcquire(arg)) {            // 節點中的執行緒迴圈的檢查,自己的前驅節點是否為頭節點            // 只有當前節點 前驅節點是頭節點才會 再次呼叫我們實現的方法tryAcquire            // 接下來無非就是將當前節點設定為頭結點,移除之前的頭節點                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            // 否則檢查前一個節點的狀態,看當前獲取鎖失敗的執行緒是否要掛起            if (shouldParkAfterFailedAcquire(p, node) &&           //如果需要掛起,藉助JUC包下面的LockSupport類的靜態方法park掛起當前執行緒,直到被喚醒                parkAndCheckInterrupt())                interrupted = true; // 兩個判斷都是true說明 則置true        }    } finally {        //如果等待過程中沒有成功獲取資源(如timeout,或者可中斷的情況下被中斷了),那麼取消結點在佇列中的等待。        if (failed)           //取消請求,將當前節點從佇列中移除            cancelAcquire(node);    }}

如果成功就返回,否則就執行shouldParkAfterFailedAcquire、parkAndCheckInterrupt來達到阻塞效果。

shouldParkAfterFailedAcquire 第二步的addWaiter()構造的新節點,waitStatus的預設值是0。此時,會進入最後一個if判斷,CAS設定pred.waitStatus SIGNAL,最後返回false。由於返回false,第四步的acquireQueued會繼續進行迴圈。假設node的前繼節點pred仍然不是頭結點或鎖獲取失敗,則會再次進入shouldParkAfterFailedAcquire()。上一輪迴圈中已經將pred.waitStatu = -1了,則這次會進入第一個判斷條件,直接返回true,表示應該阻塞呼叫parkAndCheckInterrupt。

那麼什麼時候會遇到ws > 0呢?當pred所維護的獲取請求被取消時(也就是node.waitStatus = CANCELLED,這時就會迴圈移除所有被取消的前繼節點pred,直到找到未被取消的pred。移除所有被取消的前繼節點後,直接返回false。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {       int ws = pred.waitStatus; // 獲得前驅節點的狀態       if (ws == Node.SIGNAL) //此處是第二次設定           return true;       if (ws > 0) {          do {               node.prev = pred = pred.prev;           } while (pred.waitStatus > 0);           pred.next = node;       } else {          //  此處是第一次設定 unsafe級別呼叫設定          compareAndSetWaitStatus(pred, ws, Node.SIGNAL);       }       return false;   }
parkAndCheckInterrupt 主要任務是暫停當前執行緒然後檢視是否已經暫停了。
private final boolean parkAndCheckInterrupt() {    // 呼叫park()使執行緒進入掛起狀態,什麼時候呼叫了unpark再繼續執行下面    LockSupport.park(this);     // 如果被喚醒,檢視自己是不是已經被中斷了。    return Thread.interrupted();}
cancelAcquireacquireQueued方法的finally會判斷 failed值,正常執行時候自旋出來的時候會是false,如果中斷或者timeout了 則會是true,執行cancelAcquire,其中核心程式碼是node.waitStatus = Node.CANCELLED。selfInterrupt
static void selfInterrupt() {      Thread.currentThread().interrupt();  }
4.4.2 獨佔式釋放佇列頭節點

release()會呼叫tryRelease方法嘗試釋放當前執行緒持有的鎖,成功的話喚醒後繼執行緒,並返回true,否則直接返回false。

public final boolean release(long arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}
tryRelease 這個是子類需要自我實現的,沒啥說的根據業務需要實現。unparkSuccessor 喚醒頭結點的後繼節點。
private void unparkSuccessor(Node node) {   int ws = node.waitStatus; // 獲得頭節點狀態    if (ws < 0) //如果頭節點裝小於0 則將其置為0        compareAndSetWaitStatus(node, ws, 0);    Node s = node.next; //這個是新的頭節點    if (s == null || s.waitStatus > 0) {     // 如果新頭節點不滿足要求        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)        //從佇列尾部開始往前去找最前面的一個waitStatus小於0的節點            if (t.waitStatus <= 0)                s = t;    }    if (s != null)//喚醒後繼節點對應的執行緒        LockSupport.unpark(s.thread);}
4.4.3 AQS 中增加跟刪除形象圖5、CountDownLatch底層5.1 共享鎖 CountDownLatch底層

CountDownLatch 雖然相對簡單,但也實現了共享鎖模型。但是如何正確的吹逼 CountDownLatch 呢?如果在理解了上述流程的基礎上,從CountDownLatch入手來看AQS 中關於共享鎖的程式碼還比較好看懂,在看的時候可以 以看懂大致內容為主,學習其設計的思路,不要陷入所有條件處理細節中,多執行緒環境中,對與錯有時候不是那麼容易看出來的。個人追原始碼繪製瞭如下圖:

5.2 計數訊號量Semaphore

Semaphore 這就是共享鎖的一個實現類,在初始化的時候就規定了共享鎖池的大小N,有一個執行緒獲得了鎖,可用數就減少1個。有一個執行緒釋放鎖可用數就增加1個。如果有 >=2 的執行緒同時釋放鎖,則此時有多個鎖可用。這個時候就可以 同時喚醒 兩個鎖setHeadAndPropagate (流程圖懶的繪製了)。

 public final void acquireShared(int arg) {     if (tryAcquireShared(arg) < 0)         doAcquireShared(arg); }
private void doAcquireShared(int arg) {    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            //找先驅結點            final Node p = node.predecessor();            if (p == head) {                 // 嘗試獲取資源                int r = tryAcquireShared(arg);                if (r >= 0) {                    // 設定當前結點為頭結點,然後去喚醒後續結點。注意傳播性 喚醒!                    setHeadAndPropagate(node, r);                    p.next = null; // help GC  釋放頭結點,等待GC                    if (interrupted)                        selfInterrupt();                    failed = false;//獲取到資源                    return;                }            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)//如果最後沒有獲取到資源,則cancel            cancelAcquire(node);    }}
5.3 ReentrantReadWriteLock

在 ReentrantReadWriteLock 類中也是隻有一個32位的int state來表示讀鎖跟寫鎖,如何實現的?

後16位用來儲存獨享的寫鎖個數,第一次獲得就是01,第二次重入就是10了,這樣的方式來儲存。

但是多個執行緒都可以獲得讀鎖,並且每個執行緒可能讀多次,如何儲存?我們用前16位來儲存有多少個執行緒獲得了讀鎖。

每個讀鎖執行緒獲得的重入讀鎖個數 由內部類HoldCounter與讀鎖配套使用。

6、Condition

synchronized 可用 wait()notify()/notifyAll() 方法相結合可以實現等待/通知模式。Lock 也提供了 Condition 來提供類似的功能。

Condition是JDK5後引入的Interface,它用來替代傳統的Object的wait()/notify()實現執行緒間的協作,相比使用Object的wait()/notify(),使用Condition的await()/signal()這種方式 實現執行緒間協作更加安全和高效。簡單說,他的作用是使得某些執行緒一起等待某個條件(Condition),只有當該條件具備(signal 或者 signalAll方法被呼叫)時,這些等待執行緒才會被喚醒,從而重新爭奪鎖。wait()/notify()這些都更傾向於底層的實現開發,而Condition介面更傾向於程式碼實現的等待通知效果。兩者之間的區別與共通點如下:

6.1 條件等待佇列

條件等待佇列,指的是 Condition 內部自己維護的一個佇列,不同於 AQS 的 同步等待佇列。它具有以下特點:

要加入條件等待佇列的節點,不能在 同步等待佇列。

從 條件等待佇列 移除的節點,會進入同步等待佇列。

一個鎖物件只能有一個同步等待佇列,但可以有多個條件等待佇列。

這裡以 AbstractQueuedSynchronizer 的內部類 ConditionObject 為例(Condition 的實現類)來分析下它的具體實現過程。首先來看該類內部定義的幾個成員變數:

/** First node of condition queue. */private transient Node firstWaiter;/** Last node of condition queue. */private transient Node lastWaiter;

它採用了 AQSNode 節點構造(前面說過Node類有nextWaiter屬性),並定義了兩個成員變數:firstWaiterlastWaiter 。說明在 ConditionObject 內部也維護著一個自己的單向等待佇列。目前可知它的結構如下:

6.2 await、signal

比如有執行緒 1、2競爭鎖,下面來說下具體過程 執行緒1:

1、執行緒1 呼叫 reentrantLock.lock時,持有鎖。

2、執行緒1 呼叫 await 方法,進入條件等待佇列 ,同時釋放鎖。

3、執行緒1 獲取到執行緒2 signal 訊號,從條件等待佇列進入同步等待佇列。

執行緒2:

1、執行緒2 呼叫 reentrantLock.lock時,由於鎖被執行緒1 持有,進入同步等待佇列 。

2、由於執行緒1 釋放鎖,執行緒2 從同步等待佇列 移除,獲取到鎖。

3、執行緒2 呼叫 signal 方法,導致執行緒 1 被喚醒。執行緒2 呼叫unlock ,執行緒1 獲取鎖後繼續下走。

6.2.1 await

當我們看await、signal 的原始碼時候不要認為等待佇列跟同步佇列是完全分開的,其實個人感覺底層原始碼是有點 HashMap 中的紅黑樹跟雙向連結串列的意思。

當呼叫await方法時候,說明當前任務佇列的頭節點拿著鎖呢,此時要把該Thread從任務佇列挪到等待佇列再喚醒任務佇列最前面排隊的執行任務,如圖:

thread 表示節點存放的執行緒。waitStatus 表示節點等待狀態。條件等待佇列中的節點等待狀態都是 CONDITION,否則會被清除。nextWaiter 表示後指標。6.2.2 signal

當我們呼叫signal方法的時候,我們要將等待佇列中的頭節點移出來,讓其去搶鎖,如果是公平模式就要去排隊了,流程如圖:

上面只是形象流程圖,如果從程式碼級別看的話大致流程如下:

6.2.3 signalAll

signalAll signal 方法的區別體現在 doSignalAll 方法上,前面我們已經知道doSignal方法只會對等待佇列的頭節點進行操作,doSignalAll方法只不過將等待佇列中的每一個節點都移入到同步佇列中,即通知當前呼叫condition.await()方法的每一個執行緒:

private void doSignalAll(Node first) {    lastWaiter = firstWaiter = null;    do {        Node next = first.nextWaiter;        first.nextWaiter = null;        transferForSignal(first);        first = next;    } while (first != null); // 迴圈}
6.3 End

一個 Condition 物件就有一個單項的等待任務隊列。在一個多執行緒任務中我們可以new出多個等待任務佇列。比如我們new出來兩個等待佇列。

 private Lock lock = new ReentrantLock(); private Condition FirstCond = lock.newCondition(); private Condition SecondCond = lock.newCondition();

所以真正的AQS任務中一般是一個任務佇列N個等待佇列的,因此我們儘量呼叫signal而少用signalAll,因為在指定的例項化等待佇列中只有一個可以拿到鎖的。

Synchronized 中的 waitnotify 底層程式碼的等待佇列只有一個,多個執行緒呼叫wait的時候我們是無法知道頭節點是那個具體執行緒的。因此只能notifyAll

9
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • numpy基礎內容學習筆記