首頁>技術>

前言

前面我們介紹了執行緒池框架(ExecutorService)的兩個具體實現:

任務性質型別CPU密集型(CPU bound)

CPU密集型也叫計算密集型,指的是系統的硬碟、記憶體效能相對於CPU要好很好多,此時,系統運作大部分的狀況是 CPU Loading 100%,CPU要讀/寫 I/O(硬碟/記憶體),I/O在很短的時間就可以完成,而CPU還有許多運算要處理,CPU Loading很高。

在多重程式系統中,大部分時間用來做計算、邏輯判斷等CPU動作的程式稱之 CPU bound。例如一個計算圓周率至小數點一千位以下的程式,在執行的過程當中絕大部分時間在用三角函式和開根號的計算,便是屬於CPU bound的程式。

CPU bound的程式一般而言CPU佔用率相當高。這可能是因為任務本身不太需要訪問I/O裝置,也可能是因為程式是多執行緒實現因此遮蔽了等待I/O的時間。

執行緒數一般設定為:執行緒數 = CPU核數 + 1(現代CPU支援超執行緒)

IO密集型(I/O bound)

I/O密集型指的是系統的CPU效能相對硬碟、記憶體要好很多,此時,系統運作,大部分的狀況是 CPU 在等 I/O(硬碟/記憶體)的讀/寫操作,此時 CPU Loading 並不高。

I/O bound的程式一般在達到效能極限時,CPU佔用率仍然較低。這可能是因為任務本身需要大量I/O操作,而 pipeline 做的不是很好,沒有充分利用處理器能力。

執行緒數一般設定為:執行緒數 = ((執行緒等待時間 + 執行緒CPU時間) / 執行緒CPU時間) * CPU數目

CPU密集型 VS I/O密集型

我們可以把任務分為計算密集型和I/O密集型

計算密集型任務的特點是要進行大量的計算,消耗CPU資源,比如計算圓周率、對影片進行高畫質解碼等等,全靠CPU的運算能力。這種計算密集型任務雖然也可以用多工完成,但是任務越多,花在任務切換的時間就越多,CPU執行任務的效率就越低,所以,要最高效地利用CPU,計算密集型任務同時進行的數量應當等於CPU的核心數。

計算密集型任務由於主要消耗CPU資源,因此,程式碼執行效率至關重要。Python這樣的指令碼語言執行效率很低,完全不適合計算密集型任務。對於計算密集型任務,最好用C語言編寫。

第二種任務的型別是I/O密集型,涉及到網路、磁碟I/O的任務都是I/O密集型任務,這類任務的特點是CPU消耗很少,任務的大部分時間都在等待I/O操作完成(因為I/O的速度遠遠低於CPU和記憶體的速度)。對於I/O密集型任務,任務越多,CPU效率越高,但也有一個限度。常見的大部分任務都是I/O密集型任務,比如Web應用。

I/O密集型任務執行期間,99%的時間都花在I/O上,花在CPU上的時間很少,因此,用執行速度極快的C語言替換用Python這樣執行速度極低的指令碼語言,完全無法提升執行效率。對於I/O密集型任務,最合適的語言就是開發效率最高(程式碼量最少)的語言,指令碼語言是首選,C語言最差。

什麼是 Fork/Join 框架?

Fork/Join 框架是 Java7 提供了的一個用於並行執行的任務的框架,是一個把大任務分割成若干個小任務,最終彙總每個小任務結果後得到大任務結果的框架。

Fork 就是把一個大任務切分為若干個子任務並行的執行,Join 就是合併這些子任務的執行結果,最後得到這個大任務的結果。比如計算 1+2+......+10000,可以分割成10個子任務,每個子任務對1000個數進行求和,最終彙總這10個子任務的結果。如下圖所示:

Fork/Join的特性

工作竊取演算法

工作竊取(work-stealing)演算法 是指某個執行緒從其他佇列裡竊取任務來執行。

我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少執行緒間的競爭,於是把這些子任務分別放到不同的佇列裡,併為每個佇列建立一個單獨的執行緒來執行佇列裡的任務,執行緒和佇列一一對應,比如A執行緒負責處理A佇列裡的任務。

