首頁>科技>

本文導讀:

業務需求場景介紹技術設計方案思考Future 設計模式實戰CompletableFuture 模式實戰CompletableFuture 生產建議CompletableFuture 效能測試CompletableFuture 使用擴充套件

1、業務需求場景介紹

不變的東西就是一直在變化中。

想必,大家在閒暇時刻,會經常看視訊,經常用的幾個 APP,比如優酷、愛奇藝、騰訊等。

這些視訊 APP 不僅僅可以在手機上播放,還能夠支援在電視上播放。

在電視終端上播放的 APP 是獨立釋出的版本,跟手機端的 APP 是不一樣的。

當我們看一部電影時,點選進入某一部電影,就進入到了專輯詳情頁頁面,此時,播放器會自動播放視訊。使用者在手機上看到的專輯詳情頁,與電視上看到的專輯詳情頁,頁面樣式設計上是不同的。

我們來直觀的看一下效果。

手機上的騰訊視訊專輯詳情頁:

相應的,在電視端的專輯詳情頁展示方式是不一樣的。假設產品經理提出一個需求,要求對詳情頁做個改版。

樣式要求如下圖所示:

兩個終端的樣式對比,在電視端專輯詳情頁中,包含了很多板塊,每個板塊橫向展示多個內容。

產品的設計上要求是,有的板塊內容來源於推薦、有的板塊來源於搜尋、有的板塊來源CMS(內容管理系統)。簡單理解為,每個板塊內容來源不同,來源於推薦、搜尋等介面的內容要求是近實時的請求。

2、技術設計方案思考

考慮到產品提的這個需求,其實實現起來並不難。

主要分為了靜態資料部分和動態資料部分,對於不經常變化的資料可以通過靜態介面獲取,對於近乎實時的資料可以通過動態介面獲取。

靜態介面設計:

專輯本身的屬性以及專輯下的視訊資料,一般是不經常變化的。

在需求場景介紹中,我截圖的是電影頻道。如果是電視劇頻道,會展示劇集列表(專輯下的所有視訊,如第 1 集、第 2 集...),而視訊的更新一般是不太頻繁的,所以在專輯詳情頁劇集列表資料就可以從靜態介面獲取。

靜態介面資料生成流程:

另外一部分,就是需要動態介面來實現,呼叫第三方介面獲取資料,比如推薦、搜尋資料。

同時,要求板塊與板塊之間的內容不允許重複。

動態介面設計:

方案一:

序列呼叫,即按照每個板塊的展示先後順序,呼叫相應的第三方介面獲取資料。

方案二:

並行呼叫,即多個板塊之間可以並行呼叫,提高整體介面響應效率。

其實以上兩個方案,各有利弊。

方案一序列呼叫,好處是開發模型簡單,按照序列方式依次呼叫介面,內容資料去重,聚合所有的資料返回給客戶端。

但是,介面響應時間依賴於第三方介面的響應時間,通常第三方介面總是不可靠的,可能就會拉高介面整體響應時間,進而導致佔用執行緒時間過長,影響介面整體吞吐量。

方案二並行呼叫,理論上是可以提高介面的整體響應時間,假設同時呼叫多個第三方介面,取決於最慢的介面響應時間。

並行呼叫時,需要考慮到「池化技術」,即不能無限制的在 JVM 程序上建立過多的執行緒。同時,也要考慮到板塊與板塊之間的內容資料,要按照產品設計上的先後順序做去重。

根據這個需求場景,我們選擇第二種方案來實現更合適一些。

選擇了方案二,我們抽象出如下圖所示的簡易模型:

T1、T2、T3 表示多個板塊內容執行緒。T1 執行緒先返回結果,T2 執行緒返回的結果不能與與 T1 執行緒返回的結果內容重複,T3 執行緒返回的結果不能與 T1、T2 兩個執行緒返回的結果內容重複。

我們從技術實現上考量,當並行呼叫多個第三方介面時,需要獲取介面的返回結果,首先想到的就是 Future ,能夠實現非同步獲取任務結果。

另外,JDK8 提供了 CompletableFuture 易於使用的獲取非同步結果的工具類,解決了 Future 的一些使用上的痛點,以更優雅的方式實現組合式非同步程式設計,同時也契合函數語言程式設計。

3、Future 設計模式實戰

