背景
執行緒池是一種基於池化思想管理執行緒的工具,使用執行緒池可以減少建立銷燬執行緒的開銷,避免執行緒過多導致系統資源耗盡。在高併發的任務處理場景,執行緒池的使用是必不可少的。在雙11主圖價格表達專案中為了提升處理效能,很多地方使用到了執行緒池。隨著執行緒池的使用,逐漸發現一個問題,執行緒池的引數如何設定?
執行緒池引數中有三個比較關鍵的引數,分別是corePoolSize(核心執行緒數)、maximumPoolSize(最大執行緒數)、workQueueSzie(工作佇列大小)。根據任務的型別可以區分為IO密集型和CPU密集型,對於CPU密集型,一般經驗是設定corePoolSize=CPU核數+1,對於IO密集型需要根據具體的RT和流量來設定,沒有普適的經驗值。然而,我們一般遇到的情況多數是處理IO密集型任務,如果執行緒池引數不可動態調節,就沒辦法根據實際情況實時調整處理速度,只能透過釋出程式碼調整引數。
如果執行緒池引數不合理會導致什麼問題呢?下面列舉幾種可能出現的場景:
最大執行緒數設定偏小,工作佇列大小設定偏小,導致服務介面大量丟擲RejectedExecutionException。最大執行緒數設定偏小,工作佇列大小設定過大,任務堆積過度,介面響應時長變長。最大執行緒數設定過大,執行緒排程開銷增大,處理速度反而下降。核心執行緒數設定過小,流量突增時需要先建立執行緒,導致響應時長過大。核心執行緒數設定過大,空閒執行緒太多,佔用系統資源。執行緒池任務排程機制要明白執行緒池引數對執行時的影響,就必須理解其中的原理,所以下面先簡單總結了執行緒池的核心原理。
Java中的執行緒池核心實現類是ThreadPoolExecutor,ThreadPoolExecutor一方面維護自身的生命週期,另一方面同時管理執行緒和任務,使兩者良好的結合從而執行並行任務。使用者無需關注如何建立執行緒,如何排程執行緒來執行任務,使用者只需提供Runnable物件,將任務的執行邏輯提交到執行器(Executor)中,由Executor框架完成執行緒的調配和任務的執行部分。
ThreadPoolExecutor是如何執行,如何同時維護執行緒和執行任務的呢?其執行機制如下圖所示:
所有任務的排程都是由execute方法完成的,這部分完成的工作是:檢查現線上程池的執行狀態、執行執行緒數、執行策略,決定接下來執行的流程,是直接申請執行緒執行,或是緩衝到佇列中執行,亦或是直接拒絕該任務。其執行過程如下:
首先檢測執行緒池執行狀態,如果不是RUNNING,則直接拒絕,執行緒池要保證在RUNNING的狀態下執行任務。如果workerCount < corePoolSize,則建立並啟動一個執行緒來執行新提交的任務。如果workerCount >= corePoolSize,且執行緒池內的阻塞佇列未滿,則將任務新增到該阻塞佇列中。如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且執行緒池內的阻塞佇列已滿,則建立並啟動一個執行緒來執行新提交的任務。如果workerCount >= maximumPoolSize,並且執行緒池內的阻塞佇列已滿, 則根據拒絕策略來處理該任務, 預設的處理方式是直接拋異常。其執行流程如下圖所示:
動態調節執行緒池引數實現執行緒池相關的重要引數有三個,分別是核心執行緒數、最大執行緒數和工作佇列大小,接下來將闡述如何實現動態調節執行緒池引數。
調節核心和最大執行緒數的原理ThreadPoolExecutor已經提供了兩個方法在執行時設定核心執行緒數和最大執行緒數,分別是ThreadPoolExecutor.setCorePoolSize()和ThreadPoolExecutor.setMaximumPoolSize()。
setCorePoolSize方法的執行流程是:首先會覆蓋之前建構函式設定的corePoolSize,然後,如果新的值比原始值要小,當多餘的工作執行緒下次變成空閒狀態的時候會被中斷並銷燬,如果新的值比原來的值要大且工作佇列不為空,則會建立新的工作執行緒。流程圖如下:
setMaximumPoolSize方法執行流程是:首先會覆蓋之前建構函式設定的maximumPoolSize,然後,如果新的值比原來的值要小,當多餘的工作執行緒下次變成空閒狀態的時候會被中斷並銷燬。
調節工作佇列大小的原理執行緒池中是以生產者消費者模式,透過一個阻塞佇列來快取任務,工作執行緒從阻塞佇列中獲取任務。工作佇列的介面是阻塞佇列(BlockingQueue),在佇列為空時,獲取元素的執行緒會等待佇列變為非空,當佇列滿時,儲存元素的執行緒會等待佇列可用。
目前JDK提供了以下阻塞佇列的實現:
但是很不幸,這些阻塞佇列的實現都不支援動態調整大小,那麼為什麼不自己實現一個可動態調整大小的阻塞佇列呢。重複造輪子是不可取的,所以我選擇改造輪子。LinkedBlockingQueue是比較常用的一個阻塞佇列,它無法修改大小的原因是capacity欄位設定成了final private final int capacity;。如果我把final去掉,並提供修改capacity的方法,是不是就滿足我們的需求呢?事實證明是可行的,文章末尾上傳了ResizeLinkedBlockingQueue的實現。
結合Diamond進行實現Diamond可以管理我們的配置,如果可以透過Diamond實現執行緒池引數管理那就再好不過了。接下來就開始上程式碼了,首先實現一個Diamond配置管理類DispatchConfig,然後,實現一個執行緒池管理的工廠方法StreamExecutorFactory。
DispatchConfig類是一個靜態類,在初始化的時候獲取了對應Diamond的內容並設定了監聽,使用的時候只需要DispatchConfig.getConfig().getCorePoolSize()。
/** * @author moda */@Slf4j@Datapublic class DispatchConfig { public static final String DATA_ID = "com.alibaba.mkt.turbo.DispatchConfig"; public static final String GROUP_ID = "mkt-turbo"; private static DispatchConfig config; static { try { String content = Diamond.getConfig(DATA_ID, GROUP_ID, 3000); config = JSON.parseObject(content, DispatchConfig.class); Diamond.addListener(DATA_ID, GROUP_ID, new ManagerListenerAdapter() { @Override public void receiveConfigInfo(String content) { try { config = JSON.parseObject(content, DispatchConfig.class); } catch (Throwable t) { log.error("[DispatchConfig] receiveConfigInfo an exception occurs,", t); } } }); } catch (Exception e) { log.error(String.format("[DispatchConfig - init] dataId:%s, groupId:%s ", DATA_ID, GROUP_ID), e); } } public static DispatchConfig getConfig() { return config; } private int corePoolSize = 10; private int maximumPoolSize = 30; private int workQueueSize = 1024; /** * 商品分批處理每批大小 */ private int itemBatchProcessPageSize = 200;}
StreamExecutorFactory是一個靜態類,維護了一個靜態屬性executor,並透過initExecutor()進行初始化。在初始化的時候,工作佇列使用了可調節大小的阻塞佇列ResizeLinkedBlockingQueue,並設定了監聽Diamond變更。Diamond發生變更的時候透過在callback中對比值是否發生改變,如果發生改變則調整workQueueSize、corePoolSize、maximumPoolSize。使用的時候只需要StreamExecutorFactory.getExecutor(),修改Diamond配置就能動態修改執行緒池引數。
/** * @author moda */@Slf4jpublic class StreamExecutorFactory { private static final String THREAD_NAME = "mkt-turbo_stream_dispatch"; private static ThreadPoolExecutor executor = initExecutor(); private static ThreadPoolExecutor initExecutor() { ThreadFactory nameThreadFactory = new ThreadFactoryBuilder().setNameFormat(THREAD_NAME).build(); ResizeLinkedBlockingQueue<Runnable> workQueue = new ResizeLinkedBlockingQueue<>(DispatchConfig.getConfig().getWorkQueueSize()); //拒絕策略,呼叫者執行緒處理 RejectedExecutionHandler rejectedExecutionHandler = (r, e) -> { String msg = String.format("[S.E.F - rejectedHandler] Thread pool is EXHAUSTED!" + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)", THREAD_NAME, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating()); log.warn(msg); if (!e.isShutdown()) { r.run(); } }; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( DispatchConfig.getConfig().getCorePoolSize(), DispatchConfig.getConfig().getMaximumPoolSize(), 10, TimeUnit.SECONDS, workQueue, nameThreadFactory, rejectedExecutionHandler ); Diamond.addListener(DispatchConfig.DATA_ID, DispatchConfig.GROUP_ID, new ManagerListenerAdapter() { @Override public void receiveConfigInfo(String content) { try { DispatchConfig config = JSON.parseObject(content, DispatchConfig.class); if (workQueue.getCapacity() != config.getWorkQueueSize()) { workQueue.setCapacity(config.getWorkQueueSize()); } if (threadPoolExecutor.getCorePoolSize() != config.getCorePoolSize()) { threadPoolExecutor.setCorePoolSize(config.getCorePoolSize()); } if (threadPoolExecutor.getMaximumPoolSize() != config.getMaximumPoolSize()) { threadPoolExecutor.setMaximumPoolSize(config.getMaximumPoolSize()); } } catch (Throwable t) { log.error("[S.E.F-receiveConfigInfo] an exception occurs,", t); } } }); return threadPoolExecutor; } public static Executor getExecutor() { return executor; }}