但是有的執行緒會先把自己佇列裡的任務幹完,而其他執行緒對應的佇列裡還有任務等待處理。幹完活的執行緒與其等著,不如去幫其他執行緒幹活,於是它就去其他執行緒的佇列裡竊取一個任務來執行。而在這時它們會訪問同一個佇列,所以為了減少竊取任務執行緒和被竊取任務執行緒之間的競爭,通常會使用雙端佇列被竊取任務執行緒永遠從雙端佇列的頭部拿任務執行,而竊取任務的執行緒永遠從雙端佇列的尾部拿任務執行。

工作竊取演算法的優點是充分利用執行緒進行平行計算,並減少了執行緒間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端佇列裡只有一個任務時。並且消耗了更多的系統資源,比如建立多個執行緒和多個雙端佇列。

ForkJoinPool 的每個工作執行緒都維護著一個工作佇列(WorkQueue),這是一個雙端佇列(Deque),裡面存放的物件是任務(ForkJoinTask)。每個工作執行緒在執行中產生新的任務(通常是因為呼叫了 fork())時,會放入工作佇列的隊尾,並且工作執行緒在處理自己的工作佇列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務來執行。每個工作執行緒在處理自己的工作佇列同時,會嘗試竊取一個任務(或是來自於剛剛提交到 pool 的任務,或是來自於其他工作執行緒的工作佇列),竊取的任務位於其他執行緒的工作佇列的隊首,也就是說工作執行緒在竊取其他工作執行緒的任務時,使用的是 FIFO 方式。在遇到 join() 時,如果需要 join 的任務尚未完成,則會先處理其他任務,並等待其完成。在既沒有自己的任務,也沒有可以竊取的任務時,進入休眠。Fork/Join的使用

使用場景示例

定義fork/join任務,如下示例,隨機生成2000w條資料在陣列當中,然後求和_

