首頁>技術>

目錄同步器簡介方法簡介同步器實現原理自己動手實現獨佔鎖同步器原始碼分析同步器簡介

AQS 可以認為是一個模板方法,是 JDK 提供的實現 JVM 鎖的模板,AQS 封裝了同步機制,透過實現 AQS 可以讓開發者比較簡單就可以實現 JVM 鎖,而不用去考慮底層的同步機制。降低了鎖的實現難度及實現代價。

AQS 同步器提供了兩套同步方案:獨佔式、共享式。也就是說要實現獨佔鎖或者共享鎖都可以透過繼承 AQS 實現。

方法簡介

可以看出 AQS 同步器為兩套方案分別提供了兩套方法,不帶 Shared 的方法是獨佔式的,相應帶 Shared 的方法就是共享式的。AQS 同步器提供了三種不同的加鎖方式,用於適用不同的業務場景。

實現原理同步狀態

AQS 提供了一個同步狀態屬性 state,透過執行緒同步地設定同步狀態實現加鎖邏輯,讓一個屬性實現執行緒同步很簡單,兩個步驟:

使用 volatile 修飾,保證執行緒可見性使用 Unsafe 類的 cas 方法修改屬性值

volatile + cas 可以保證執行緒安全,當有多個執行緒需要修改 state 屬性值時,只會有一個執行緒會操作成功,這就可以在JVM層面實現加鎖操作。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    // 同步狀態    private volatile int state;    // 獲取同步狀態    protected final int getState() {        return state;    }    // 設定同步狀態    protected final void setState(int newState) {        state = newState;    }    // cas 設定同步狀態    protected final boolean compareAndSetState(int expect, int update) {        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);    }}

同步狀態 state 是個 int 型別的數值,透過對數值的控制可以實現不同型別的鎖。比如 ReentrantLock 就是將同步狀態從 0 修改為 1 表示加鎖來實現獨佔鎖,而 Semaphore 是為同步狀態設定一個初始值,同步狀態大於 0 就可以加鎖成功,然後同步狀態減 1,這樣實現多個執行緒同時加鎖的共享鎖。

多個執行緒同時加鎖,但是隻有一個執行緒會成功,那麼加鎖失敗的執行緒怎麼處理?AQS 提供了一套完整的基於 CLH 佇列的解決方案,加鎖失敗後執行緒會進入 CLH 佇列進行阻塞等待,AQS 的加鎖流程如下圖所示

開始嘗試加鎖嘗試成功?結束進入CLH佇列yesno

CLH 鎖

CLH(Craig, Landin, and Hagersten locks): 是一種基於連結串列的可擴充套件、高效能、公平的自旋鎖,能確保無飢餓性,提供先來先服務的公平性。

透過 CLH 的定義可以捕捉到幾個重要的點

基於連結串列自旋先進先服務

