原文連結:https://mp.weixin.qq.com/s/2Ltx9LAbIcWvwk_jvmSRcA
本文重點前情提要
之前文章中寫到了 JDK 中 synchronized 關鍵字可以實現同步鎖,並且詳細分析了底層的實現原理。
Synchronized 原理及鎖升級分析
雖然 synchronized 在效能上不再被人詬病,但是在實際使用中仍然缺乏一定的靈活性。
比如在一些場景中需要去嘗試獲取鎖,如果失敗則不再進行等待,又或者設定一定的等待時間,超時後就放棄等待。
正文開始,ReentrantLock 介紹
java.util.concurrent.locks 包提供了多種鎖機制的實現,本文暫且以 ReentrantLock為例,探究 Java 是如何實現同步鎖的。
ReentrantLock,先看名字,reentrant——可重入的,表示持鎖執行緒可以多次加鎖。
使用案例如下:
ReentrantLock lock = new ReentrantLock(true);lock.lock();try { //do something} finally { lock.unlock();}
AQS
ReentrantLock 底層是由 AQS 框架實現的,並且 java.util.concurrent.locks 提供的多種同步鎖都是 AQS 的子類。
AQS,先看名字,AbstractQueuedSynchronizer,抽象的佇列式同步器。AQS 是 Java 中同步鎖實現的基石,由 Java 之神 Doug Lea 操刀。
AbstractQueuedSynchronizer 的核心思想是提供了一個同步佇列,將未獲取到鎖的執行緒阻塞排隊。
提供的關鍵方法如下,此處用到了模板模式,方法的具體實現由子類來提供。
// 獲取鎖public final void acquire(int arg) {}// 釋放鎖public final boolean release(int arg) {}// 嘗試獲取鎖protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}// 嘗試釋放鎖protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
AQS 中的佇列由 Node 組成的雙端列表實現。
static final class Node { // 標識共享鎖 static final Node SHARED = new Node(); //標識獨佔鎖 static final Node EXCLUSIVE = null; // 已取消 static final int CANCELLED = 1; //後繼節點在等待當前節點喚醒 static final int SIGNAL = -1; //等待在 condition 上 static final int CONDITION = -2; //共享模式下,喚醒後繼節點 static final int PROPAGATE = -3; //狀態,預設 0 volatile int waitStatus; //前驅節點 volatile Node prev; //後繼節點 volatile Node next; volatile Thread thread; Node nextWaiter;}
/** // 頭結點private transient volatile Node head;// 尾結點private transient volatile Node tail;// 同步狀態private volatile int state;
ReetrantLock 原理1. 建立公平鎖
ReentrantLock lock = new ReentrantLock(true) ,引數 true 標識建立的是公平鎖。
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();}
2、執行 lock()
final void lock() { acquire(1);}
final void lock() { acquire(1);}
acquire() 由 AQS 提供,並且不可重寫,原始碼如下。
public final void acquire(int arg) { // 1. 嘗試獲取鎖 if (!tryAcquire(arg) && // 2. 如果未獲取到, 則加入佇列 //addWaiter,建立節點,並設定下一節點為null,設定為尾結點 //acquireQueued,將當前節點加入佇列 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //設定執行緒中斷標誌 selfInterrupt();}
嘗試獲取鎖,tryAccquire() 由子類根據自己的策略實現
如果獲取失敗,則執行 addWaiter(),將當前執行緒封裝成一個 Node, 設定為尾結點
執行 acquireQueued(),將當前節點加入到同步佇列
tryAccquire
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // state = 0 代表沒有執行緒佔用 if (c == 0) { // hasQueuedPredecessors 返回是否需要排隊,需要排隊則返回 true if (!hasQueuedPredecessors() && //不需要排隊,則 CAS 設定 state = 1 compareAndSetState(0, acquires)) { // 設定成功,則搶到鎖,設定持鎖執行緒為當前執行緒 setExclusiveOwnerThread(current); return true; } } // 如果被是當前執行緒佔用,則記錄重入次數 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // state = 0 代表沒有執行緒佔用 if (c == 0) { // hasQueuedPredecessors 返回是否需要排隊,需要排隊則返回 true if (!hasQueuedPredecessors() && //不需要排隊,則 CAS 設定 state = 1 compareAndSetState(0, acquires)) { // 設定成功,則搶到鎖,設定持鎖執行緒為當前執行緒 setExclusiveOwnerThread(current); return true; } } // 如果被是當前執行緒佔用,則記錄重入次數 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}
1、公平鎖實現的 tryAcquire,首先獲取同步狀態 state,不為 0 代表有執行緒持鎖,則判斷是否是當前執行緒,如果是則 state + 1, 因此是可重入的。
2、如果 state = 1,則檢查是否需要排隊 hasQueuedPredecessors,如果需要排隊則不繼續判斷,走加入佇列的邏輯。
3、如果不需要排隊,透過 CAS 設定同步狀態 state = 1, 設定失敗則去排隊,設定成功則設定持鎖執行緒為當前執行緒。
addWaiter()
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 將當前節點設定為尾節點 Node pred = tail; if (pred != null) { //設定節點上一節點為尾結點 node.prev = pred; //CAS 操作,將尾結點設定為當前節點 if (compareAndSetTail(pred, node)) { //設定之前尾結點下一節點,指向當前尾結點, 返回尾結點 pred.next = node; return node; } } //如果尾結點是null, 或者 CAS 操作失敗,進行自旋 enq() 加入尾結點 enq(node); return node;}private Node enq(final Node node) { //自旋 for (;;) { Node t = tail; if (t == null) { // Must initialize //尾節點是null,則 CAS 初始化佇列的頭節點 head,頭結點是空節點, tail = head if (compareAndSetHead(new Node())) tail = head; } else { // 設定當前節點前一節點為 tail node.prev = t; // CAS 操作設定當前節點為 尾結點 if (compareAndSetTail(t, node)) { // 設定之前尾結點指向當前節點 t.next = node; return t; } } }}
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 將當前節點設定為尾節點 Node pred = tail; if (pred != null) { //設定節點上一節點為尾結點 node.prev = pred; //CAS 操作,將尾結點設定為當前節點 if (compareAndSetTail(pred, node)) { //設定之前尾結點下一節點,指向當前尾結點, 返回尾結點 pred.next = node; return node; } } //如果尾結點是null, 或者 CAS 操作失敗,進行自旋 enq() 加入尾結點 enq(node); return node;}private Node enq(final Node node) { //自旋 for (;;) { Node t = tail; if (t == null) { // Must initialize //尾節點是null,則 CAS 初始化佇列的頭節點 head,頭結點是空節點, tail = head if (compareAndSetHead(new Node())) tail = head; } else { // 設定當前節點前一節點為 tail node.prev = t; // CAS 操作設定當前節點為 尾結點 if (compareAndSetTail(t, node)) { // 設定之前尾結點指向當前節點 t.next = node; return t; } } }}
1、需要排隊的,先將執行緒封裝成一個 Node,如果尾結點為空,則說明佇列還未初始化,則透過 enq(node) 進行初始化操作。
2、如果尾結點不為空,則透過 CAS 嘗試將當前節點設定為尾結點,返回當前節點。
3、如果 CAS 執行失敗,則透過 enq(node) 進行自旋嘗試加入隊尾。
acquireQueued()
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // p 是前一節點 final Node p = node.predecessor(); // 如果 p 是頭節點, 再掙扎一次嘗試獲取鎖 if (p == head && tryAcquire(arg)) { //獲取成功,設定當前節點為頭節點 setHead(node); p.next = null; // help GC failed = false; return interrupted; // 返回false, 獲取鎖成功,沒有進入佇列 } // 檢查是否需要執行緒阻塞等待,如果前一個不是待喚醒,則自旋 // 如果前一個待喚醒,則當前執行緒也阻塞等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前一個節點等待被喚醒 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { // 前一節點已被取消,跳過,再向前找節點 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 預設值是0,設定為待喚醒 // CAS 設定前一節點等待喚醒 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}
private final boolean parkAndCheckInterrupt() { // 阻塞執行緒 LockSupport.park(this); return Thread.interrupted();}
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // p 是前一節點 final Node p = node.predecessor(); // 如果 p 是頭節點, 再掙扎一次嘗試獲取鎖 if (p == head && tryAcquire(arg)) { //獲取成功,設定當前節點為頭節點 setHead(node); p.next = null; // help GC failed = false; return interrupted; // 返回false, 獲取鎖成功,沒有進入佇列 } // 檢查是否需要執行緒阻塞等待,如果前一個不是待喚醒,則自旋 // 如果前一個待喚醒,則當前執行緒也阻塞等待 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前一個節點等待被喚醒 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { // 前一節點已被取消,跳過,再向前找節點 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 預設值是0,設定為待喚醒 // CAS 設定前一節點等待喚醒 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}
private final boolean parkAndCheckInterrupt() { // 阻塞執行緒 LockSupport.park(this); return Thread.interrupted();}
1、透過 acquireQueued 將當前節點加入隊尾,並設定阻塞。自旋,判斷如果當前節點的前驅節點。是頭節點(head 節點不排隊,只記錄狀態,head 的後驅節點才是真正第一個排隊的),則再次嘗試 tryAcquire() 獲取鎖。
2、可以看到自旋的跳出條件是當前節點是佇列中第一個,並且獲取鎖。
3、如果一直自旋,則會消耗 CPU 資源,因此使用 shouldParkAfterFailedAcquire 判斷是否需要將當前執行緒阻塞,如果是則透過 parkAndCheckInterrupt 阻塞執行緒的執行。
4、LockSupport.park() 是透過 native 方法 UNSAFE.park() 實現的執行緒阻塞。
3、執行 unlock()
釋放鎖的邏輯相對比較簡單。
public void unlock() { sync.release(1);}
AQS 的 release() 方法。
public final boolean release(int arg) { //釋放鎖 if (tryRelease(arg)) { Node h = head; //佇列元素需要喚醒 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}
tryRelease()protected final boolean tryRelease(int releases) { //多次重入需要多次釋放 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //等於0代表釋放,設定鎖持有執行緒為null if (c == 0) { free = true; setExclusiveOwnerThread(null); } //設定狀態為0 setState(c); return free;}
unparkSuccessor()private void unparkSuccessor(Node node) { int ws = node.waitStatus; // CAS 操作設定等待狀態為 0-初始態 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //取下一節點 Node s = node.next; //如果下一節點狀態為取消,則從尾結點開始迴圈,找到最前面未取消的節點 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // unpark 喚醒執行緒 if (s != null) LockSupport.unpark(s.thread);}
protected final boolean tryRelease(int releases) { //多次重入需要多次釋放 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //等於0代表釋放,設定鎖持有執行緒為null if (c == 0) { free = true; setExclusiveOwnerThread(null); } //設定狀態為0 setState(c); return free;}
private void unparkSuccessor(Node node) { int ws = node.waitStatus; // CAS 操作設定等待狀態為 0-初始態 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //取下一節點 Node s = node.next; //如果下一節點狀態為取消,則從尾結點開始迴圈,找到最前面未取消的節點 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // unpark 喚醒執行緒 if (s != null) LockSupport.unpark(s.thread);}
1、unlock() 方法呼叫了 AQS 的 release()方法,執行 tryRelease() 釋放鎖。
2、tryRelease() 由子類實現,reentrantLock 釋放邏輯是將 state - 1, 直到 state = 0 才認為是釋放完畢,多次重入需要多次釋放。
3、透過 AQS 的 unparkSuccessor() 喚醒下一個等待的執行緒。
4、使用 LockSupport.unpark(s.thread) 喚醒執行緒。
公平與非公平
實際上公平與非公平的區別如下:
1、公平鎖的流程是,當前有執行緒持鎖,則新來的執行緒乖乖去排隊。
2、非公平鎖的流程是,新來的執行緒先去搶一把試試,沒搶到再去排隊。
非公平鎖的加鎖邏輯如下。
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1);}
總結 AQS 的核心
AQS 中使用了幾個核心的操作來進行同步鎖的控制。
總結 ReentrantLock 流程
這還沒完,想要徹底搞定每一步的細節,還得去翻看原始碼,細細品味。