高併發和執行緒安全高併發:在某個時間上,有大量的使用者訪問同一資源。例如天貓雙11,12306春節購票都是典型的高併發應用場景執行緒安全:在某個時間點上,當大量使用者訪問同一資源,由於多執行緒的排程機制(搶佔式)的原因,可能會導致被訪問的資源出現數據汙染問題。多執行緒的安全性問題多執行緒的安全性問題-可見性可見性:一個執行緒看不見另外一個執行緒對共享變數進行修改, 共享變數是多個執行緒操作同一個變數
為了說明可見性問題,這裡宣告一個任務物件ThreadSafeVisibility,該任務物件主要包含一個共享變數flag,而任務就是在睡眠5秒後修改共享變數的值
這也就是可見性問題,子執行緒中修改了flag的值,而主執行緒的flag卻依然是false。如果想要搞清楚原因,就需要了解Java記憶體模型。JMM(Java Memory Mode)也就是Java記憶體模型描述了Java程式中各種變數(執行緒共享變數)的訪問規則,以及在JVM中將變數儲存到記憶體和讀取記憶體變數的細節:所有的共享變數都在主記憶體(這裡指的是常量區)中,執行緒在執行的時候,有單獨的工作記憶體(也就是啟動執行緒後在棧區開闢的空間),會把共享變數複製一份到單獨的工作記憶體中,並且對變數所有的操作都是在單獨的工作記憶體中完成的,不會直接讀寫主記憶體的變數值。
由於主線中的死迴圈邏輯很簡單,因此執行非常快。導致主執行緒不會讀取主記憶體中的共享變數flag的值,而直接使用自己獨立的記憶體空間中的flag的值,因此程式是死迴圈。
多執行緒的安全問題-有序性有序性問題指的是編譯器在編譯程式碼時有可能會對程式碼進行重排(指令重排),即改變程式碼的順序,重排後在多執行緒的環境下會影響另外一個執行緒訪問的結果產生影響有序性問題不是必然發生,這個跟編譯有關,因此無法使用程式演示,但是我們有辦法解決該問題,請繼續往下看。
多執行緒的安全性問題-原子性原子性指的是一次操作或者多次操作中,要麼所有的操作全部執行並且不會受到任何因素的干擾而中斷,要麼所有的操作都不執行,多個操作是一個不可分隔的整體。
這裡以主執行緒和子執行緒同時對共享變數number進行自增100000操作為例說明多執行緒的原子性安全問題
首先定義一個任務ThreadSafeAtomicity,該任務包含一個共享變數number,任務的run()方法就是將number自增100000次
主執行緒和子執行緒同時對共享變數執行自增100000次操作,但是最後得到的結果卻小於200000,原因是當多執行緒同時操作共享變數時不是原子操作,可能會產生覆蓋的效果。多執行緒時 number++需要經過三步驟
從主記憶體中複製一份到執行緒的獨立記憶體中將number的值加1將加1後的值寫入主記憶體但是由於Java執行緒的排程機制是搶佔式,因此子執行緒在完成這三步驟之前很有可能主執行緒也搶佔到了CPU資源去執行number自增的任務。這時主執行緒加1的結果可能會覆蓋子執行緒加1的結果。因此最後的結果小於2000000,這裡可以使用之前學習過的Lock鎖或者是同步程式碼塊、同步方法解決。我這裡使用同步程式碼塊解決,需要注意的是main方法和run()方法的鎖物件必須是同一個。
將run()方法對number自增100000次的程式碼加上同步鎖
Volatile關鍵字volatile關鍵字是一個變數修飾符,它只能修飾成員變數,被volatile關鍵字修飾的變數能強制執行緒每次從主記憶體中獲取新的值,解決可見性問題,並且保證此變數不會被編譯器最佳化,不會進行指令重排,解決有序性問題。
ThreadSafeVisibility中的共享變數沒有使用volatile關鍵字修飾,因此出現了可見性的問題。此時我們給變數加上volatile關鍵字,就可以解決可見性和有序性問題
但是volatile不能解決原子性問題,因為多執行緒操作共享變數會覆蓋結果。
在ThreadSafeAtomicity中給共享變數number加上volatile修飾符
/** 多執行緒共享變數 */ public static volatile int number = 0;
執行測試類ThreadSafeAtomicity後執行結果還是小於2000000
執行結果
原子類原子類概述在java.util.concurrent.atomic包下定義了一些原子操作的類
原子類
java.util.concurrent.atomic.AtomicInteger 對int操作的原子類java.util.concurrent.atomic.AtomicLong 對long操作的原子類java.util.concurrent.atomic.AtomicBoolean 對boolean操作的原子類原子類保證對變數的可見性、有序性、原子性AtomicInteger的使用和實現原理使用AtomicInteger改造之前程式的原子性問題
在ThreadSafeAtomicity中建立AtomicInteger物件,然後在任務方法run()中使用該物件的gertAndIncrement()方法對原子變數number自增1
AtomicInteger的工作原理是基於CAS(Compare And Set)機制,即比較並交換:比較指的是執行緒持有的資料和主記憶體中的資料 比較,而交換指的是執行緒將操作資料之後的值寫回主記憶體。
AtomicInteger的getAndIncrement()方法實現的機制是拿當前執行緒所在的棧空間的變數值(該值本來是從主記憶體複製獲取的)與當前主記憶體中的變數值進行比較,如果相等那麼就對變數自增1,然後把自增後的值寫回主記憶體,如果不相等則再次從主記憶體中獲取變數的值存放到執行緒所在的棧記憶體中,然後再次和主記憶體中的變數值進行比較,相等就自增1。
陣列的原子類java.util.concurrent.atomic包下封裝了幾個關於陣列操作的原子類
java.util.concurrent.atomic.AtomicIntegerArray:對int陣列操作的原子類java.util.concurrent.atomic.AtomicLongArray:對long陣列操作的原子類java.util.concurrent.atomic.AtomicReferenceArray:對Object陣列操作的原子類這裡介紹AtomicIntegerArray的使用
首先編寫一個任務AtomicIntegerArrayHandler,該任務主要包含一個多執行緒共享變數numbers,即整型陣列,然後在任務的方法中遍歷陣列,並給每個陣列的元素的值都進行加1操作。
/** * 多執行緒運算元組 */class AtomicIntegerArrayHandler implements Runnable{ /** * 多執行緒共享變數 * 變數型別是int[] * 初始化預設值為0 */ static int[] numbers=new int[1000]; @Override public void run() { //遍歷陣列 for (int i = 0; i < numbers.length; i++) { //陣列的每個元素值自增1 numbers[i]++; } }}
然後在測試類AtomicIntegerArrayTest中建立100個AtomicIntegerArrayHandler任務的執行緒並啟動,並在main執行緒中暫停3秒後列印輸出陣列元素自增1後的結果。因為會有100個執行緒同時運算元組,期望的值是希望1000個數組元素的值都是100
但是最終的執行結果中發現還是有陣列元素的值被其他執行緒所覆蓋。
此時我們可以使用AtomicIntegerArray代替int[]整型陣列來改造程式
併發包集合的多執行緒同步問題和解決方案之前使用的集合實現類java.util.ArrayList,java.util.LinkedList,java.util.HashSet,java.util.LinkedHashSet,java.util.TreeSet,java.util.HashMap,java.util.LinkedHashMap,java.util.TreeMap都是執行緒不安全的。在高併發場景下操作時會存在對應的問題。
這裡使用多執行緒往ArrayList中新增資料演示ArrayList的執行緒安全問題
由於ArrayList不能保證多執行緒時操作同步,在執行程式時集合的元素少於50000或者出現數組越界異常。
這裡可以使用JDK提供的CopyOnWriteArrayList解決ArrayList的執行緒同步問題,將上面程式的共享變數替換成CopyOnWriteArrayList即可。
HashSet的執行緒安全問題
JDK提供了java.util.concurrent.CopyOnWriteArraySet解決HashSet的執行緒安全問題
HashMap的執行緒安全問題
JDK提供了java.util.concurrent.ConcurrentHashMap解決HashMap的執行緒安全問題
CountDownLatchCountDownLatch執行一個或者多個執行緒等待其他執行緒完成操作。CountDownLatch構造方法
public CountDownLatch(int count):初始化一個指定計數器的CountDownLatch物件, 計數器的數量是等待執行緒數量CountDownLatch重要方法
public void await()throws InterruptedException 讓當前執行緒等待,當計數器的值為0的時候結束等待public void countDown() 計數器減1使用CountDownLatch完成購物的流程
註冊使用者登入使用者瀏覽商品加入購物車支付訂單提交訂單首先定義三個執行緒:OrderThread,ShoppingCartThread和PayThread,OrderThread負責完成1,2,3,6,ShoppingCartThread負責4,PayThread負責5。
其中OrderThread中包含了新增購物車和支付之外的所有流程,但是在提交訂單之前需要加入購物車和支付訂單,因此run方法中呼叫了CountDownLatch物件的await()方法讓當前執行緒等待。而OrderThread,ShoppingCartThread和PayThread 三個執行緒共享一個CountDownLatch物件,因此CountDownLatch由構造器作為引數傳入
/** * 下單執行緒 */@Log4j2class OrderThread extends Thread { CountDownLatch countDownLatch; /** * 構造器傳入countDownLatch和執行緒名稱 * @param countDownLatch * @param threadName */ public OrderThread(CountDownLatch countDownLatch,String threadName) { this.countDownLatch = countDownLatch; setName(threadName); } @Override public void run() { log.info(Thread.currentThread().getName()+" 註冊使用者"); log.info(Thread.currentThread().getName()+" 登入使用者"); log.info(Thread.currentThread().getName()+" 瀏覽商品"); // 讓當前執行緒等待 try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("提交訂單"); }}
購物車執行緒和支付執行緒
/** * 購物車執行緒 */@Log4j2class ShoppingCatThread extends Thread { CountDownLatch countDownLatch; /** * 構造器傳入countDownLatch和執行緒名稱 * @param countDownLatch * @param threadName */ public ShoppingCatThread(CountDownLatch countDownLatch,String threadName) { this.countDownLatch = countDownLatch; setName(threadName); } @Override public void run() { log.info(Thread.currentThread().getName()+" 加入購物車"); // 任務執行完成後計數器減1 countDownLatch.countDown(); }}/** * 支付執行緒 */@Log4j2class PayThread extends Thread { CountDownLatch countDownLatch; /** * 構造器傳入countDownLatch和執行緒名稱 * @param countDownLatch * @param threadName */ public PayThread(CountDownLatch countDownLatch,String threadName) { this.countDownLatch = countDownLatch; setName(threadName); } @Override public void run() { log.info(Thread.currentThread().getName()+" 支付訂單"); // 任務執行完成後計數器減1 countDownLatch.countDown(); }}
在測試類中建立三個執行緒物件共享的CountDownLatch物件,然後依次建立三個執行緒物件並且啟動三個執行緒,為了讓訂單執行緒首先啟動,因此在啟動後暫停了1秒鐘。
CyclicBarrierCyclicBarrier 字面意思理解就是可迴圈使用的屏障,它能實現讓一組執行緒到達一個屏障時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行。
CyclicBarrier構造方法
public CyclicBarrier (int parties,Runnable barrierAction) parties 代表要達到屏障的執行緒數量,barrierAction表示達到屏障後要執行的執行緒CyclicBarrier重要方法
public int awit() 每個執行緒呼叫awit()方法告訴CyclicBarrier 已經到達了屏障,當前執行緒被阻塞使用CyclicBarrier模擬一個開會的場景:在網際網路公司做開發通常會有5個角:運營人員、產品經理、測試人員、開發人員和運維人員,當新需求過來後通常需要他們一起到達會議室後才能開會。
首先定義員工去會議室任務,當人員到大會議室後呼叫CyclicBarrier物件的await()方法讓當前執行緒阻塞
/** * 員工去會議室任務 */@Log4j2class EmployeeRunnable implements Runnable{ CyclicBarrier cyclicBarrier; EmployeeRunnable(CyclicBarrier cyclicBarrier){ this.cyclicBarrier=cyclicBarrier; } @Override public void run() { log.info(Thread.currentThread().getName()+" 已經到達會議室"); try { //告訴cyclicBarrier我已經到達屏障,此時執行緒處於阻塞狀態 cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } log.info(Thread.currentThread().getName()+" 已經離開會議室"); }}
然後在測試類CyclicBarrierTest中建立 CyclicBarrier物件,並傳遞開會的任務,該任務會在5個員工執行緒到達會議室後執行
SemaphoreSemaphore 字面量意思是訊號量,主要是用於控制執行緒的併發數量,可以用於設定同時幾個執行緒執行,控制訪問資源的執行緒數目。
Seamphore構造方法
public Seamphore(int permits) permits表示許可的執行緒數量Semaphore重要方法
public void acquire() throws InterruptedException 表示獲取許可public void release() 表示釋放許可定義Order類,在該類中建立Semaphore物件,設定許可的執行緒數量為3個,並在createOrder()方法呼叫acquire()方法獲得許可,在執行生成訂單後呼叫release()方法釋放許可。
@Log4j2class Order{ //每次只執行3個使用者下單 Semaphore semaphore=new Semaphore(3); public void createOrder(){ try { //獲得許可 semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } log.info(Thread.currentThread().getName()+" 開始下單"); try { //模擬生成訂單資訊 Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } log.info(Thread.currentThread().getName()+" 訂單生成成功"); //釋放許可 semaphore.release(); }}
定義OrderRunnable 訂單任務,在執行任務時呼叫Order的createOrder()方法
/** * 下單任務 */class OrderRunnable implements Runnable{ private Order order; public OrderRunnable(Order order ){ this.order=order; } @Override public void run() { order.createOrder(); }}
在測試類SemaphoreTest中建立OrderRunnable任務物件,並啟動15個執行緒
ExchangerExchanger(交換者)是一個用於執行緒協作的工具類,用於兩個執行緒之間的資料交換。兩個執行緒透過exchange()方法交換資料,如果第一個執行緒先執行exchange()方法,它會一直等待第二個執行緒也執行exchange()方法,當兩個執行緒都達到同步點,這兩個執行緒可以交換資料,將本執行緒生產的資料傳遞給對方。
Exchanger構造方法
public Exchanger()Exchanger重要方法
public V change(V x); 引數是要交換的資料 ,返回值是對方執行緒傳遞的資料首先定義A執行緒
A執行緒的任務就是呼叫Exchanger物件的exchange()方法將字串"資料A" 傳給B執行緒
/** * A執行緒 */@Log4j2class ThreadA extends Thread{ Exchanger<String> exchanger; ThreadA (Exchanger<String> exchanger,String threadName){ this.exchanger=exchanger; super.setName(threadName); } @Override public void run() { log.info(Thread.currentThread().getName()+"把資料交換給執行緒B"); try { //返回值是B執行緒給A執行緒的資料 String message=exchanger.exchange("資料A"); log.info("執行緒B給的資料是{}",message); } catch (InterruptedException e) { e.printStackTrace(); } }}
然後定義B執行緒B執行緒的任務是呼叫Exchanger的exchange()方法將字串"資料B"傳給執行緒A
/** * B執行緒 */@Log4j2class ThreadB extends Thread{ Exchanger<String> exchanger; ThreadB(Exchanger<String> exchanger,String threadName){ this.exchanger=exchanger; setName(threadName); } @Override public void run() { log.info(Thread.currentThread().getName()+"把資料交換給執行緒A"); try { String returnMessage = exchanger.exchange("資料B"); log.info("執行緒A給的資料是{}",returnMessage); } catch (InterruptedException e) { e.printStackTrace(); } }}
然後在測試類ExchangerTest類中分別建立兩個執行緒物件並啟動執行緒
import lombok.extern.log4j.Log4j2;import java.util.concurrent.Exchanger;/** * Exchanger測試用例 * * @author tony [email protected] * @version 2020/12/23 14:17 * @since JDK11 */public class ExchangerTest { public static void main(String[] args) { //兩個執行緒共享一個Exchanger物件 Exchanger<String> exchanger=new Exchanger<>(); //建立並啟動A執行緒 ThreadA threadA=new ThreadA(exchanger,"執行緒A"); threadA.start(); //建立並啟動B執行緒 ThreadB threadB=new ThreadB(exchanger,"執行緒B"); threadB.start(); }}
程式執行結果
執行緒池執行緒池(ThreadPool)是存放多個執行緒的容器,由於執行緒的頻繁建立和銷燬非常消耗系統資源,有了執行緒池,就可以重複利用執行緒,省去了頻繁建立執行緒物件的操作,無需反覆建立執行緒而消耗過多的資源。
執行緒池建立後會初始化指定數量的執行緒,並且內部會維護一個任務佇列,當有任務過來時,如果執行緒池有空閒的執行緒,此時會分配空閒的執行緒執行任務,如果任務數量超過了可用的執行緒數,那麼此時的任務會進入任務佇列並處於等待狀態,直到執行緒池中有可用的執行緒來執行任務。執行緒池分配執行緒可以是無序或者有序的。
java.util.concurrent.ExecutorService 是執行緒池介面,java.util.concurrent.Executors 執行緒工程類提供了一些常用的執行緒池方法。
/** 建立固定執行緒數量的執行緒池 */ private static final ExecutorService executor = Executors.newFixedThreadPool(3);
執行緒池建立後就可以透過commit()方法來提交任務,任務的型別可以是Runnable介面
可以呼叫shutdown()方法銷燬執行緒池,不過該方法僅在本地測試使用。因為執行緒池不銷燬,程式不會正常結束。
執行緒池提交任務還可以是Callable介面,該介面的call()方法可以丟擲異常,也有返回值,Callable型別的任務提交給執行緒池後返回值是java.util.concurrent.Future<V>可以透過Future.get()獲取任務的返回值。