首頁>技術>

原文連結:https://mp.weixin.qq.com/s/2Ltx9LAbIcWvwk_jvmSRcA

本文重點

前情提要

之前文章中寫到了 JDK 中 synchronized 關鍵字可以實現同步鎖,並且詳細分析了底層的實現原理。

Synchronized 原理及鎖升級分析

雖然 synchronized 在效能上不再被人詬病,但是在實際使用中仍然缺乏一定的靈活性。

比如在一些場景中需要去嘗試獲取鎖,如果失敗則不再進行等待,又或者設定一定的等待時間,超時後就放棄等待。

正文開始,ReentrantLock 介紹

java.util.concurrent.locks 包提供了多種鎖機制的實現,本文暫且以 ReentrantLock為例,探究 Java 是如何實現同步鎖的。

ReentrantLock,先看名字,reentrant——可重入的,表示持鎖執行緒可以多次加鎖。

使用案例如下:

ReentrantLock lock = new ReentrantLock(true);lock.lock();try {    //do something} finally {    lock.unlock();}

AQS

ReentrantLock 底層是由 AQS 框架實現的,並且 java.util.concurrent.locks 提供的多種同步鎖都是 AQS 的子類。

AQS,先看名字,AbstractQueuedSynchronizer,抽象的佇列式同步器。AQS 是 Java 中同步鎖實現的基石,由 Java 之神 Doug Lea 操刀。

AbstractQueuedSynchronizer 的核心思想是提供了一個同步佇列,將未獲取到鎖的執行緒阻塞排隊。

提供的關鍵方法如下,此處用到了模板模式,方法的具體實現由子類來提供。

// 獲取鎖public final void acquire(int arg) {}// 釋放鎖public final boolean release(int arg) {}// 嘗試獲取鎖protected boolean tryAcquire(int arg) {    throw new UnsupportedOperationException();}// 嘗試釋放鎖protected boolean tryRelease(int arg) {    throw new UnsupportedOperationException();}

AQS 中的佇列由 Node 組成的雙端列表實現。

static final class Node {    // 標識共享鎖    static final Node SHARED = new Node();    //標識獨佔鎖    static final Node EXCLUSIVE = null;    // 已取消    static final int CANCELLED =  1;    //後繼節點在等待當前節點喚醒    static final int SIGNAL    = -1;    //等待在 condition 上    static final int CONDITION = -2;    //共享模式下,喚醒後繼節點    static final int PROPAGATE = -3;    //狀態,預設 0    volatile int waitStatus;    //前驅節點    volatile Node prev;    //後繼節點    volatile Node next;        volatile Thread thread;    Node nextWaiter;}
/** // 頭結點private transient volatile Node head;// 尾結點private transient volatile Node tail;// 同步狀態private volatile int state;

ReetrantLock 原理

1. 建立公平鎖

ReentrantLock lock = new ReentrantLock(true) ,引數 true 標識建立的是公平鎖。

public ReentrantLock(boolean fair) {    sync = fair ? new FairSync() : new NonfairSync();}

2、執行 lock()
final void lock() {    acquire(1);}

acquire() 由 AQS 提供,並且不可重寫,原始碼如下。

public final void acquire(int arg) {        // 1. 嘗試獲取鎖        if (!tryAcquire(arg) &&                // 2. 如果未獲取到, 則加入佇列                //addWaiter,建立節點,並設定下一節點為null,設定為尾結點                //acquireQueued,將當前節點加入佇列                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            //設定執行緒中斷標誌            selfInterrupt();}

嘗試獲取鎖,tryAccquire() 由子類根據自己的策略實現

如果獲取失敗,則執行 addWaiter(),將當前執行緒封裝成一個 Node, 設定為尾結點

執行 acquireQueued(),將當前節點加入到同步佇列

tryAccquire
protected final boolean tryAcquire(int acquires) {    final Thread current = Thread.currentThread();    int c = getState();    // state = 0 代表沒有執行緒佔用    if (c == 0) {        // hasQueuedPredecessors 返回是否需要排隊,需要排隊則返回 true        if (!hasQueuedPredecessors() &&            //不需要排隊,則 CAS 設定 state = 1            compareAndSetState(0, acquires)) {            // 設定成功,則搶到鎖,設定持鎖執行緒為當前執行緒                setExclusiveOwnerThread(current);                return true;        }    }    // 如果被是當前執行緒佔用,則記錄重入次數    else if (current == getExclusiveOwnerThread()) {        int nextc = c + acquires;        if (nextc < 0)            throw new Error("Maximum lock count exceeded");            setState(nextc);            return true;        }    return false;}

1、公平鎖實現的 tryAcquire,首先獲取同步狀態 state,不為 0 代表有執行緒持鎖,則判斷是否是當前執行緒,如果是則 state + 1, 因此是可重入的。

2、如果 state = 1,則檢查是否需要排隊 hasQueuedPredecessors,如果需要排隊則不繼續判斷,走加入佇列的邏輯。

3、如果不需要排隊,透過 CAS 設定同步狀態 state = 1, 設定失敗則去排隊,設定成功則設定持鎖執行緒為當前執行緒。

addWaiter()
private Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    // 將當前節點設定為尾節點    Node pred = tail;    if (pred != null) {        //設定節點上一節點為尾結點        node.prev = pred;        //CAS 操作,將尾結點設定為當前節點        if (compareAndSetTail(pred, node)) {            //設定之前尾結點下一節點,指向當前尾結點, 返回尾結點            pred.next = node;            return node;        }    }    //如果尾結點是null, 或者 CAS 操作失敗,進行自旋 enq() 加入尾結點    enq(node);    return node;}private Node enq(final Node node) {    //自旋    for (;;) {        Node t = tail;        if (t == null) { // Must initialize            //尾節點是null,則 CAS 初始化佇列的頭節點 head,頭結點是空節點, tail = head            if (compareAndSetHead(new Node()))                tail = head;        } else {            // 設定當前節點前一節點為 tail            node.prev = t;            // CAS 操作設定當前節點為 尾結點            if (compareAndSetTail(t, node)) {                // 設定之前尾結點指向當前節點                t.next = node;                return t;            }        }    }}

1、需要排隊的,先將執行緒封裝成一個 Node,如果尾結點為空,則說明佇列還未初始化,則透過 enq(node) 進行初始化操作。

2、如果尾結點不為空,則透過 CAS 嘗試將當前節點設定為尾結點,返回當前節點。

3、如果 CAS 執行失敗,則透過 enq(node) 進行自旋嘗試加入隊尾。

acquireQueued()
final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            // p 是前一節點            final Node p = node.predecessor();            // 如果 p 是頭節點, 再掙扎一次嘗試獲取鎖            if (p == head && tryAcquire(arg)) {                //獲取成功,設定當前節點為頭節點                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted; // 返回false, 獲取鎖成功,沒有進入佇列            }            // 檢查是否需要執行緒阻塞等待,如果前一個不是待喚醒,則自旋            // 如果前一個待喚醒,則當前執行緒也阻塞等待            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    int ws = pred.waitStatus;    // 前一個節點等待被喚醒    if (ws == Node.SIGNAL)        /*         * This node has already set status asking a release         * to signal it, so it can safely park.         */        return true;    if (ws > 0) {        // 前一節點已被取消,跳過,再向前找節點        /*         * Predecessor was cancelled. Skip over predecessors and         * indicate retry.         */        do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else {        // 預設值是0,設定為待喚醒        // CAS 設定前一節點等待喚醒        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}
private final boolean parkAndCheckInterrupt() {    // 阻塞執行緒    LockSupport.park(this);    return Thread.interrupted();}

1、透過 acquireQueued 將當前節點加入隊尾,並設定阻塞。自旋,判斷如果當前節點的前驅節點。是頭節點(head 節點不排隊,只記錄狀態,head 的後驅節點才是真正第一個排隊的),則再次嘗試 tryAcquire() 獲取鎖。

2、可以看到自旋的跳出條件是當前節點是佇列中第一個,並且獲取鎖。

3、如果一直自旋,則會消耗 CPU 資源,因此使用 shouldParkAfterFailedAcquire 判斷是否需要將當前執行緒阻塞,如果是則透過 parkAndCheckInterrupt 阻塞執行緒的執行。

4、LockSupport.park() 是透過 native 方法 UNSAFE.park() 實現的執行緒阻塞。

3、執行 unlock()

釋放鎖的邏輯相對比較簡單。

public void unlock() {    sync.release(1);}

AQS 的 release() 方法。

public final boolean release(int arg) {    //釋放鎖    if (tryRelease(arg)) {        Node h = head;        //佇列元素需要喚醒        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}

tryRelease()
protected final boolean tryRelease(int releases) {        //多次重入需要多次釋放        int c = getState() - releases;        if (Thread.currentThread() != getExclusiveOwnerThread())            throw new IllegalMonitorStateException();        boolean free = false;        //等於0代表釋放,設定鎖持有執行緒為null        if (c == 0) {            free = true;            setExclusiveOwnerThread(null);        }        //設定狀態為0        setState(c);        return free;}

unparkSuccessor()
private void unparkSuccessor(Node node) {    int ws = node.waitStatus;    // CAS 操作設定等待狀態為 0-初始態    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    //取下一節點    Node s = node.next;    //如果下一節點狀態為取消,則從尾結點開始迴圈,找到最前面未取消的節點    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    // unpark 喚醒執行緒    if (s != null)        LockSupport.unpark(s.thread);}

1、unlock() 方法呼叫了 AQS 的 release()方法,執行 tryRelease() 釋放鎖。

2、tryRelease() 由子類實現,reentrantLock 釋放邏輯是將 state - 1, 直到 state = 0 才認為是釋放完畢,多次重入需要多次釋放。

3、透過 AQS 的 unparkSuccessor() 喚醒下一個等待的執行緒。

4、使用 LockSupport.unpark(s.thread) 喚醒執行緒。

公平與非公平

實際上公平與非公平的區別如下:

1、公平鎖的流程是,當前有執行緒持鎖,則新來的執行緒乖乖去排隊。

2、非公平鎖的流程是,新來的執行緒先去搶一把試試,沒搶到再去排隊。

非公平鎖的加鎖邏輯如下。

final void lock() {    if (compareAndSetState(0, 1))        setExclusiveOwnerThread(Thread.currentThread());    else        acquire(1);}

總結 AQS 的核心

AQS 中使用了幾個核心的操作來進行同步鎖的控制。

總結 ReentrantLock 流程

這還沒完,想要徹底搞定每一步的細節,還得去翻看原始碼,細細品味。

8
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • SSM 三大框架系列:Spring 5 + Spring MVC 5 + MyBatis 3.5 整合(附原始碼)