package com.niuh.forkjoin.recursivetask;import java.util.concurrent.RecursiveTask;/** * RecursiveTask 平行計算,同步有返回值 * ForkJoin框架處理的任務基本都能使用遞迴處理,比如求斐波那契數列等,但遞迴演算法的缺陷是: * 一隻會只用單執行緒處理, * 二是遞迴次數過多時會導致堆疊溢位; * ForkJoin解決了這兩個問題,使用多執行緒併發處理,充分利用計算資源來提高效率,同時避免堆疊溢位發生。 * 當然像求斐波那契數列這種小問題直接使用線性演算法搞定可能更簡單,實際應用中完全沒必要使用ForkJoin框架, * 所以ForkJoin是核彈,是用來對付大傢伙的,比如超大陣列排序。 * 最佳應用場景:多核、多記憶體、可以分割計算再合併的計算密集型任務 */class LongSum extends RecursiveTask<Long> {    //任務拆分的最小閥值    static final int SEQUENTIAL_THRESHOLD = 1000;    static final long NPS = (1000L * 1000 * 1000);    static final boolean extraWork = true; // change to add more than just a sum    int low;    int high;    int[] array;    LongSum(int[] arr, int lo, int hi) {        array = arr;        low = lo;        high = hi;    }    /**     * fork()方法:將任務放入佇列並安排非同步執行,一個任務應該只調用一次fork()函式,除非已經執行完畢並重新初始化。     * tryUnfork()方法:嘗試把任務從佇列中拿出單獨處理,但不一定成功。     * join()方法:等待計算完成並返回計算結果。     * isCompletedAbnormally()方法:用於判斷任務計算是否發生異常。     */    protected Long compute() {        if (high - low <= SEQUENTIAL_THRESHOLD) {            long sum = 0;            for (int i = low; i < high; ++i) {                sum += array[i];            }            return sum;        } else {            int mid = low + (high - low) / 2;            LongSum left = new LongSum(array, low, mid);            LongSum right = new LongSum(array, mid, high);            left.fork();            right.fork();            long rightAns = right.join();            long leftAns = left.join();            return leftAns + rightAns;        }    }}

執行fork/join任務

package com.niuh.forkjoin.recursivetask;import com.niuh.forkjoin.utils.Utils;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;public class LongSumMain {    //獲取邏輯處理器數量    static final int NCPU = Runtime.getRuntime().availableProcessors();    /**     * for time conversion     */    static final long NPS = (1000L * 1000 * 1000);    static long calcSum;    static final boolean reportSteals = true;    public static void main(String[] args) throws Exception {        int[] array = Utils.buildRandomIntArray(2000000);        System.out.println("cpu-num:" + NCPU);        //單執行緒下計算陣列資料總和        long start = System.currentTimeMillis();        calcSum = seqSum(array);        System.out.println("seq sum=" + calcSum);        System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));        start = System.currentTimeMillis();        //採用fork/join方式將陣列求和任務進行拆分執行,最後合併結果        LongSum ls = new LongSum(array, 0, array.length);        ForkJoinPool fjp = new ForkJoinPool(NCPU); //使用的執行緒數        ForkJoinTask<Long> task = fjp.submit(ls);        System.out.println("forkjoin sum=" + task.get());        System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));        if (task.isCompletedAbnormally()) {            System.out.println(task.getException());        }        fjp.shutdown();    }    static long seqSum(int[] array) {        long sum = 0;        for (int i = 0; i < array.length; ++i) {            sum += array[i];        }        return sum;    }}
Fork/Join框架原理

Fork/Join 其實就是指由ForkJoinPool作為執行緒池、ForkJoinTask(通常實現其三個抽象子類)為任務、ForkJoinWorkerThread作為執行任務的具體執行緒實體這三者構成的任務排程機制。

ForkJoinWorkerThread

ForkJoinWorkerThread 直接繼承了Thread,但是僅僅是為了增加一些額外的功能,並沒有對執行緒的排程執行做任何更改。

ForkJoinWorkerThread 是被ForkJoinPool管理的工作執行緒,在創建出來之後都被設定成為了守護執行緒,由它來執行ForkJoinTasks。該類主要為了維護建立執行緒例項時透過ForkJoinPool為其建立的任務佇列,與其他兩個執行緒池整個執行緒池只有一個任務佇列不同,ForkJoinPool管理的所有工作執行緒都擁有自己的工作佇列,為了實現任務竊取機制,該佇列被設計成一個雙端佇列,而ForkJoinWorkerThread的首要任務就是執行自己的這個雙端任務佇列中的任務,其次是竊取其他執行緒的工作佇列,以下是其程式碼片段:

public class ForkJoinWorkerThread extends Thread { // 這個執行緒工作的ForkJoinPool池    final ForkJoinPool pool;        // 這個執行緒擁有的工作竊取機制的工作佇列    final ForkJoinPool.WorkQueue workQueue;     //建立在給定ForkJoinPool池中執行的ForkJoinWorkerThread。    protected ForkJoinWorkerThread(ForkJoinPool pool) {        // Use a placeholder until a useful name can be set in registerWorker        super("aForkJoinWorkerThread");        this.pool = pool;        //向ForkJoinPool執行池註冊當前工作執行緒,ForkJoinPool為其分配一個工作佇列        this.workQueue = pool.registerWorker(this);     }    //該工作執行緒的執行內容就是執行工作佇列中的任務    public void run() {        if (workQueue.array == null) { // only run once            Throwable exception = null;            try {                onStart();                pool.runWorker(workQueue); //執行工作佇列中的任務            } catch (Throwable ex) {                exception = ex; //記錄異常            } finally {                try {                    onTermination(exception);                } catch (Throwable ex) {                    if (exception == null)                        exception = ex;                } finally {                    pool.deregisterWorker(this, exception); //撤銷工作                }            }        }    }    .....}
ForkJoinTask

ForkJoinTask :與FutureTask一樣, ForkJoinTask也是Future的子類,不過它是一個抽象類。

ForkJoinTask :我們要使用 ForkJoin 框架,必須首先建立一個 ForkJoin 任務。它提供在任務中執行 fork()join() 操作的機制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,Fork/Join框架提供類以下幾個子類:

RecursiveAction:用於沒有返回結果的任務。(比如寫資料到磁碟,然後就退出。一個 RecursiveAvtion 可以把直接的工作分割成更小的幾塊,這樣它們可以由獨立的執行緒或者 CPU 執行。我們可以透過繼承來實現一個 RecusiveAction)RescursiveTask:用於有返回結果的任務。(可以將自己的工作分割為若干更小任務,並將這些子任務的執行合併到一個集體結果。可以有幾個水平的分割和合並)CountedCompleter :在任務完成執行後會觸發執行一個自定義的鉤子函式。常量介紹

ForkJoinTask 有一個int型別的status欄位:

其高16位儲存任務執行狀態例如NORMAL、CANCELLED或EXCEPTIONAL低16位預留用於使用者自定義的標記。

任務未完成之前status大於等於0,完成之後就是NORMAL、CANCELLED或EXCEPTIONAL這幾個小於0的值,這幾個值也是按大小順序的:0(初始狀態) > NORMAL > CANCELLED > EXCEPTIONAL.

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {    /** 該任務的執行狀態 */    volatile int status; // accessed directly by pool and workers    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits    static final int NORMAL      = 0xf0000000;  // must be negative    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16    static final int SMASK       = 0x0000ffff;  // short bits for tags    // 異常雜湊表    //被任務丟擲的異常陣列,為了報告給呼叫者。因為異常很少見,所以我們不直接將它們儲存在task物件中,而是使用弱引用陣列。注意,取消異常不會出現在陣列,而是記錄在statue欄位中    //注意這些都是 static 類屬性,所有的ForkJoinTask共用的。    private static final ExceptionNode[] exceptionTable;        //異常雜湊連結串列陣列    private static final ReentrantLock exceptionTableLock;    private static final ReferenceQueue<Object> exceptionTableRefQueue; //在ForkJoinTask被GC回收之後,相應的異常節點物件的引用佇列    /**    * 固定容量的exceptionTable.    */    private static final int EXCEPTION_MAP_CAPACITY = 32;    //異常陣列的鍵值對節點。    //該雜湊連結串列陣列使用執行緒id進行比較,該陣列具有固定的容量,因為它只維護任務異常足夠長,以便參與者訪問它們,所以在持續的時間內不應該變得非常大。但是,由於我們不知道最後一個joiner何時完成,我們必須使用弱引用並刪除它們。我們對每個操作都這樣做(因此完全鎖定)。此外,任何ForkJoinPool池中的一些執行緒在其池變為isQuiescent時都會呼叫helpExpungeStaleExceptions    static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {        final Throwable ex;        ExceptionNode next;        final long thrower;  // 丟擲異常的執行緒id        final int hashCode;  // 在弱引用消失之前儲存hashCode        ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {            super(task, exceptionTableRefQueue); //在ForkJoinTask被GC回收之後,會將該節點加入佇列exceptionTableRefQueue            this.ex = ex;            this.next = next;            this.thrower = Thread.currentThread().getId();            this.hashCode = System.identityHashCode(task);        }    }    .................}

除了status記錄任務的執行狀態之外,其他欄位主要是為了對任務執行的異常的處理,ForkJoinTask採用了雜湊陣列 + 連結串列的資料結構(JDK8以前的HashMap實現方法)存放所有(因為這些欄位是static)的ForkJoinTask任務的執行異常。

fork 方法(安排任務非同步執行)

fork() 做的工作只有一件事,既是把任務推入當前工作執行緒的工作佇列裡(安排任務非同步執行)。可以參看以下的原始碼:

public final ForkJoinTask<V> fork() {    Thread t;    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)        ((ForkJoinWorkerThread)t).workQueue.push(this);    else        ForkJoinPool.common.externalPush(this);    return this;}

該方法其實就是將任務透過push方法加入到當前工作執行緒的工作佇列或者提交佇列(外部非ForkJoinWorkerThread執行緒透過submit、execute方法提交的任務),等待被執行緒池排程執行,這是一個非阻塞的立即返回方法。

這裡需要知道,ForkJoinPool執行緒池透過雜湊陣列+雙端佇列的方式將所有的工作執行緒擁有的任務佇列和從外部提交的任務分別對映到雜湊陣列的不同槽位上。

join 方法(等待執行結果)

join() 的工作則複雜得多,也是 join() 可以使得執行緒免於被阻塞的原因——不像同名的 Thread.join()。

檢查呼叫 join() 的執行緒是否是 ForkJoinThread 執行緒。如果不是(例如 main 執行緒),則阻塞當前執行緒,等待任務完成。如果是,則不阻塞。檢視任務的完成狀態,如果已經完成,直接返回結果。如果任務尚未完成,但處於自己的工作佇列內,則完成它。如果任務已經被其他的工作執行緒偷走,則竊取這個小偷的工作佇列內的任務(以 FIFO 方式),執行,以期幫助它早日完成 join 的任務。如果偷走任務的小偷也已經把自己的任務全部做完,正在等待需要 join 的任務時,則找到小偷的小偷,幫助它完成它的任務。遞迴地執行第5步。

將上述流程畫成序列圖的話就是這個樣子:

小結

通常ForkJoinTask只適用於非迴圈依賴的純函式的計算或孤立物件的操作,否則,執行可能會遇到某種形式的死鎖,因為任務迴圈地等待彼此。但是,這個框架支援其他方法和技術(例如使用Phaser、helpQuiesce和complete),這些方法和技術可用於構造解決這種依賴任務的ForkJoinTask子類,為了支援這些用法,可以使用setForkJoinTaskTag或compareAndSetForkJoinTaskTag原子性地標記一個short型別的值,並使用getForkJoinTaskTag進行檢查。ForkJoinTask實現沒有將這些受保護的方法或標記用於任何目的,但是它們可以用於構造專門的子類,由此可以使用提供的方法來避免重新訪問已經處理過的節點/任務。

ForkJoinTask應該執行相對較少的計算,並且應該避免不確定的迴圈。大任務應該被分解成更小的子任務,通常透過遞迴分解。如果任務太大,那麼並行性就不能提高吞吐量。如果太小,那麼記憶體和內部任務維護開銷可能會超過處理開銷。

ForkJoinTask是可序列化的,這使它們能夠在諸如遠端執行框架之類的擴充套件中使用。只在執行之前或之後序列化任務才是明智的,而不是在執行期間。

ForkJoinPool

ForkJoinPool:ForkJoinTask 需要透過 ForkJoinPool 來執行,任務分割出的子任務會新增到當前工作執行緒所維護的雙端佇列中,進入佇列的頭部。當一個工作執行緒的佇列裡暫時沒有任務時,它會隨機從其他工作執行緒的佇列的尾部獲取一個任務。

常量介紹

ForkJoinPool 與 內部類 WorkQueue 共享的一些常量

// Constants shared across ForkJoinPool and WorkQueue// 限定引數static final int SMASK = 0xffff;        //  低位掩碼,也是最大索引位static final int MAX_CAP = 0x7fff;      //  工作執行緒最大容量static final int EVENMASK = 0xfffe;     //  偶數低位掩碼static final int SQMASK = 0x007e;       //  workQueues 陣列最多64個槽位// ctl 子域和 WorkQueue.scanState 的掩碼和標誌位static final int SCANNING = 1;          // 標記是否正在執行任務static final int INACTIVE = 1 << 31;    // 失活狀態  負數static final int SS_SEQ = 1 << 16;      // 版本戳,防止ABA問題// ForkJoinPool.config 和 WorkQueue.config 的配置資訊標記static final int MODE_MASK = 0xffff << 16;  // 模式掩碼static final int LIFO_QUEUE = 0;    // LIFO佇列static final int FIFO_QUEUE = 1 << 16;  // FIFO佇列static final int SHARED_QUEUE = 1 << 31;    // 共享模式佇列,負數 ForkJoinPool 中的相關常量和例項欄位:

ForkJoinPool 中的相關常量和例項欄位

// 低位和高位掩碼private static final long SP_MASK = 0xffffffffL;private static final long UC_MASK = ~SP_MASK;// 活躍執行緒數private static final int AC_SHIFT = 48;private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍執行緒數增量private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍執行緒數掩碼// 工作執行緒數private static final int TC_SHIFT = 32;private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作執行緒數增量private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);  // 建立工作執行緒標誌// 池狀態private static final int RSLOCK = 1;private static final int RSIGNAL = 1 << 1;private static final int STARTED = 1 << 2;private static final int STOP = 1 << 29;private static final int TERMINATED = 1 << 30;private static final int SHUTDOWN = 1 << 31;// 例項欄位volatile long ctl;                   // 主控制引數volatile int runState;               // 執行狀態鎖final int config;                    // 並行度|模式int indexSeed;                       // 用於生成工作執行緒索引volatile WorkQueue[] workQueues;     // 主物件註冊資訊,workQueuefinal ForkJoinWorkerThreadFactory factory;// 執行緒工廠final UncaughtExceptionHandler ueh;  // 每個工作執行緒的異常資訊final String workerNamePrefix;       // 用於建立工作執行緒的名稱volatile AtomicLong stealCounter;    // 偷取任務總數,也可作為同步監視器/** 靜態初始化欄位 *///執行緒工廠public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;//啟動或殺死執行緒的方法呼叫者的許可權private static final RuntimePermission modifyThreadPermission;// 公共靜態poolstatic final ForkJoinPool common;//並行度,對應內部common池static final int commonParallelism;//備用執行緒數,在tryCompensate中使用private static int commonMaxSpares;//建立workerNamePrefix(工作執行緒名稱字首)時的序號private static int poolNumberSequence;//執行緒阻塞等待新的任務的超時值(以納秒為單位),預設2秒private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec//空閒超時時間,防止timer未命中private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms//預設備用執行緒數private static final int DEFAULT_COMMON_MAX_SPARES = 256;//阻塞前自旋的次數,用在在awaitRunStateLock和awaitWork中private static final int SPINS  = 0;//indexSeed的增量private static final int SEED_INCREMENT = 0x9e3779b9;

ForkJoinPool 的內部狀態都是透過一個64位的 long 變數ctl來儲存,它由四個16位的子域組成:

AC: 正在執行工作執行緒數減去目標並行度,高16位TC: 總工作執行緒數減去目標並行度,中高16位SS: 棧頂等待執行緒的版本計數和狀態,中低16位ID: 棧頂 WorkQueue 在池中的索引(poolIndex),低16位

ForkJoinPool.WorkQueue 中的相關屬性:

//初始佇列容量,2的冪static final int INITIAL_QUEUE_CAPACITY = 1 << 13;//最大佇列容量static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M// 例項欄位volatile int scanState;    // Woker狀態, <0: inactive; odd:scanningint stackPred;             // 記錄前一個棧頂的ctlint nsteals;               // 偷取任務數int hint;                  // 記錄偷取者索引,初始為隨機索引int config;                // 池索引和模式volatile int qlock;        // 1: locked, < 0: terminate; else 0volatile int base;         // 下一個poll操作的索引(棧底/佇列頭)int top;                   // 一個push操作的索引(棧頂/佇列尾)ForkJoinTask<?>[] array;   // 任務陣列final ForkJoinPool pool;   // the containing pool (may be null)final ForkJoinWorkerThread owner; // 當前工作佇列的工作執行緒,共享模式下為nullvolatile Thread parker;    // 呼叫park阻塞期間為owner,其他情況為nullvolatile ForkJoinTask<?> currentJoin;  // 記錄被join過來的任務volatile ForkJoinTask<?> currentSteal; // 記錄從其他工作佇列偷取過來的任務
內部資料結構

ForkJoinPool採用了雜湊陣列 + 雙端佇列的方式存放任務,但這裡的任務分為兩類:

一類是透過execute、submit 提交的外部任務另一類是ForkJoinWorkerThread工作執行緒透過fork/join分解出來的工作任務

ForkJoinPool並沒有把這兩種任務混在一個任務佇列中,對於外部任務,會利用Thread內部的隨機probe值對映到雜湊陣列的偶數槽位中的提交佇列中,這種提交佇列是一種陣列實現的雙端佇列稱之為Submission Queue,專門存放外部提交的任務。

對於ForkJoinWorkerThread工作執行緒,每一個工作執行緒都分配了一個工作佇列,這也是一個雙端佇列,稱之為Work Queue,這種佇列都會被對映到雜湊陣列的奇數槽位,每一個工作執行緒fork/join分解的任務都會被新增到自己擁有的那個工作佇列中。

在ForkJoinPool中的屬性 WorkQueue[] workQueues 就是我們所說的雜湊陣列,其元素就是內部類WorkQueue實現的基於陣列的雙端佇列。該雜湊陣列的長度為2的冪,並且支援擴容。如下就是該雜湊陣列的示意結構圖:

如圖,提交佇列位於雜湊陣列workQueue的奇數索引槽位,工作執行緒的工作佇列位於偶數槽位。

預設情況下,asyncMode為false時:因此工作執行緒把工作隊列當著棧一樣使用(後進先出),將分解的子任務推入工作佇列的top端,取任務的時候也從top端取(凡是雙端佇列都會有兩個分別指向佇列兩端的指標,這裡就是圖上畫出的base和top);而當某些工作執行緒的任務為空的時候,就會從其他佇列(不限於workQueue,也會是提交佇列)竊取(steal)任務,如圖示擁有workQueue2的工作執行緒從workQueue1中竊取了一個任務,竊取任務的時候採用的是先進先出FIFO的策略(即從base端竊取任務),這樣不但可以避免在取任務的時候與擁有其佇列的工作執行緒發生衝突,從而減小競爭,還可以輔助其完成比較大的任務。asyncMode為true的話,擁有該工作佇列的工作執行緒將按照先進先出的策略從base端取任務,這一般只用於不需要返回結果的任務,或者事件訊息傳遞框架。ForkJoinPool建構函式

其完整構造方法如下

private ForkJoinPool(int parallelism,                     ForkJoinWorkerThreadFactory factory,                     UncaughtExceptionHandler handler,                     int mode,                     String workerNamePrefix) {    this.workerNamePrefix = workerNamePrefix;    this.factory = factory;    this.ueh = handler;    this.config = (parallelism & SMASK) | mode;    long np = (long)(-parallelism); // offset ctl counts    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);}

重要引數解釋

parallelism:並行度( the parallelism level),預設情況下跟我們機器的cpu個數保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我們機器執行時可用的CPU個數。factory:建立新執行緒的工廠( the factory for creating new threads)。預設情況下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。handler:執行緒異常情況下的處理器(Thread.UncaughtExceptionHandler handler),該處理器線上程執行任務時由於某些無法預料到的錯誤而導致任務執行緒中斷時進行一些處理,預設情況為null。asyncMode:這個引數要注意,在ForkJoinPool中,每一個工作執行緒都有一個獨立的任務佇列

asyncMode表示工作執行緒內的任務佇列是採用何種方式進行排程,可以是先進先出FIFO,也可以是後進先出LIFO。如果為true,則執行緒池中的工作執行緒則使用先進先出方式進行任務排程,預設情況下是false。

ForkJoinPool.submit 方法
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {    if (task == null)        throw new NullPointerException();    //提交到工作佇列    externalPush(task);    return task;}

ForkJoinPool 自身擁有工作佇列,這些工作佇列的作用是用來接收由外部執行緒(非 ForkJoinThread 執行緒)提交過來的任務,而這些工作佇列被稱為 submitting queue 。 submit() 和 fork() 其實沒有本質區別,只是提交物件變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作執行緒”竊取“的物件,因此當其中的任務被一個工作執行緒成功竊取時,就意味著提交的任務真正開始進入執行階段。

https://github.com/Niuh-Study/niuh-juc-final.git

17
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • Hadoop實時分析工具Pig—2 查詢應用PigLatin