AQS 使用的是一個 FIFO 的雙向連結串列來實現 CLH 佇列,可以看到 AQS 中定義了一個 Node 類作為 CLH 佇列中的節點,Node 類中定義了 prev 指向前一個節點,next 指向後一個節點,AQS 中定義了 head 及 tail 分別表示 CLH 的頭節點和尾節點,透過這個結構就可以從頭節點或者尾節點遍歷整個 CLH 佇列。

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {    // 頭節點    private transient volatile Node head;    // 尾節點    private transient volatile Node tail;    // 節點定義        static final class Node {        static final Node SHARED = new Node();        static final Node EXCLUSIVE = null;        static final int CANCELLED =  1;        static final int SIGNAL    = -1;        static final int CONDITION = -2;        static final int PROPAGATE = -3;        // 等待狀態        volatile int waitStatus;        // 前驅節點        volatile Node prev;        // 後繼節點        volatile Node next;        // 執行緒        volatile Thread thread;        // 標記用        Node nextWaiter;    }}

CLH 佇列處理流程

原始碼中提供了一個簡易的圖形表示,執行緒加鎖失敗後,進入 CLH 佇列,成為佇列的尾節點,優先頭節點獲取鎖,然後按連結串列順序依次獲取鎖。

          +------+    prev  +-----+           +-----+ head  |          |   <----    |        |   <----  |         |   tail          +------+             +-----+           +-----+

按 CLH 定義除頭節點外,其它節點應該自旋等待,頭節點釋放鎖後,則其後續節點就可以立刻獲取鎖。但是自旋是有代價的,AQS 使用 阻塞-喚醒 來替代自旋,其實還是自旋,只是自旋過程中會掛起執行緒,等待執行緒被喚醒後繼續自旋。

獨佔鎖與共享鎖

AQS 提供了獨佔與共享兩種模式,這兩種模式的處理方案的主要區別為

加鎖,獨佔鎖只有一個執行緒能加鎖成功,而共享鎖可以多個。釋放鎖,都需要喚醒 CLH 中的執行緒重新參與加鎖,獨佔鎖只需要喚醒一個節點即可,但共享鎖需要喚醒多個。

接下來演示一下獨佔模式 AQS 實際的處理流程

第一步:CLH 佇列原始狀態,只有頭節點擁有同步狀態(標記綠色),其它節點都處於阻塞狀態(標記紅色)

第二步:當前執行緒嘗試加鎖失敗,進入 CLH 佇列尾部(標記黃色),排隊等待獲取同步狀態

第三步:CLH 佇列頭節點(N0)釋放同步狀態,同時喚醒其後繼節點(N1),N1 嘗試加鎖,如果成功,則將自己設定為頭節點,N0 被踢出佇列

重複第三步,直到當前執行緒節點獲取到同步狀態

公平鎖與非公平鎖

AQS 並沒有提供公平鎖與非公平鎖的實現,但是基於 AQS 提供的方法可以實現公平鎖與非公平鎖,比如 ReentrantLock 就可以在建立的時候選擇公平鎖還是非公平鎖。

所謂公平與非公平指的是嘗試加鎖的那個瞬間,當執行緒釋放鎖時,會喚醒 CLH 佇列中的節點進行加鎖,如果此時還有其它執行緒也需要加鎖,那麼就會存在競爭關係,公平鎖的處理方案就是先來後到,非公平鎖處理方案就是競爭,如果競爭不成功就老實排隊。

自己動手實現獨佔鎖

透過閱讀上文的實現原理,可以知道 AQS 已經實現了 CLH 佇列,實現一個鎖只需要實現加鎖及釋放鎖的邏輯即可,這個步驟可以透過對同步狀態的設定進行實現。

實現思路:同步狀態初始值為 0,加鎖,將同步狀態設定為 1,釋放鎖,將同步狀態重置為 0,如此就可以實現一個最簡單的獨佔鎖。

// 實現 JDK 提供的鎖介面public class MutexLock implements Lock {    // 實現 AQS,實現自定義的加鎖及釋放鎖邏輯    private static class Sync extends AbstractQueuedSynchronizer {        // 加鎖,cas 設定同步狀態為 1        @Override         protected boolean tryAcquire(int arg) {            return compareAndSetState(0, arg);        }        // 解鎖,cas 設定同步狀態為 0        @Override        protected boolean tryRelease(int arg) {            if (getState() == 0) {                throw new IllegalMonitorStateException();            }            setState(0);            return true;        }        Condition newCondition() {            return new ConditionObject();        }    }    // 鎖的實現委託給 AQS 的實現類    private final Sync sync = new Sync();    public void lock() { sync.acquire(1); }    public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }    public boolean tryLock() { return sync.tryAcquire(1); }    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); }    public void unlock() { sync.release(0); }    public Condition newCondition() { return sync.newCondition(); }}
原始碼分析獨佔鎖原始碼分析

首先看加鎖方法 acquire,程式碼如下

public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

這部分程式碼寫的十分精簡,擴充套件開來會比較清晰,這裡涉及到幾個重要的方法:

tryAcquire:嘗試獲取獨佔鎖,需要實現類自行實現addWaiter:將當前執行緒封裝成 Node 節點,新增至 CLH 等待佇列尾部acquireQueued:在 CLH 佇列中透過自旋的方式進行鎖的獲取selfInterrupt:呼叫 Thread.currentThread().interrupt() 方法為當前執行緒設定箇中斷標記
public final void acquire(int arg) {    // 嘗試獲取獨佔鎖,如果成功立即返回    boolean flag = tryAcquire(arg);    // 如果加鎖不成功    if (!flag) {        // 將當前執行緒加入等待佇列尾部        Node waiter = addWaiter(Node.EXCLUSIVE);        // 透過自旋方式獲取獨佔鎖        boolean interrupted = acquireQueued(waiter, arg);        if (interrupted) {            selfInterrupt();        }    }}

tryAcquire 方法需要實現類自行實現加鎖邏輯,透過操作同步狀態就可以實現加鎖

protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException();}

addWaiter 方法完成兩件事件:

將當前執行緒封裝為 Node 物件將 Node 物件新增至等待佇列尾部,為保證執行緒安全性,替換佇列尾節點的操作透過 cas 完成。
private Node addWaiter(Node mode) {    // 將當前執行緒封裝成 Node 物件    Node node = new Node(Thread.currentThread(), mode);    // 快速嘗試一次將 Node 物件新增至等待佇列尾部,如果嘗試失敗則呼叫 enq 方法    Node pred = tail;    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    // 將 Node 物件新增至等待佇列尾部    enq(node);    return node;}

enq 方法完成兩件事件:

如果 CLH 佇列為空,則初始化佇列,初始化時會在佇列頭設定一個空節點,透過這種方式保證喚醒操作,永遠都是喚醒第二個節點。如果 CLH 佇列不為這,則將當前執行緒節點新增至等待隊尾。為保證執行緒安全性,替換佇列頭節點及尾節點的操作都是透過 cas 完成。

addWaiter 方法中快速嘗試替換尾節點的操作跟 enq 方法中的程式碼其實是一模一樣的,所以本人沒看出來那部分程式碼有什麼用,並不存在效率上的提升,這只是個人愚見,有不同意見可以探討。

private Node enq(final Node node) {    for (;;) {        Node t = tail;        if (t == null) {            // 如果尾部不存在,初始化等待佇列,將佇列頭及尾設定為空 Node            if (compareAndSetHead(new Node()))                tail = head;        } else {            node.prev = t;            // 透過 cas 執行緒安全地將當前 Node 替換原來的尾節點            if (compareAndSetTail(t, node)) {                t.next = node;                return t;            }        }    }}

此至,當前執行緒的 Node 物件已新增至等待佇列尾部,acquireQueued 方法則是在佇列中透過自旋的方式獲取鎖,AQS 中的自旋,並不是無限制地自旋,而是透過 掛起-喚醒 機制消除自旋可能引起的 CPU 空耗。

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        // 自旋        for (;;) {            // 獲取當前節點的前驅節點            final Node p = node.predecessor();            // 如果前驅節點是頭節點,則嘗試獲取鎖            if (p == head && tryAcquire(arg)) {                // 加鎖成功,則將當前節點設定為頭節點                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            // 加鎖失敗,根據節點狀態決定是否需要掛起執行緒            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        // 如果出現異常,則取消加鎖,並進行出隊操作        if (failed)            cancelAcquire(node);    }}

shouldParkAfterFailedAcquire 方法根據前驅節點的狀態判斷是否需要掛起當前執行緒節點

如果前驅節點的狀態為 SIGNAL,表示該節點釋放鎖時會喚醒其後繼節點,所以可以安心掛起當前執行緒如果前驅節點的狀態大於 0,即為 CANCELLED 已取消狀態,則將當前節點前驅的所有已取消狀態的節點都從佇列中踢出如果前驅節點的狀態小於等於 0,即為 CONDITION 或者 PROPAGATE,分別表示在等待條件或者是共享狀態,則將前驅節點的狀態透過 cas 設定為 SIGNAL,不需要掛起

可以看出,只有前驅節點為 SIGNAL 狀態時,後繼節點才可以掛起,因為 SIGNAL 狀態的節點在釋放鎖時會喚醒其後繼節點,而前驅節點處於其它狀態時,當前執行緒不會掛起,而是將前驅節點的狀態修改為 SIGNAL,繼續自旋,如果再次獲取鎖失敗,此時前驅節點的狀態就是 SIGNAL 了,然後就可以掛起了。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    // 前驅節點的狀態    int ws = pred.waitStatus;    // 如果前驅節點的狀態為 SIGNAL    if (ws == Node.SIGNAL)        return true;    if (ws > 0) { // 前驅節點處於取消狀態,則需要將前驅節點從佇列中刪除        do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else { // 0、`CONDITION` 或者 `PROPAGATE` 狀態,設定為 SIGNAL        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}

parkAndCheckInterrupt 方法用於阻塞當前執行緒

private final boolean parkAndCheckInterrupt() {    // 阻塞當前執行緒,進入等待狀態,需要呼叫 LockSupport.unpark 方法或者 interrupt 中斷進行喚醒    LockSupport.park(this);    // 被喚醒之後返回當前執行緒是否在中斷狀態,並清除中斷記號    return Thread.interrupted();}

至此,整個加鎖過程就已經完成。

最後看一下解鎖方法 tryRelease,程式碼如下

public final boolean release(int arg) {    // 執行釋放鎖,需要實現類自行實現邏輯    if (tryRelease(arg)) {        Node h = head;        // 喚醒 CLH 佇列的頭節點        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}

unparkSuccessor 方法用於喚醒 CLH 頭節點的後繼節點(前文中一直說喚醒頭節點,其實真正喚醒的是頭節點的後繼節點),傳入的 node 就是頭節點。如果後繼節點狀態為已取消,則向後查詢最近的一個有效節點進行喚醒。

private void unparkSuccessor(Node node) {    int ws = node.waitStatus;    // 將頭節點的狀態重置為 0    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    // 如果後續節點為已取消,則從佇列尾部開始向前查詢,找到離當前節點最近的一個有效節點        Node s = node.next;    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    // 喚醒該節點中的執行緒    if (s != null)        LockSupport.unpark(s.thread);}

最後理一下獨佔模式下 CLH 節點的狀態變化過程,如下圖所示,其中後繼節點加入與加鎖是同步進行,只要在釋放鎖之前有後繼節點加入,則狀態會被修改為 SIGNAL,這樣後繼節點才可以掛起執行緒,等待前驅節點釋放鎖之後會主動喚醒後繼節點。

共享鎖原始碼分析

共享鎖與獨佔鎖的實現邏輯大體一致,也是利用 CLH 佇列,CLH 佇列也是透過 掛機-喚醒 機制自旋式獲取鎖,但是共享鎖與獨佔鎖不同的是,共享鎖可以允許多個執行緒同時成功加鎖,所以在加鎖的實現邏輯上有差別,另外,執行緒釋放鎖時,其它執行緒就可以進行加鎖,這裡也不能像獨佔鎖一樣,只喚醒後繼節點即可,因為可能多個節點都可以成功加鎖,只喚醒一個就失去了共享鎖的意義。

首先檢視一下加鎖方法 acquireShared,該方法首先呼叫 tryAcquireShared 方法嘗試加鎖,這裡與獨佔鎖的加鎖方法 tryAcquire 有所不同,獨佔鎖只有一個執行緒可以成功,所以返回布林值,要麼成功,要麼失敗。tryAcquireShared 方法返回一個數值,大於等於 0,表示加鎖成功,小於 0,表示加鎖失敗,走 CLH 佇列流程。

public final void acquireShared(int arg) {    if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);}

tryAcquireShared 方法與 tryAcquire 方法一樣需要實現類自行實現

protected int tryAcquireShared(int arg) {    throw new UnsupportedOperationException();}

doAcquireShared 方法相當於獨佔鎖的 addWaiter + acquireQueued + selfInterrupt,獨佔鎖寫的比較藝術,而這裡就是簡單的程式碼陳列。

private void doAcquireShared(int arg) {    // 設定節點為共享節點,加入 CLH 佇列尾部    final Node node = addWaiter(Node.SHARED);     boolean failed = true;    try {        boolean interrupted = false;        for (;;) { // 自旋            final Node p = node.predecessor(); // 獲取前驅節點            if (p == head) {                int r = tryAcquireShared(arg); // 嘗試加鎖                if (r >= 0) { // 加鎖成功                    setHeadAndPropagate(node, r);// 設定當前節點為頭節點                    p.next = null; // help GC                    if (interrupted)                        selfInterrupt(); // 設定中斷標記                    failed = false;                    return;                }            }            // 加鎖失敗,阻塞執行緒,這裡與獨佔鎖的實現一模一樣            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}

可以對比一下獨佔鎖的程式碼實現,可以發現實現邏輯幾乎一樣。這裡只說明一下不一樣的地方。

建立節點,呼叫 addWaiter(Node.SHARED) 建立共享節點,而獨佔鎖呼叫 addWaiter(Node.EXCLUSIVE) 建立獨佔節點。加鎖,呼叫 tryAcquireShared,而獨佔鎖呼叫 tryAcquire 方法,這兩個方法都需要實現類自行實現。加鎖成功,呼叫 setHeadAndPropagate(node, r) 方法設定當前節點為頭節點,而獨佔鎖呼叫 setHead(node)

共享鎖的加鎖與獨佔鎖不一樣,雖然都是利用同步狀態做文章,共享鎖可以理解為資源,每一次加鎖,就是向 AQS 申請資源,而釋放鎖就是向 AQS 歸還資源,所以只要 AQS 還有資源就可以允許加鎖。所以 tryAcquireShared 方法的返回值,可以理解為加鎖後的剩餘資源,所以只要大於等於 0,就是加鎖成功。

這裡重點關注一下 setHeadAndPropagate 這個方法,透過方法名可以猜測,setHeadAndPropagate 包含了 setHead 的邏輯,同時還要實現 propagate (傳遞),所謂傳遞就是指,只要 AQS 還有剩餘資源,就繼續喚醒後繼節點,進行加鎖操作。喚醒操作呼叫 doReleaseShared() 方法實現,這個方法也是釋放同步鎖時會呼叫的方法,在下文還會詳細介紹,這裡只要理解為繼續喚醒後繼節點即可。

// propagate 就是 tryAcquireShared 加鎖後返回的剩餘資源private void setHeadAndPropagate(Node node, int propagate) {    Node h = head;    setHead(node); // 將當前節點設定為頭節點    // propagate > 0 表示只要資源還有,則需要繼續喚醒後續節點    if (propagate > 0 || h == null || h.waitStatus < 0 ||        (h = head) == null || h.waitStatus < 0) {        Node s = node.next;        // 後繼節點應該也是共享節點        if (s == null || s.isShared())            doReleaseShared(); // 釋放共享鎖時呼叫的方法    }}

接著看一下共享鎖釋放方法 releaseShared,首先呼叫 tryReleaseShared 進行資源歸還,這個方法需要實現類自行實現,然後呼叫 doReleaseShared 方法喚醒 CLH 佇列中的節點。

public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}

doReleaseShared 這個方法對比獨佔鎖釋放時的操作就要複雜很多,因為獨佔鎖只有一個執行緒可以獲取鎖,所以釋放的時候不存線上程安全的問題,但是這個方法在佇列中加鎖成功與任意執行緒釋放鎖都會呼叫,就有可能出現判斷過程中出現頭節點被替換的問題。

private void doReleaseShared() {    for (;;) {        Node h = head;        if (h != null && h != tail) {            int ws = h.waitStatus;            // 如果節點狀態為 -1,則將狀態設定為 0,喚醒後繼節點            if (ws == Node.SIGNAL) {                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;            // loop to recheck cases                unparkSuccessor(h);            // 如果節點狀態為 0,則將狀態設定為 -3            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                // loop on failed CAS        }        if (h == head)                   // loop if head changed            break;    }}

最後理一下共享模式下 CLH 節點的狀態變化過程,如下圖所示。

14
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 還不懂Docker?一個故事安排的明明白白