首頁>技術>

針對讀多寫少的場景,Java提供了另外一個實現Lock介面的讀寫鎖ReentrantReadWriteLock(RRW),之前分析過ReentrantLock是一個獨佔鎖,同一時間只允許一個執行緒訪問。

而 RRW 允許多個讀執行緒同時訪問,但不允許寫執行緒和讀執行緒、寫執行緒和寫執行緒同時訪問。

讀寫鎖內部維護了兩個鎖,一個是用於讀操作的ReadLock,一個是用於寫操作的 WriteLock。

讀寫鎖遵守以下三條基本原則

允許多個執行緒同時讀共享變數;只允許一個執行緒寫共享變數;如果一個寫執行緒正在執行寫操作,此時禁止讀執行緒讀共享變數。讀寫鎖如何實現

RRW也是基於AQS實現的,它的自定義同步器(繼承自AQS)需要在同步狀態state上維護多個讀執行緒和一個寫執行緒的狀態。RRW的做法是使用高低位來實現一個整形控制兩種狀態,一個int佔4個位元組,一個位元組8位。所以高16位表示讀,低16位表示寫。

abstract static class Sync extends AbstractQueuedSynchronizer {  static final int SHARED_SHIFT   = 16;  // 10000000000000000(65536)  static final int SHARED_UNIT    = (1 << SHARED_SHIFT);  // 65535  static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;  //1111111111111111  static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;  // 讀鎖(共享鎖)的數量,只計算高16位的值  static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }  // 寫鎖(獨佔鎖)的數量  static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } }
獲取讀鎖

當執行緒獲取讀鎖時,首先判斷同步狀態低16位,如果存在寫鎖,則獲取鎖失敗,進入CLH佇列阻塞,反之,判斷當前執行緒是否應該被阻塞,如果不應該阻塞則嘗試 CAS 同步狀態,獲取成功更新同步鎖為讀狀態。

protected final int tryAcquireShared(int unused) {             Thread current = Thread.currentThread();  int c = getState();  // 如果當前已經有寫鎖了,則獲取失敗  if (exclusiveCount(c) != 0 &&      getExclusiveOwnerThread() != current)      return -1;  // 獲取讀鎖數量  int r = sharedCount(c);  // 非公平鎖實現中readerShouldBlock()返回true表示CLH佇列中有正在排隊的寫鎖  // CAS設定讀鎖的狀態值  if (!readerShouldBlock() &&      r < MAX_COUNT &&      compareAndSetState(c, c + SHARED_UNIT)) {      // 省略記錄獲取readLock次數的程式碼      return 1;  }  // 針對上面失敗的條件進行再次處理  return fullTryAcquireShared(current);}final int fullTryAcquireShared(Thread current) {    // 無線迴圈  for (;;) {    int c = getState();    if (exclusiveCount(c) != 0) {      // 如果不是當前執行緒持有寫鎖,則進入CLH佇列阻塞      if (getExclusiveOwnerThread() != current)        return -1;    }     // 如果reader應該被阻塞    else if (readerShouldBlock()) {        // Make sure we're not acquiring read lock reentrantly        if (firstReader == current) {            // assert firstReaderHoldCount > 0;        } else {            if (rh == null) {                rh = cachedHoldCounter;                if (rh == null || rh.tid != getThreadId(current)) {                    rh = readHolds.get();                    if (rh.count == 0)                        readHolds.remove();                }            }            // 當前執行緒沒有持有讀鎖,即不存在鎖重入情況。則進入CLH佇列阻塞            if (rh.count == 0)                return -1;        }    }    // 共享鎖的如果超出了限制    if (sharedCount(c) == MAX_COUNT)        throw new Error("Maximum lock count exceeded");    // CAS設定狀態值    if (compareAndSetState(c, c + SHARED_UNIT)) {            // 省略記錄readLock次數的程式碼      return 1;    }  }}

SHARED_UNIT 的值是65536,也就是說,當第一次獲取讀鎖的後,state的值就變成了65536。

在公平鎖的實現中當CLH佇列中有排隊的執行緒, readerShouldBlock() 方法就會返回為true。非公平鎖的實現中則是當CLH佇列中存在等待獲取寫鎖的執行緒就返回true

還需要注意的是獲取讀鎖的時候,如果當前執行緒已經持有寫鎖,是仍然能獲取讀鎖成功的。後面會提到鎖的降級,如果你對那裡的程式碼有疑問,可以在回過頭來看看這裡申請鎖的程式碼

釋放讀鎖
protected final boolean tryReleaseShared(int unused) {             for (;;) {    int c = getState();    // 減去65536    int nextc = c - SHARED_UNIT;    // 只有當state的值變成0才會真正的釋放鎖    if (compareAndSetState(c, nextc))        return nextc == 0;}}

釋放鎖時,state的值需要減去65536,因為當第一次獲取讀鎖後,state值變成了65536。

任何一個執行緒釋放讀鎖的時候只有在 state==0 的時候才真正釋放了鎖,比如有100個執行緒獲取了讀鎖,只有最後一個執行緒執行 tryReleaseShared 方法時才真正釋放了鎖,此時會喚醒CLH佇列中的排隊執行緒。

獲取寫鎖

一個執行緒嘗試獲取寫鎖時,會先判斷同步狀態 state 是否為0。如果 state 等於 0,說明暫時沒有其它執行緒獲取鎖;如果 state 不等於 0,則說明有其它執行緒獲取了鎖。

此時再判斷state的低16位(w)是否為0,如果w為0,表示其他執行緒獲取了讀鎖,此時進入CLH佇列進行阻塞等待。

如果w不為0,則說明其他執行緒獲取了寫鎖,此時需要判斷獲取了寫鎖的是不是當前執行緒,如果不是則進入CLH佇列進行阻塞等待,如果獲取了寫鎖的是當前執行緒,則判斷當前執行緒獲取寫鎖是否超過了最大次數,若超過,丟擲異常。反之則更新同步狀態。

// 獲取寫鎖protected final boolean tryAcquire(int acquires) {             Thread current = Thread.currentThread();  int c = getState();  int w = exclusiveCount(c);  // 判斷state是否為0  if (c != 0) {      // 獲取鎖失敗      if (w == 0 || current != getExclusiveOwnerThread())          return false;      // 判斷當前執行緒獲取寫鎖是否超出了最大次數65535      if (w + exclusiveCount(acquires) > MAX_COUNT)          throw new Error("Maximum lock count exceeded");            // 鎖重入      setState(c + acquires);      return true;  }  // 非公平鎖實現中writerShouldBlock()永遠返回為false  // CAS修改state的值  if (writerShouldBlock() ||      !compareAndSetState(c, c + acquires))      return false;  // CAS成功後,設定當前執行緒為擁有獨佔鎖的執行緒  setExclusiveOwnerThread(current);  return true;}

在公平鎖的實現中當CLH佇列中存在排隊的執行緒,那麼 writerShouldBlock() 方法就會返回為true,此時獲取寫鎖的執行緒就會被阻塞。

釋放寫鎖

釋放寫鎖的邏輯比較簡單

protected final boolean tryRelease(int releases) {  // 寫鎖是否被當前執行緒持有  if (!isHeldExclusively())      throw new IllegalMonitorStateException();    int nextc = getState() - releases;  boolean free = exclusiveCount(nextc) == 0;  // 沒有其他執行緒持有寫鎖  if (free)      setExclusiveOwnerThread(null);  setState(nextc);  return free;}
鎖的升級?
// 準備讀快取readLock.lock();try {  v = map.get(key);  if(v == null) {    writeLock.lock();    try {      if(map.get(key) != null) {        return map.get(key);      }      // 更新快取程式碼,省略    } finally {      writeLock.unlock();    }  }} finally {  readLock.unlock();}

對於上面獲取快取資料(這也是RRW的應用場景)的程式碼,先是獲取讀鎖,然後再升級為寫鎖,這樣的行為叫做鎖的升級。可惜RRW不支援,這樣會導致寫鎖永久等待,最終導致執行緒被永久阻塞。所以 鎖的升級是不允許的

鎖的降級

雖然鎖的升級不允許,但是鎖的降級卻是可以的。

ReentrantReadWriteLock lock = new ReentrantReadWriteLock();ReadLock readLock = lock.readLock();WriteLock writeLock = lock.writeLock();Map<String, String> dataMap = new HashMap();public void processCacheData() {  readLock.lock();  if(!cacheValid()) {    // 釋放讀鎖,因為不允許    readLock.unlock();    writeLock.lock();    try {      if(!cacheValid()) {          dataMap.put("key", "think123");      }      // 降級為讀鎖      readLock.lock();    } finally {        writeLock.unlock();    }  }  try {    // 仍然持有讀鎖    System.out.println(dataMap);  } finally {      readLock.unlock();  }}public boolean cacheValid() {    return !dataMap.isEmpty();}
RRW需要注意的問題在讀取很多、寫入很少的情況下,RRW 會使寫入執行緒遭遇飢餓(Starvation)問題,也就是說寫入執行緒會因遲遲無法競爭到鎖而一直處於等待狀態。寫鎖支援條件變數,讀鎖不支援。讀鎖呼叫newCondition() 會丟擲UnsupportedOperationException 異常

來源:https://www.tuicool.com/articles/NFn2IrN

26
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 萬張圖片,流暢體驗 - 記一次 Vue 列表渲染的效能最佳化