首頁>技術>

背景

執行緒池是一種基於池化思想管理執行緒的工具,使用執行緒池可以減少建立銷燬執行緒的開銷,避免執行緒過多導致系統資源耗盡。在高併發的任務處理場景,執行緒池的使用是必不可少的。在雙11主圖價格表達專案中為了提升處理效能,很多地方使用到了執行緒池。隨著執行緒池的使用,逐漸發現一個問題,執行緒池的引數如何設定?

執行緒池引數中有三個比較關鍵的引數,分別是corePoolSize(核心執行緒數)、maximumPoolSize(最大執行緒數)、workQueueSzie(工作佇列大小)。根據任務的型別可以區分為IO密集型和CPU密集型,對於CPU密集型,一般經驗是設定corePoolSize=CPU核數+1,對於IO密集型需要根據具體的RT和流量來設定,沒有普適的經驗值。然而,我們一般遇到的情況多數是處理IO密集型任務,如果執行緒池引數不可動態調節,就沒辦法根據實際情況實時調整處理速度,只能透過釋出程式碼調整引數。

如果執行緒池引數不合理會導致什麼問題呢?下面列舉幾種可能出現的場景:

最大執行緒數設定偏小,工作佇列大小設定偏小,導致服務介面大量丟擲RejectedExecutionException。最大執行緒數設定偏小,工作佇列大小設定過大,任務堆積過度,介面響應時長變長。最大執行緒數設定過大,執行緒排程開銷增大,處理速度反而下降。核心執行緒數設定過小,流量突增時需要先建立執行緒,導致響應時長過大。核心執行緒數設定過大,空閒執行緒太多,佔用系統資源。執行緒池任務排程機制

要明白執行緒池引數對執行時的影響,就必須理解其中的原理,所以下面先簡單總結了執行緒池的核心原理。

Java中的執行緒池核心實現類是ThreadPoolExecutor,ThreadPoolExecutor一方面維護自身的生命週期,另一方面同時管理執行緒和任務,使兩者良好的結合從而執行並行任務。使用者無需關注如何建立執行緒,如何排程執行緒來執行任務,使用者只需提供Runnable物件,將任務的執行邏輯提交到執行器(Executor)中,由Executor框架完成執行緒的調配和任務的執行部分。

ThreadPoolExecutor是如何執行,如何同時維護執行緒和執行任務的呢?其執行機制如下圖所示:

所有任務的排程都是由execute方法完成的,這部分完成的工作是:檢查現線上程池的執行狀態、執行執行緒數、執行策略,決定接下來執行的流程,是直接申請執行緒執行,或是緩衝到佇列中執行,亦或是直接拒絕該任務。其執行過程如下:

首先檢測執行緒池執行狀態,如果不是RUNNING,則直接拒絕,執行緒池要保證在RUNNING的狀態下執行任務。如果workerCount < corePoolSize,則建立並啟動一個執行緒來執行新提交的任務。如果workerCount >= corePoolSize,且執行緒池內的阻塞佇列未滿,則將任務新增到該阻塞佇列中。如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且執行緒池內的阻塞佇列已滿,則建立並啟動一個執行緒來執行新提交的任務。如果workerCount >= maximumPoolSize,並且執行緒池內的阻塞佇列已滿, 則根據拒絕策略來處理該任務, 預設的處理方式是直接拋異常。

其執行流程如下圖所示:

動態調節執行緒池引數實現

執行緒池相關的重要引數有三個,分別是核心執行緒數、最大執行緒數和工作佇列大小,接下來將闡述如何實現動態調節執行緒池引數。

調節核心和最大執行緒數的原理

ThreadPoolExecutor已經提供了兩個方法在執行時設定核心執行緒數和最大執行緒數,分別是ThreadPoolExecutor.setCorePoolSize()和ThreadPoolExecutor.setMaximumPoolSize()。

setCorePoolSize方法的執行流程是:首先會覆蓋之前建構函式設定的corePoolSize,然後,如果新的值比原始值要小,當多餘的工作執行緒下次變成空閒狀態的時候會被中斷並銷燬,如果新的值比原來的值要大且工作佇列不為空,則會建立新的工作執行緒。流程圖如下:

setMaximumPoolSize方法執行流程是:首先會覆蓋之前建構函式設定的maximumPoolSize,然後,如果新的值比原來的值要小,當多餘的工作執行緒下次變成空閒狀態的時候會被中斷並銷燬。

調節工作佇列大小的原理

執行緒池中是以生產者消費者模式,透過一個阻塞佇列來快取任務,工作執行緒從阻塞佇列中獲取任務。工作佇列的介面是阻塞佇列(BlockingQueue),在佇列為空時,獲取元素的執行緒會等待佇列變為非空,當佇列滿時,儲存元素的執行緒會等待佇列可用。

目前JDK提供了以下阻塞佇列的實現:

但是很不幸,這些阻塞佇列的實現都不支援動態調整大小,那麼為什麼不自己實現一個可動態調整大小的阻塞佇列呢。重複造輪子是不可取的,所以我選擇改造輪子。LinkedBlockingQueue是比較常用的一個阻塞佇列,它無法修改大小的原因是capacity欄位設定成了final private final int capacity;。如果我把final去掉,並提供修改capacity的方法,是不是就滿足我們的需求呢?事實證明是可行的,文章末尾上傳了ResizeLinkedBlockingQueue的實現。

結合Diamond進行實現

Diamond可以管理我們的配置,如果可以透過Diamond實現執行緒池引數管理那就再好不過了。接下來就開始上程式碼了,首先實現一個Diamond配置管理類DispatchConfig,然後,實現一個執行緒池管理的工廠方法StreamExecutorFactory。

DispatchConfig類是一個靜態類,在初始化的時候獲取了對應Diamond的內容並設定了監聽,使用的時候只需要DispatchConfig.getConfig().getCorePoolSize()。

/** * @author moda */@Slf4j@Datapublic class DispatchConfig {    public static final String DATA_ID = "com.alibaba.mkt.turbo.DispatchConfig";    public static final String GROUP_ID = "mkt-turbo";    private static DispatchConfig config;    static {        try {            String content = Diamond.getConfig(DATA_ID, GROUP_ID, 3000);            config = JSON.parseObject(content, DispatchConfig.class);            Diamond.addListener(DATA_ID, GROUP_ID, new ManagerListenerAdapter() {                @Override                public void receiveConfigInfo(String content) {                    try {                        config = JSON.parseObject(content, DispatchConfig.class);                    } catch (Throwable t) {                        log.error("[DispatchConfig] receiveConfigInfo an exception occurs,", t);                    }                }            });        } catch (Exception e) {            log.error(String.format("[DispatchConfig - init] dataId:%s, groupId:%s ", DATA_ID, GROUP_ID), e);        }    }    public static DispatchConfig getConfig() {        return config;    }    private int corePoolSize = 10;    private int maximumPoolSize = 30;    private int workQueueSize = 1024;    /**     * 商品分批處理每批大小     */    private int itemBatchProcessPageSize = 200;}

StreamExecutorFactory是一個靜態類,維護了一個靜態屬性executor,並透過initExecutor()進行初始化。在初始化的時候,工作佇列使用了可調節大小的阻塞佇列ResizeLinkedBlockingQueue,並設定了監聽Diamond變更。Diamond發生變更的時候透過在callback中對比值是否發生改變,如果發生改變則調整workQueueSize、corePoolSize、maximumPoolSize。使用的時候只需要StreamExecutorFactory.getExecutor(),修改Diamond配置就能動態修改執行緒池引數。

/** * @author moda */@Slf4jpublic class StreamExecutorFactory {    private static final String THREAD_NAME = "mkt-turbo_stream_dispatch";    private static ThreadPoolExecutor executor = initExecutor();    private static ThreadPoolExecutor initExecutor() {        ThreadFactory nameThreadFactory = new ThreadFactoryBuilder().setNameFormat(THREAD_NAME).build();        ResizeLinkedBlockingQueue<Runnable> workQueue = new ResizeLinkedBlockingQueue<>(DispatchConfig.getConfig().getWorkQueueSize());        //拒絕策略,呼叫者執行緒處理        RejectedExecutionHandler rejectedExecutionHandler = (r, e) -> {            String msg = String.format("[S.E.F - rejectedHandler] Thread pool is EXHAUSTED!" +                    " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +                    " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)",                THREAD_NAME, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating());            log.warn(msg);            if (!e.isShutdown()) {                r.run();            }        };        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(            DispatchConfig.getConfig().getCorePoolSize(),            DispatchConfig.getConfig().getMaximumPoolSize(),            10,            TimeUnit.SECONDS,            workQueue,            nameThreadFactory,            rejectedExecutionHandler        );        Diamond.addListener(DispatchConfig.DATA_ID, DispatchConfig.GROUP_ID, new ManagerListenerAdapter() {            @Override            public void receiveConfigInfo(String content) {                try {                    DispatchConfig config = JSON.parseObject(content, DispatchConfig.class);                    if (workQueue.getCapacity() != config.getWorkQueueSize()) {                        workQueue.setCapacity(config.getWorkQueueSize());                    }                    if (threadPoolExecutor.getCorePoolSize() != config.getCorePoolSize()) {                        threadPoolExecutor.setCorePoolSize(config.getCorePoolSize());                    }                    if (threadPoolExecutor.getMaximumPoolSize() != config.getMaximumPoolSize()) {                        threadPoolExecutor.setMaximumPoolSize(config.getMaximumPoolSize());                    }                } catch (Throwable t) {                    log.error("[S.E.F-receiveConfigInfo] an exception occurs,", t);                }            }        });        return threadPoolExecutor;    }    public static Executor getExecutor() {        return executor;    }}
30
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 簡單易懂:為什麼我們要搞分散式系統?