Future 介面設計:

提供了獲取任務結果、取消任務、判斷任務狀態介面。呼叫獲取任務結果方法,在任務未完成情況下,會導致呼叫阻塞。

Future 介面提供的方法:

```

// 獲取任務結果

V get() throws InterruptedException, ExecutionException;

// 支援超時時間的獲取任務結果

V get(long timeout, TimeUnit unit)

throws InterruptedException, ExecutionException, TimeoutException;

// 判斷任務是否已完成

boolean isDone();

// 判斷任務是否已取消

boolean isCancelled();

// 取消任務

boolean cancel(boolean mayInterruptIfRunning);

```

通常,我們在考慮到使用 Future 獲取任務結果時,會使用 ThreadPoolExecutor 或者 FutureTask 來實現功能需求。

ThreadPoolExecutor、FutureTask 與 Future 介面關係類圖:

TheadPoolExecutor 提供三個 submit 方法:

// 1. 提交無需返回值的任務,Runnable 介面 run() 方法無返回值public Future<?> submit(Runnable task) {}// 2. 提交需要返回值的任務,Callable 介面 call() 方法有返回值public <T> Future<T> submit(Callable<T> task) {}// 3. 提交需要返回值的任務,任務結果是第二個引數 result 物件public <T> Future<T> submit(Runnable task, T result) {}

第 3 個 submit 方法使用示例如下所示:

static String x = "東昇的思考";public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newFixedThreadPool(1); // 建立 Result 物件 r Result r = new Result(); r.setName(x); // 提交任務 Future<Result> future = executor.submit(new Task(r), r); Result fr = future.get(); // 下面等式成立 System.out.println(fr == r); System.out.println(fr.getName() == x); System.out.println(fr.getNick() == x);}static class Result { private String name; private String nick; // ... ignore getter and setter }static class Task implements Runnable { Result r; // 通過建構函式傳入 result Task(Result r) { this.r = r; } @Override public void run() { // 可以操作 result String name = r.getName(); r.setNick(name); }}

執行結果都是true。

FutureTask 設計實現:

實現了 Runnable 和 Future 兩個介面。實現了 Runnable 介面,說明可以作為任務物件,直接提交給 ThreadPoolExecutor 去執行。實現了 Future 介面,說明能夠獲取執行任務的返回結果。

我們來根據產品的需求,使用 FutureTask 模擬兩個執行緒,通過示例實現下功能。

結合示例程式碼註釋理解:

public static void main(String[] args) throws Exception { // 建立任務 T1 的 FutureTask,呼叫推薦介面獲取資料 FutureTask<String> ft1 = new FutureTask<>(new T1Task()); // 建立任務 T1 的 FutureTask,調用搜索介面獲取資料,依賴 T1 結果 FutureTask<String> ft2 = new FutureTask<>(new T2Task(ft1)); // 執行緒 T1 執行任務 ft1 Thread T1 = new Thread(ft1); T1.start(); // 執行緒 T2 執行任務 ft2 Thread T2 = new Thread(ft2); T2.start(); // 等待執行緒 T2 執行結果 System.out.println(ft2.get());}// T1Task 呼叫推薦介面獲取資料static class T1Task implements Callable<String> { @Override public String call() throws Exception { System.out.println("T1: 呼叫推薦介面獲取資料..."); TimeUnit.SECONDS.sleep(1); System.out.println("T1: 得到推薦介面資料..."); TimeUnit.SECONDS.sleep(10); return " [T1 板塊資料] "; }} // T2Task 調用搜索介面資料,同時需要推薦介面資料static class T2Task implements Callable<String> { FutureTask<String> ft1; // T2 任務需要 T1 任務的 FutureTask 返回結果去重 T2Task(FutureTask<String> ft1) { this.ft1 = ft1; } @Override public String call() throws Exception { System.out.println("T2: 調用搜索介面獲取資料..."); TimeUnit.SECONDS.sleep(1); System.out.println("T2: 得到搜尋介面的資料..."); TimeUnit.SECONDS.sleep(5); // 獲取 T2 執行緒的資料 System.out.println("T2: 呼叫 T1.get() 介面獲取推薦資料"); String tf1 = ft1.get(); System.out.println("T2: 獲取到推薦介面資料:" + tf1); System.out.println("T2: 將 T1 與 T2 板塊資料做去重處理"); return "[T1 和 T2 板塊資料聚合結果]"; }}

執行結果如下:

> Task :FutureTaskTest.main()T1: 呼叫推薦介面獲取資料...T2: 調用搜索介面獲取資料...T1: 得到推薦介面資料...T2: 得到搜尋介面的資料...T2: 呼叫 T1.get() 介面獲取推薦資料T2: 獲取到推薦介面資料: [T1 板塊資料] T2: 將 T1 與 T2 板塊資料做去重處理[T1 和 T2 板塊資料聚合結果]

小結:

Future 表示「未來」的意思,主要是將耗時的一些操作任務,交給單獨的執行緒去執行。從而達到非同步的目的,提交任務的當前執行緒,在提交任務後和獲取任務結果的過程中,當前執行緒可以繼續執行其他操作,不需要在那傻等著返回執行結果。

4、CompleteableFuture 模式實戰

對於 Future 設計模式,雖然我們提交任務時,不會進入任何阻塞,但是當呼叫方要獲得這個任務的執行結果,還是可能會阻塞直至任務執行完成。

在 JDK1.5 設計之初就一直存在這個問題,發展到 JDK1.8 引入了 CompletableFuture 才得到完美的增強。

在此期間,Google 開源的 Guava 工具包提供了 ListenableFuture ,用於支援任務完成時支援回撥方式,感興趣的朋友們可以自行查閱研究。

可以理解為任務與任務之間是有時序關係的,而根據 CompletableFuture 提供的一些功能特性,是非常適合這種業務場景的。

CompletableFuture 類圖:

CompletableFuture 實現了 Future 和 CompletionStage 兩個介面。實現 Future 介面是為了關注非同步任務什麼時候結束,和獲取非同步任務執行的結果。實現 CompletionStage 介面,其提供了非常豐富的功能,實現了序列關係、並行關係、匯聚關係等。

CompletableFuture 核心優勢:

1)無需手工維護執行緒,給任務分配執行緒的工作無需開發人員關注;

2)在使用上,語義更加清晰明確;

例如:t3 = t1.thenCombine(t2, () -> { // doSomething ... } 能夠明確的表述任務 3 要等任務 2 和 任務 1完成後才會開始執行。

3)程式碼更加簡練,支援鏈式呼叫,讓你更專注業務邏輯。

4)方便的處理異常情況

接下來,通過 CompletableFuture 來模擬實現專輯下多板塊資料聚合處理。

程式碼如下所示:

public static void main(String[] args) throws Exception { // 暫存資料 List<String> stashList = Lists.newArrayList(); // 任務 1:呼叫推薦介面獲取資料 CompletableFuture<String> t1 = CompletableFuture.supplyAsync(() -> { System.out.println("T1: 獲取推薦介面資料..."); sleepSeconds(5); stashList.add("[T1 板塊資料]"); return "[T1 板塊資料]"; }); // 任務 2:調用搜索介面獲取資料 CompletableFuture<String> t2 = CompletableFuture.supplyAsync(() -> { System.out.println("T2: 調用搜索介面獲取資料..."); sleepSeconds(3); return " [T2 板塊資料] "; }); // 任務 3:任務 1 和任務 2 完成後執行,聚合結果 CompletableFuture<String> t3 = t1.thenCombine(t2, (t1Result, t2Result) -> { System.out.println(t1Result + " 與 " + t2Result + "實現去重邏輯處理"); return "[T1 和 T2 板塊資料聚合結果]"; }); // 等待任務 3 執行結果 System.out.println(t3.get(6, TimeUnit.SECONDS));}static void sleepSeconds(int timeout) { try { TimeUnit.SECONDS.sleep(timeout); } catch (InterruptedException e) { e.printStackTrace(); }}

執行結果如下:

> Task :CompletableFutureTest.main()T1: 獲取推薦介面資料...T2: 調用搜索介面獲取資料...[T1 板塊資料] 與 [T2 板塊資料] 實現去重邏輯處理[T1 和 T2 板塊資料聚合結果]

上述的示例程式碼在 IDEA 中新建個Class,直接複製進去,即可正常執行。

** 5、CompletableFuture 生產建議**

建立合理的執行緒池:

在生產環境下,不建議直接使用上述示例程式碼形式。因為示例程式碼中使用的

CompletableFuture.supplyAsync(() -> {});

建立 CompletableFuture 物件的 supplyAsync() 方法(這裡使用的工廠方法模式),底層使用的預設執行緒池,不一定能滿足業務需求。

結合底層原始碼來看一下:

// 預設使用 ForkJoinPool 執行緒池private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier);}

建立 ForkJoinPool 執行緒池:

預設執行緒池大小是 Runtime.getRuntime().availableProcessors() - 1(CPU 核數 - 1),可以通過 JVM 引數 -Djava.util.concurrent.ForkJoinPool.common.parallelism 設定執行緒池大小。

JVM 引數上配置 -Djava.util.concurrent.ForkJoinPool.common.threadFactory 設定執行緒工廠類;配置 -Djava.util.concurrent.ForkJoinPool.common.exceptionHandler 設定異常處理類,這兩個引數設定後,內部會通過系統類載入器載入 Class。

如果所有 CompletableFuture 都使用預設執行緒池,一旦有任務執行很慢的 I/O 操作,就會導致所有執行緒都阻塞在 I/O 操作上,進而影響系統整體效能。

所以,建議大家在生產環境使用時,根據不同的業務型別建立不同的執行緒池,以避免互相影響。

CompletableFuture 還提供了另外支援執行緒池的方法。

// 第二個引數支援傳遞 Executor 自定義執行緒池public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier);}

自定義執行緒池,建議參考 「阿里巴巴 Java 開發手冊」,推薦使用 ThreadPoolExecutor 自定義執行緒池,使用有界佇列,根據實際業務情況設定佇列大小。

執行緒池大小的設定,在 「Java 併發程式設計實戰」一書中,Brian Goetz 提供了不少優化建議。如果執行緒池數量過多,競爭 CPU 和記憶體資源,導致大量時間在上下文切換上。反之,如果執行緒池數量過少,無法充分利用 CPU 多核優勢。

執行緒池大小與 CPU 處理器的利用率之比可以用下面公式估算:

異常處理:

CompletableFuture 提供了非常簡單的異常處理 ,如下這些方法,支援鏈式程式設計方式。

// 類似於 try{}catch{} 中的 catch{}public CompletionStage<T> exceptionally (Function<Throwable, ? extends T> fn); // 類似於 try{}finally{} 中的 finally{},不支援返回結果public CompletionStage<T> whenComplete (BiConsumer<? super T, ? super Throwable> action);public CompletionStage<T> whenCompleteAsync (BiConsumer<? super T, ? super Throwable> action); // 類似於 try{}finally{} 中的 finally{},支援返回結果public <U> CompletionStage<U> handle (BiFunction<? super T, Throwable, ? extends U> fn);public <U> CompletionStage<U> handleAsync (BiFunction<? super T, Throwable, ? extends U> fn);

#### 6、CompletableFuture 效能測試:

迴圈壓測任務數如下所示,每次執行壓測,從 1 到 jobNum 資料疊加匯聚結果,計算耗時。

統計維度:CompletableFuture 預設執行緒池 與 自定義執行緒池。

效能測試程式碼:

// 效能測試程式碼Arrays.asList(-3, -1, 0, 1, 2, 4, 5, 10, 16, 17, 30, 50, 100, 150, 200, 300).forEach(offset -> { int jobNum = PROCESSORS + offset; System.out.println( String.format("When %s tasks => stream: %s, parallelStream: %s, future default: %s, future custom: %s", testCompletableFutureDefaultExecutor(jobNum), testCompletableFutureCustomExecutor(jobNum)));});// CompletableFuture 使用預設 ForkJoinPool 執行緒池private static long testCompletableFutureDefaultExecutor(int jobCount) { List<CompletableFuture<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(CompleteableFuturePerfTest::getJob))); long start = System.currentTimeMillis(); int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum(); checkSum(sum, jobCount); return System.currentTimeMillis() - start;}// CompletableFuture 使用自定義的執行緒池private static long testCompletableFutureCustomExecutor(int jobCount) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(200, 200, 5, TimeUnit.MINUTES, new ArrayBlockingQueue<>(100000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("CUSTOM_DAEMON_COMPLETABLEFUTURE"); thread.setDaemon(true); return thread; } }, new ThreadPoolExecutor.CallerRunsPolicy()); List<CompletableFuture<Integer>> tasks = new ArrayList<>(); IntStream.rangeClosed(1, jobCount).forEach(value -> tasks.add(CompletableFuture.supplyAsync(CompleteableFuturePerfTest::getJob, threadPoolExecutor))); long start = System.currentTimeMillis(); int sum = tasks.stream().map(CompletableFuture::join).mapToInt(Integer::intValue).sum(); checkSum(sum, jobCount); return System.currentTimeMillis() - start;}

測試機器配置:8 核CPU,16G記憶體

效能測試結果:

根據壓測結果看到,隨著壓測任務數量越大,使用預設的執行緒池效能越差。

7、CompletableFuture 使用擴充套件:

物件建立:

除前面提到的 supplyAsync 方法外,CompletableFuture 還提供了如下方法:

// 執行任務,CompletableFuture<Void> 無返回值,預設執行緒池public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable);}// 執行任務,CompletableFuture<Void> 無返回值,支援自定義執行緒池public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable);}

我們在 CompletableFuture 模式實戰中,提到了 CompletableFuture 實現了 CompletionStage 介面,該介面提供了非常豐富的功能。

CompletionStage 介面支援序列關係、匯聚 AND 關係、匯聚 OR 關係。

下面對這些關係的介面做個簡單描述,大家在使用時可以去自行查閱 JDK API。

同時,這些關係介面中每個方法都提供了對應的 xxxAsync() 方法,表示非同步化執行任務。

序列關係:

CompletionStage 描述序列關係,主要有 thenApply、thenRun、thenAccept 和 thenCompose 系列介面。

原始碼如下所示:

// 對應 U apply(T t) ,接收引數 T並支援返回值 Upublic <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);// 不接收引數也不支援返回值public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);// 接收引數但不支援返回值public CompletionStage<Void> thenAccept(Consumer<? super T> action);public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);// 組合兩個依賴的 CompletableFuture 物件public <U> CompletionStage<U> thenCompose (Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn);

匯聚 AND 關係:

CompletionStage 描述 匯聚 AND 關係,主要有 thenCombine、thenAcceptBoth 和 runAfterBoth 系列介面。

原始碼如下所示(省略了Async 方法):

// 當前和另外的 CompletableFuture 都完成時,兩個引數傳遞給 fn,fn 有返回值public <U,V> CompletionStage<V> thenCombine (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);// 當前和另外的 CompletableFuture 都完成時,兩個引數傳遞給 action,action 沒有返回值public <U> CompletionStage<Void> thenAcceptBoth (CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action);// 當前和另外的 CompletableFuture 都完成時,執行 actionpublic CompletionStage<Void> runAfterBoth(CompletionStage<?> other, Runnable action);

匯聚 OR 關係:

CompletionStage 描述 匯聚 OR 關係,主要有 applyToEither、acceptEither 和 runAfterEither 系列介面。

原始碼如下所示(省略了Async 方法):

// 當前與另外的 CompletableFuture 任何一個執行完成,將其傳遞給 fn,支援返回值public <U> CompletionStage<U> applyToEither (CompletionStage<? extends T> other, Function<? super T, U> fn);// 當前與另外的 CompletableFuture 任何一個執行完成,將其傳遞給 action,不支援返回值public CompletionStage<Void> acceptEither (CompletionStage<? extends T> other, Consumer<? super T> action);// 當前與另外的 CompletableFuture 任何一個執行完成,直接執行 actionpublic CompletionStage<Void> runAfterEither(CompletionStage<?> other, Runnable action);

到此,CompletableFuture 的相關特性都介紹完了。

非同步程式設計慢慢變得越來越成熟,Java 語言官網也開始支援非同步程式設計模式,所以學好非同步程式設計還是有必要的。

本文結合業務需求場景驅動,引出了 Future 設計模式實戰,然後對 JDK1.8 中的 CompletableFuture 是如何使用的,核心優勢、效能測試對比、使用擴充套件方面做了進一步剖析。

希望對大家有所幫助!

最新評論
  • 整治雙十一購物亂象,國家再次出手!該跟這些套路說再見了
  • 華為P30Pro升級EMUI10!你以為體驗就和mate30Pro一樣了?太天真