針對讀多寫少的場景,Java提供了另外一個實現Lock介面的讀寫鎖ReentrantReadWriteLock(RRW),之前分析過ReentrantLock是一個獨佔鎖,同一時間只允許一個執行緒訪問。
而 RRW 允許多個讀執行緒同時訪問,但不允許寫執行緒和讀執行緒、寫執行緒和寫執行緒同時訪問。
讀寫鎖內部維護了兩個鎖,一個是用於讀操作的ReadLock,一個是用於寫操作的 WriteLock。
讀寫鎖遵守以下三條基本原則
允許多個執行緒同時讀共享變數;只允許一個執行緒寫共享變數;如果一個寫執行緒正在執行寫操作,此時禁止讀執行緒讀共享變數。讀寫鎖如何實現RRW也是基於AQS實現的,它的自定義同步器(繼承自AQS)需要在同步狀態state上維護多個讀執行緒和一個寫執行緒的狀態。RRW的做法是使用高低位來實現一個整形控制兩種狀態,一個int佔4個位元組,一個位元組8位。所以高16位表示讀,低16位表示寫。
abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16; // 10000000000000000(65536) static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 65535 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; //1111111111111111 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 讀鎖(共享鎖)的數量,只計算高16位的值 static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 寫鎖(獨佔鎖)的數量 static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } }
獲取讀鎖
當執行緒獲取讀鎖時,首先判斷同步狀態低16位,如果存在寫鎖,則獲取鎖失敗,進入CLH佇列阻塞,反之,判斷當前執行緒是否應該被阻塞,如果不應該阻塞則嘗試 CAS 同步狀態,獲取成功更新同步鎖為讀狀態。
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); // 如果當前已經有寫鎖了,則獲取失敗 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 獲取讀鎖數量 int r = sharedCount(c); // 非公平鎖實現中readerShouldBlock()返回true表示CLH佇列中有正在排隊的寫鎖 // CAS設定讀鎖的狀態值 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { // 省略記錄獲取readLock次數的程式碼 return 1; } // 針對上面失敗的條件進行再次處理 return fullTryAcquireShared(current);}final int fullTryAcquireShared(Thread current) { // 無線迴圈 for (;;) { int c = getState(); if (exclusiveCount(c) != 0) { // 如果不是當前執行緒持有寫鎖,則進入CLH佇列阻塞 if (getExclusiveOwnerThread() != current) return -1; } // 如果reader應該被阻塞 else if (readerShouldBlock()) { // Make sure we're not acquiring read lock reentrantly if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } // 當前執行緒沒有持有讀鎖,即不存在鎖重入情況。則進入CLH佇列阻塞 if (rh.count == 0) return -1; } } // 共享鎖的如果超出了限制 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // CAS設定狀態值 if (compareAndSetState(c, c + SHARED_UNIT)) { // 省略記錄readLock次數的程式碼 return 1; } }}
SHARED_UNIT 的值是65536,也就是說,當第一次獲取讀鎖的後,state的值就變成了65536。
在公平鎖的實現中當CLH佇列中有排隊的執行緒, readerShouldBlock() 方法就會返回為true。非公平鎖的實現中則是當CLH佇列中存在等待獲取寫鎖的執行緒就返回true
還需要注意的是獲取讀鎖的時候,如果當前執行緒已經持有寫鎖,是仍然能獲取讀鎖成功的。後面會提到鎖的降級,如果你對那裡的程式碼有疑問,可以在回過頭來看看這裡申請鎖的程式碼
釋放讀鎖protected final boolean tryReleaseShared(int unused) { for (;;) { int c = getState(); // 減去65536 int nextc = c - SHARED_UNIT; // 只有當state的值變成0才會真正的釋放鎖 if (compareAndSetState(c, nextc)) return nextc == 0;}}
釋放鎖時,state的值需要減去65536,因為當第一次獲取讀鎖後,state值變成了65536。
任何一個執行緒釋放讀鎖的時候只有在 state==0 的時候才真正釋放了鎖,比如有100個執行緒獲取了讀鎖,只有最後一個執行緒執行 tryReleaseShared 方法時才真正釋放了鎖,此時會喚醒CLH佇列中的排隊執行緒。
獲取寫鎖一個執行緒嘗試獲取寫鎖時,會先判斷同步狀態 state 是否為0。如果 state 等於 0,說明暫時沒有其它執行緒獲取鎖;如果 state 不等於 0,則說明有其它執行緒獲取了鎖。
此時再判斷state的低16位(w)是否為0,如果w為0,表示其他執行緒獲取了讀鎖,此時進入CLH佇列進行阻塞等待。
如果w不為0,則說明其他執行緒獲取了寫鎖,此時需要判斷獲取了寫鎖的是不是當前執行緒,如果不是則進入CLH佇列進行阻塞等待,如果獲取了寫鎖的是當前執行緒,則判斷當前執行緒獲取寫鎖是否超過了最大次數,若超過,丟擲異常。反之則更新同步狀態。
// 獲取寫鎖protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); // 判斷state是否為0 if (c != 0) { // 獲取鎖失敗 if (w == 0 || current != getExclusiveOwnerThread()) return false; // 判斷當前執行緒獲取寫鎖是否超出了最大次數65535 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 鎖重入 setState(c + acquires); return true; } // 非公平鎖實現中writerShouldBlock()永遠返回為false // CAS修改state的值 if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; // CAS成功後,設定當前執行緒為擁有獨佔鎖的執行緒 setExclusiveOwnerThread(current); return true;}
在公平鎖的實現中當CLH佇列中存在排隊的執行緒,那麼 writerShouldBlock() 方法就會返回為true,此時獲取寫鎖的執行緒就會被阻塞。
釋放寫鎖釋放寫鎖的邏輯比較簡單
protected final boolean tryRelease(int releases) { // 寫鎖是否被當前執行緒持有 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0; // 沒有其他執行緒持有寫鎖 if (free) setExclusiveOwnerThread(null); setState(nextc); return free;}
鎖的升級?
// 準備讀快取readLock.lock();try { v = map.get(key); if(v == null) { writeLock.lock(); try { if(map.get(key) != null) { return map.get(key); } // 更新快取程式碼,省略 } finally { writeLock.unlock(); } }} finally { readLock.unlock();}
對於上面獲取快取資料(這也是RRW的應用場景)的程式碼,先是獲取讀鎖,然後再升級為寫鎖,這樣的行為叫做鎖的升級。可惜RRW不支援,這樣會導致寫鎖永久等待,最終導致執行緒被永久阻塞。所以 鎖的升級是不允許的 。
鎖的降級雖然鎖的升級不允許,但是鎖的降級卻是可以的。
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();ReadLock readLock = lock.readLock();WriteLock writeLock = lock.writeLock();Map<String, String> dataMap = new HashMap();public void processCacheData() { readLock.lock(); if(!cacheValid()) { // 釋放讀鎖,因為不允許 readLock.unlock(); writeLock.lock(); try { if(!cacheValid()) { dataMap.put("key", "think123"); } // 降級為讀鎖 readLock.lock(); } finally { writeLock.unlock(); } } try { // 仍然持有讀鎖 System.out.println(dataMap); } finally { readLock.unlock(); }}public boolean cacheValid() { return !dataMap.isEmpty();}
RRW需要注意的問題在讀取很多、寫入很少的情況下,RRW 會使寫入執行緒遭遇飢餓(Starvation)問題,也就是說寫入執行緒會因遲遲無法競爭到鎖而一直處於等待狀態。寫鎖支援條件變數,讀鎖不支援。讀鎖呼叫newCondition() 會丟擲UnsupportedOperationException 異常來源:https://www.tuicool.com/articles/NFn2IrN