回覆列表
  • 1 # 拉布拉斯

    執行緒池建立和銷燬是有代價的,所以可以透過提前建立執行緒池來緩解這個問題。但是建立多少個是個問題?

    一般根據業務複雜度,比如提前建立100個,然後設定一個低水位和高水位,比如20% 和80%,當達到低水位且持續一段時間,就可以釋放一部分。當高水位一段時間後,可以動態增加一部分。同時增加手動設定的api可以根據預測提前調整。

  • 2 # 此生唯一

    在java中多執行緒並不陌生,在一定的範圍內,多執行緒數量的增加會明顯提升整個系統的吞吐效能,但是執行緒本身會極大的耗費記憶體空間,執行緒的頻繁建立和回收也極其佔用CPU資源,多執行緒甚至會拖垮整個服務!

    所以,執行緒的利用必須掌握在一個度,太少的執行緒數可能會浪費CPU資源,而太高也極有可能反而降低整個應用效能;

    執行緒池:基於使用多執行緒存在的問題,JDK提出了執行緒池技術,類似於資料庫連線池,都是保持池中部分執行緒活躍狀態,在需要使用執行緒的時候,直接從執行緒池中獲取,使用。當執行緒使用結束,就進行回收(直接放回池中等待,而不是GC),這樣就能避免了執行緒的頻繁建立和回收。

    JAVA中的執行緒池:JDK提供了執行緒池框架Executor,幫助程式更好的管理執行緒。總的結構如下截圖:

    比較常見的執行緒池物件獲取方式為:

    ①newSingleThreadExecutor():返回單執行緒的執行緒池,一個接一個的處理任務,執行緒異常的時候,會建立新的執行緒替代; ②newFixedThreadPool:在達到最大執行緒之前,有一個任務就建立一個執行緒,直到達到最大執行緒數量; ③newCachedThreadPool:動態的設定最合適的執行緒數量,最大為JVM能夠支援的大小; ④newScheduledThreadPool:指定執行緒數量,並週期性的執行任務; ⑤newSingleThreadScheduledExecutor:指定執行緒數量1個,並週期性的執行任務;

    從原始碼來看,上面幾種執行緒池底層都是封裝的ThreadPoolExecutor物件,檢視原始碼可知比較重要的屬性(物件)截圖如下:

    定義了執行緒池中的執行緒數量,最大執行緒池數量,執行緒工廠(用於執行緒的建立),workQuere任務佇列,handler拒絕策略等屬性,用於執行緒池的物件初始化和任務排程!

    下圖是ThreadPoolExecutor物件中的execute方法截圖:

    解釋如下:

    1,當前執行緒總數小於核心執行緒數,則透過addWorker進行執行;

    2,否則透過wordQueue.offer提交到等待佇列,

    3,進入等待佇列失敗,則透過addWorker提交到執行緒池,失敗則執行拒絕策略;

    執行緒池的擴充套件:JDK允許開發人員自主擴充套件執行緒池,透過提供的beforeExecute,afterExecute,terminated三個介面可以像處理AOP一樣方便的管理執行緒池,可自行實現狀態跟蹤,除錯資訊等用以監控執行緒池!

    執行緒池的最佳化:執行緒池的最佳化主要針對執行緒數量進行,一般來說只要使用的不是最大最小執行緒數量都可以,但是具體的還要根據場景,參考CPU核心數,等待時間等因素來判斷最合適的執行緒數,比如是批次運算這種密集的CPU執行,則執行緒數設定為CPU核心數即可,如果有大量阻塞,則可以使用CPU核心數的偶數倍數,在有一本書中得出了一個公式如下截圖:

    jdk中的執行緒池技術比較完善,加上其他的多執行緒技術,促使JAVA成為高併發領域的佼佼者,最近一直在分享JAVA技術,得到很多朋友的鼓勵,在此表示感謝,我也會一直持續的進行分享,敬請關注。。

  • 3 # 架構師成長錄

    筆者從事Java開發多年,在Java併發程式設計中積累了一定的經驗,就執行緒池擴充套件最佳化問題,來分享一下自己的一些看法。

    執行緒池利用好了,可以高系統性能,如果利用不好,也會帶來一系列問題。今天我們從6個方面來談執行緒池的最佳化。

    1. 核心執行緒WarmUp最佳化

    預設情況下,核心工作執行緒值在初始的時候被建立,當新任務到來的時候被啟動,但是我們可以透過重寫prestartCoreThread或prestartCoreThreads方法來改變這種行為。通常情況下我們可以在應用啟動時來WarmUp核心執行緒,從而達到任務過來能夠立即執行的結果,使得初始任務處理的時間得到一定最佳化。

    2. 定製工作執行緒的建立最佳化

    新的執行緒是透過ThreadFactory來建立的,如果沒有指定,預設會使用Executors#defaultThreadFactory,這時建立的執行緒將都屬於同一個執行緒組,擁有同樣的優先順序和daemon狀態。透過擴充套件配置ThreadFactory,我們可以配置執行緒的名稱、執行緒組合daemon狀態。如果呼叫ThreadFactory#createThread失敗,將返回null,executor將不會執行任何任務。

    3. 核心執行緒回收

    如果當前池子中的工作執行緒數大於corePoolSize,超過corePoolSize的執行緒處於空閒的時間大於keepAliveTime,則這些執行緒將會被終止,這是一種減少不必要資源消耗的策略。這個引數可以在執行時被改變,我們同樣可以將這種策略應用給核心執行緒,可以透過呼叫allowCoreThreadTimeout來實現。

    4. 正確的選擇佇列

    下面主要是不同佇列策略表現:

    4.1 直接遞交:一種比較好的預設選擇是使用SynchronousQueue,這種策略會將提交的任務直接傳送給工作執行緒,而不持有。如果當前沒有工作執行緒來處理,即任務放入佇列失敗,則根據執行緒池的實現,引發新的工作執行緒建立,新提交的任務就會被處理。這種策略在當提交的一批任務之間有依賴關係時能避免鎖競爭消耗。值得一提的是,這種策略最好配合unbounded執行緒數來使用,從而避免任務被拒絕。同時我們必須要考慮到一種場景,當任務到來的速度大於任務處理的速度時,將會引起無限制的執行緒數不斷增加的問題。

    4.2 無界佇列:使用無界佇列如LinkedBlockingQueue沒有指定最大容量時,將會引起當核心執行緒都在忙的時候,新的任務被放在佇列上,因此,永遠不會有大於corePoolSize的執行緒被建立,maximumPoolSize引數將失效。這種策略比較適合所有的任務都不相互依賴、獨立執行的情況。舉個例子,網頁伺服器中,每個執行緒獨立處理請求。但是當任務處理速度小於任務進入速度時會引起佇列的無限膨脹。

    4.3 有界佇列:有界佇列如ArrayBlockingQueue幫助限制資源的消耗,但是不好控制。佇列長度和maximumPoolSize這兩個值會相互影響,使用大的佇列和小maximumPoolSize會減少CPU的使用、作業系統資源、上下文切換的消耗,但是會降低吞吐量。如果任務被頻繁的阻塞,系統則可以排程更多的執行緒。使用小的佇列通常需要大maximumPoolSize,從而使得CPU更忙一些,但是又會增加降低吞吐量的執行緒排程的消耗。

    總結一下是IO密集型可以考慮多些執行緒來平衡CPU的使用,CPU密集型可以考慮少些執行緒減少執行緒排程的消耗。

    5. 合理的配置執行緒池

    要想合理的配置執行緒池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

    任務性質:CPU密集型任務,IO密集型任務和混合型任務;

    任務優先順序:高,中和低;

    任務執行時間:長,中和短;

    任務依賴性:是否依賴其他系統資源,如資料庫連線等。

    任務性質不同的任務可以用不同規模的執行緒池分開處理。

    CPU密集型任務配置儘可能小的執行緒,如配置Ncpu+1個執行緒的執行緒池。

    IO密集型任務則由於執行緒並不是一直在執行任務,則配置儘可能多的執行緒,如2*Ncpu。

    混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於序列執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。我們可以透過Runtime.getRuntime().availableProcessors()方法獲得當前裝置的CPU個數。

    優先順序不同的任務可以使用優先順序佇列PriorityBlockingQueue來處理。它可以讓優先順序高的任務先得到執行,需要注意的是如果一直有優先順序高的任務提交到佇列裡,那麼優先順序低的任務可能永遠不能執行。

    執行時間不同的任務可以交給不同規模的執行緒池來處理,或者也可以使用優先順序佇列,讓執行時間短的任務先執行。

    依賴資料庫連線池的任務,因為執行緒提交SQL後需要等待資料庫返回結果,如果等待的時間越長CPU空閒時間就越長,那麼執行緒數應該設定越大,這樣才能更好的利用CPU。

    建議使用有界佇列,有界佇列能增加系統的穩定性和預警能力。

    6. 執行緒池的監控

    透過執行緒池提供的引數進行監控。執行緒池裡有一些屬性在監控執行緒池的時候可以使用:

    taskCount:執行緒池需要執行的任務數量。

    completedTaskCount:執行緒池在執行過程中已完成的任務數量。小於或等於taskCount。

    largestPoolSize:執行緒池曾經建立過的最大執行緒數量。透過這個資料可以知道執行緒池是否滿過。如等於執行緒池的最大大小,則表示執行緒池曾經滿了。

    getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,池裡的執行緒不會自動銷燬,所以這個大小隻增不+ getActiveCount:獲取活動的執行緒數。

    透過擴充套件執行緒池進行監控。透過繼承執行緒池並重寫執行緒池的beforeExecute,afterExecute和terminated方法,我們可以在任務執行前,執行後和執行緒池關閉前幹一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法線上程池裡是空方法。如protected void beforeExecute(Thread t, Runnable r) { }

  • 4 # 華為雲開發者聯盟

    提示

    請帶著這些問題繼續後文,會很大程度上幫助你更好的理解相關知識點。@pdai

    為什麼要有執行緒池?Java是實現和管理執行緒池有哪些方式? 請簡單舉例如何使用。為什麼很多公司不允許使用Executors去建立執行緒池? 那麼推薦怎麼使用呢?ThreadPoolExecutor有哪些核心的配置引數? 請簡要說明ThreadPoolExecutor可以建立哪是哪三種執行緒池呢?當佇列滿了並且worker的數量達到maxSize的時候,會怎麼樣?說說ThreadPoolExecutor有哪些RejectedExecutionHandler策略? 預設是什麼策略?簡要說下執行緒池的任務執行機制? execute –> addWorker –>runworker (getTask)執行緒池中任務是如何提交的?執行緒池中任務是如何關閉的?在配置執行緒池的時候需要考慮哪些配置因素?如何監控執行緒池的狀態?為什麼要有執行緒池

    執行緒池能夠對執行緒進行統一分配,調優和監控:

    降低資源消耗(執行緒無限制地建立,然後使用完畢後銷燬)提高響應速度(無須建立執行緒)提高執行緒的可管理性ThreadPoolExecutor例子

    Java是如何實現和管理執行緒池的?

    從JDK 5開始,把工作單元與執行機制分離開來,工作單元包括Runnable和Callable,而執行機制由Executor框架提供。

    WorkerThread

    SimpleThreadPool

    程式中我們建立了固定大小為五個工作執行緒的執行緒池。然後分配給執行緒池十個工作,因為執行緒池大小為五,它將啟動五個工作執行緒先處理五個工作,其他的工作則處於等待狀態,一旦有工作完成,空閒下來工作執行緒就會撿取等待佇列裡的其他工作進行執行。

    這裡是以上程式的輸出。

    輸出表明執行緒池中至始至終只有五個名為 "pool-1-thread-1" 到 "pool-1-thread-5" 的五個執行緒,這五個執行緒不隨著工作的完成而消亡,會一直存在,並負責執行分配給執行緒池的任務,直到執行緒池消亡。

    Executors 類提供了使用了 ThreadPoolExecutor 的簡單的 ExecutorService 實現,但是 ThreadPoolExecutor 提供的功能遠不止於此。我們可以在建立 ThreadPoolExecutor 例項時指定活動執行緒的數量,我們也可以限制執行緒池的大小並且建立我們自己的 RejectedExecutionHandler 實現來處理不能適應工作佇列的工作。

    這裡是我們自定義的 RejectedExecutionHandler 介面的實現。

    RejectedExecutionHandlerImpl.java

    ThreadPoolExecutor 提供了一些方法,我們可以使用這些方法來查詢 executor 的當前狀態,執行緒池大小,活動執行緒數量以及任務數量。因此我是用來一個監控執行緒在特定的時間間隔內列印 executor 資訊。

    MyMonitorThread.java

    這裡是使用 ThreadPoolExecutor 的執行緒池實現例子。

    WorkerPool.java

    注意在初始化 ThreadPoolExecutor 時,我們保持初始池大小為 2,最大池大小為 4 而工作佇列大小為 2。因此如果已經有四個正在執行的任務而此時分配來更多工的話,工作佇列將僅僅保留他們(新任務)中的兩個,其他的將會被RejectedExecutionHandlerImpl 處理。

    上面程式的輸出可以證實以上觀點。

    注意 executor 的活動任務、完成任務以及所有完成任務,這些數量上的變化。我們可以呼叫 shutdown() 方法來結束所有提交的任務並終止執行緒池。

    ThreadPoolExecutor使用詳解

    其實java執行緒池的實現原理很簡單,說白了就是一個執行緒集合workerSet和一個阻塞佇列workQueue。當用戶向執行緒池提交一個任務(也就是執行緒)時,執行緒池會先將任務放入workQueue中。workerSet中的執行緒會不斷的從workQueue中獲取執行緒然後執行。當workQueue中沒有任務的時候,worker就會阻塞,直到佇列中有任務了就取出來繼續執行。

    Execute原理

    當一個任務提交至執行緒池之後:

    執行緒池首先當前執行的執行緒數量是否少於corePoolSize。如果是,則建立一個新的工作執行緒來執行任務。如果都在執行任務,則進入2.判斷BlockingQueue是否已經滿了,倘若還沒有滿,則將執行緒放入BlockingQueue。否則進入3.如果建立一個新的工作執行緒將使當前執行的執行緒數量超過maximumPoolSize,則交給RejectedExecutionHandler來處理任務。

    當ThreadPoolExecutor建立新執行緒時,透過CAS來更新執行緒池的狀態ctl.

    引數corePoolSize 執行緒池中的核心執行緒數,當提交一個任務時,執行緒池建立一個新執行緒執行任務,直到當前執行緒數等於corePoolSize, 即使有其他空閒執行緒能夠執行新來的任務, 也會繼續建立執行緒;如果當前執行緒數為corePoolSize,繼續提交的任務被儲存到阻塞佇列中,等待被執行;如果執行了執行緒池的prestartAllCoreThreads()方法,執行緒池會提前建立並啟動所有核心執行緒。workQueue 用來儲存等待被執行的任務的阻塞佇列. 在JDK中提供瞭如下阻塞佇列: 具體可以參考JUC 集合: BlockQueue詳解ArrayBlockingQueue: 基於陣列結構的有界阻塞佇列,按FIFO排序任務;LinkedBlockingQueue: 基於連結串列結構的阻塞佇列,按FIFO排序任務,吞吐量通常要高於ArrayBlockingQueue;SynchronousQueue: 一個不儲存元素的阻塞佇列,每個插入操作必須等到另一個執行緒呼叫移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue;PriorityBlockingQueue: 具有優先順序的無界阻塞佇列;

    LinkedBlockingQueue比ArrayBlockingQueue在插入刪除節點效能方面更優,但是二者在put(), take()任務的時均需要加鎖,SynchronousQueue使用無鎖演算法,根據節點的狀態判斷執行,而不需要用到鎖,其核心是Transfer.transfer().

    maximumPoolSize 執行緒池中允許的最大執行緒數。如果當前阻塞佇列滿了,且繼續提交任務,則建立新的執行緒執行任務,前提是當前執行緒數小於maximumPoolSize;當阻塞佇列是無界佇列, 則maximumPoolSize則不起作用, 因為無法提交至核心執行緒池的執行緒會一直持續地放入workQueue.keepAliveTime 執行緒空閒時的存活時間,即當執行緒沒有任務執行時,該執行緒繼續存活的時間;預設情況下,該引數只在執行緒數大於corePoolSize時才有用, 超過這個時間的空閒執行緒將被終止;unit keepAliveTime的單位threadFactory 建立執行緒的工廠,透過自定義的執行緒工廠可以給每個新建的執行緒設定一個具有識別度的執行緒名。預設為DefaultThreadFactoryhandler 執行緒池的飽和策略,當阻塞佇列滿了,且沒有空閒的工作執行緒,如果繼續提交任務,必須採取一種策略處理該任務,執行緒池提供了4種策略:AbortPolicy: 直接丟擲異常,預設策略;CallerRunsPolicy: 用呼叫者所在的執行緒來執行任務;DiscardOldestPolicy: 丟棄阻塞佇列中靠最前的任務,並執行當前任務;DiscardPolicy: 直接丟棄任務;

    當然也可以根據應用場景實現RejectedExecutionHandler介面,自定義飽和策略,如記錄日誌或持久化儲存不能處理的任務。

    三種類型newFixedThreadPool

    執行緒池的執行緒數量達corePoolSize後,即使執行緒池沒有可執行任務時,也不會釋放執行緒。

    FixedThreadPool的工作佇列為無界佇列LinkedBlockingQueue(佇列容量為Integer.MAX_VALUE), 這會導致以下問題:

    執行緒池裡的執行緒數量不超過corePoolSize,這導致了maximumPoolSize和keepAliveTime將會是個無用引數由於使用了無界佇列, 所以FixedThreadPool永遠不會拒絕, 即飽和策略失效newSingleThreadExecutor

    初始化的執行緒池中只有一個執行緒,如果該執行緒異常結束,會重新建立一個新的執行緒繼續執行任務,唯一的執行緒可以保證所提交任務的順序執行.

    由於使用了無界佇列, 所以SingleThreadPool永遠不會拒絕, 即飽和策略失效

    newCachedThreadPool

    執行緒池的執行緒數可達到Integer.MAX_VALUE,即2147483647,內部使用SynchronousQueue作為阻塞佇列; 和newFixedThreadPool建立的執行緒池不同,newCachedThreadPool在沒有任務執行時,當執行緒的空閒時間超過keepAliveTime,會自動釋放執行緒資源,當提交新任務時,如果沒有空閒執行緒,則建立新執行緒執行任務,會導致一定的系統開銷; 執行過程與前兩種稍微不同:

    主執行緒呼叫SynchronousQueue的offer()方法放入task, 倘若此時執行緒池中有空閒的執行緒嘗試讀取 SynchronousQueue的task, 即呼叫了SynchronousQueue的poll(), 那麼主執行緒將該task交給空閒執行緒. 否則執行(2)當執行緒池為空或者沒有空閒的執行緒, 則建立新的執行緒執行任務.執行完任務的執行緒倘若在60s內仍空閒, 則會被終止. 因此長時間空閒的CachedThreadPool不會持有任何執行緒資源.關閉執行緒池

    遍歷執行緒池中的所有執行緒,然後逐個呼叫執行緒的interrupt方法來中斷執行緒.

    關閉方式 - shutdown

    將執行緒池裡的執行緒狀態設定成SHUTDOWN狀態, 然後中斷所有沒有正在執行任務的執行緒.

    關閉方式 - shutdownNow

    將執行緒池裡的執行緒狀態設定成STOP狀態, 然後停止所有正在執行或暫停任務的執行緒. 只要呼叫這兩個關閉方法中的任意一個, isShutDown() 返回true. 當所有任務都成功關閉了, isTerminated()返回true.

    ThreadPoolExecutor原始碼詳解幾個關鍵屬性內部狀態

    其中AtomicInteger變數ctl的功能非常強大: 利用低29位表示執行緒池中執行緒數,透過高3位表示執行緒池的執行狀態:

    RUNNING: -1 << COUNT_BITS,即高3位為111,該狀態的執行緒池會接收新任務,並處理阻塞佇列中的任務;SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態的執行緒池不會接收新任務,但會處理阻塞佇列中的任務;STOP : 1 << COUNT_BITS,即高3位為001,該狀態的執行緒不會接收新任務,也不會處理阻塞佇列中的任務,而且會中斷正在執行的任務;TIDYING : 2 << COUNT_BITS,即高3位為010, 所有的任務都已經終止;TERMINATED: 3 << COUNT_BITS,即高3位為011, terminated()方法已經執行完成任務的執行

    execute –> addWorker –>runworker (getTask)

    執行緒池的工作執行緒透過Woker類實現,在ReentrantLock鎖的保證下,把Woker例項插入到HashSet後,並啟動Woker中的執行緒。 從Woker類的構造方法實現可以發現: 執行緒工廠在建立執行緒thread時,將Woker例項本身this作為引數傳入,當執行start方法啟動執行緒thread時,本質是執行了Worker的runWorker方法。 firstTask執行完成之後,透過getTask方法從阻塞佇列中獲取等待的任務,如果佇列中沒有任務,getTask方法會被阻塞並掛起,不會佔用cpu資源;

    execute()方法

    ThreadPoolExecutor.execute(task)實現了Executor.execute(task)

    為什麼需要double check執行緒池的狀態?

    在多執行緒環境下,執行緒池的狀態時刻在變化,而ctl.get()是非原子操作,很有可能剛獲取了執行緒池狀態後執行緒池狀態就改變了。判斷是否將command加入workque是執行緒池之前的狀態。倘若沒有double check,萬一執行緒池處於非running狀態(在多執行緒環境下很有可能發生),那麼command永遠不會執行。

    addWorker方法

    從方法execute的實現可以看出: addWorker主要負責建立新的執行緒並執行任務 執行緒池建立新執行緒執行任務時,需要 獲取全域性鎖:

    Worker類的runworker方法繼承了AQS類,可以方便的實現工作執行緒的中止操作;實現了Runnable介面,可以將自身作為一個任務在工作執行緒中執行;當前提交的任務firstTask作為引數傳入Worker的構造方法;

    一些屬性還有構造方法:

    runWorker方法是執行緒池的核心:

    執行緒啟動之後,透過unlock方法釋放鎖,設定AQS的state為0,表示執行可中斷;Worker執行firstTask或從workQueue中獲取任務:進行加鎖操作,保證thread不被其他執行緒中斷(除非執行緒池被中斷)檢查執行緒池狀態,倘若執行緒池處於中斷狀態,當前執行緒將中斷。執行beforeExecute執行任務的run方法執行afterExecute方法解鎖操作

    透過getTask方法從阻塞佇列中獲取等待的任務,如果佇列中沒有任務,getTask方法會被阻塞並掛起,不會佔用cpu資源;

    getTask方法

    下面來看一下getTask()方法,這裡面涉及到keepAliveTime的使用,從這個方法我們可以看出執行緒池是怎麼讓超過corePoolSize的那部分worker銷燬的。

    注意這裡一段程式碼是keepAliveTime起作用的關鍵:

    allowCoreThreadTimeOut為false,執行緒即使空閒也不會被銷燬;倘若為ture,在keepAliveTime內仍空閒則會被銷燬。

    如果執行緒允許空閒等待而不被銷燬timed == false,workQueue.take任務: 如果阻塞佇列為空,當前執行緒會被掛起等待;當佇列中有任務加入時,執行緒被喚醒,take方法返回任務,並執行;

    如果執行緒不允許無休止空閒timed == true, workQueue.poll任務: 如果在keepAliveTime時間內,阻塞佇列還是沒有任務,則返回null;

    任務的提交submit任務,等待執行緒池execute執行FutureTask類的get方法時,會把主執行緒封裝成WaitNode節點並儲存在waiters連結串列中, 並阻塞等待執行結果;FutureTask任務執行完成後,透過UNSAFE設定waiters相應的waitNode為null,並透過LockSupport類unpark方法喚醒主執行緒;

    在實際業務場景中,Future和Callable基本是成對出現的,Callable負責產生結果,Future負責獲取結果。

    Callable介面類似於Runnable,只是Runnable沒有返回值。Callable任務除了返回正常結果之外,如果發生異常,該異常也會被返回,即Future可以拿到非同步執行任務各種結果;Future.get方法會導致主執行緒阻塞,直到Callable任務執行完成;submit方法

    AbstractExecutorService.submit()實現了ExecutorService.submit() 可以獲取執行完的返回值, 而ThreadPoolExecutor 是AbstractExecutorService.submit()的子類,所以submit方法也是ThreadPoolExecutor`的方法。

    透過submit方法提交的Callable任務會被封裝成了一個FutureTask物件。透過Executor.execute方法提交FutureTask到執行緒池中等待被執行,最終執行的是FutureTask的run方法;

    FutureTask物件

    public class FutureTask<V> implements RunnableFuture<V> 可以將FutureTask提交至執行緒池中等待被執行(透過FutureTask的run方法來執行)

    內部狀態

    內部狀態的修改透過sun.misc.Unsafe修改

    get方法

    內部透過awaitDone方法對主執行緒進行阻塞,具體實現如下:

    如果主執行緒被中斷,則丟擲中斷異常;

    判斷FutureTask當前的state,如果大於COMPLETING,說明任務已經執行完成,則直接返回;如果當前state等於COMPLETING,說明任務已經執行完,這時主執行緒只需透過yield方法讓出cpu資源,等待state變成NORMAL;透過WaitNode類封裝當前執行緒,並透過UNSAFE新增到waiters連結串列;最終透過LockSupport的park或parkNanos掛起執行緒;

    run方法

    FutureTask.run方法是線上程池中被執行的,而非主執行緒

    透過執行Callable任務的call方法;如果call執行成功,則透過set方法儲存結果;如果call執行有異常,則透過setException儲存異常;任務的關閉

    shutdown方法會將執行緒池的狀態設定為SHUTDOWN,執行緒池進入這個狀態後,就拒絕再接受任務,然後會將剩餘的任務全部執行完

    shutdownNow做的比較絕,它先將執行緒池狀態設定為STOP,然後拒絕所有提交的任務。最後中斷左右正在執行中的worker,然後清空任務佇列。

    更深入理解

    為什麼執行緒池不允許使用Executors去建立? 推薦方式是什麼?

    執行緒池不允許使用Executors去建立,而是透過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確執行緒池的執行規則,規避資源耗盡的風險。 說明:Executors各個方法的弊端:

    newFixedThreadPool和newSingleThreadExecutor:   主要問題是堆積的請求處理佇列可能會耗費非常大的記憶體,甚至OOM。newCachedThreadPool和newScheduledThreadPool:   主要問題是執行緒數最大數是Integer.MAX_VALUE,可能會建立數量非常多的執行緒,甚至OOM。推薦方式 1

    首先引入:commons-lang3包

    推薦方式 2

    首先引入:com.google.guava包

    推薦方式 3

    spring配置執行緒池方式:自定義執行緒工廠bean需要實現ThreadFactory,可參考該介面的其它預設實現類,使用方式直接注入bean呼叫execute(Runnable task)方法即可

    配置執行緒池需要考慮因素

    從任務的優先順序,任務的執行時間長短,任務的性質(CPU密集/ IO密集),任務的依賴關係這四個角度來分析。並且近可能地使用有界的工作佇列。

    性質不同的任務可用使用不同規模的執行緒池分開處理:

    CPU密集型: 儘可能少的執行緒,Ncpu+1IO密集型: 儘可能多的執行緒, Ncpu*2,比如資料庫連線池混合型: CPU密集型的任務與IO密集型任務的執行時間差別較小,拆分為兩個執行緒池;否則沒有必要拆分。監控執行緒池的狀態

    可以使用ThreadPoolExecutor以下方法:

    getTaskCount() Returns the approximate total number of tasks that have ever been scheduled for execution.getCompletedTaskCount() Returns the approximate total number of tasks that have completed execution. 返回結果少於getTaskCount()。getLargestPoolSize() Returns the largest number of threads that have ever simultaneously been in the pool. 返回結果小於等於maximumPoolSizegetPoolSize() Returns the current number of threads in the pool.getActiveCount() Returns the approximate number of threads that are actively executing tasks.參考文章《Java併發程式設計藝術》https://www.jianshu.com/p/87bff5cc8d8chttps://blog.csdn.net/programmer_at/article/details/79799267https://blog.csdn.net/u013332124/article/details/79587436https://www.journaldev.com/1069/threadpoolexecutor-java-thread-pool-example-executorservice

    由於問答程式碼塊插入受限,部分程式碼未完全展示,若有需要可閱讀原文:戳我閱讀原文

  • 中秋節和大豐收的關聯?
  • 過時的智慧手機該不該扔?扔了算不算浪費資源?