首頁>技術>

前言

以前需要非同步執行一個任務時,一般是用Thread或者執行緒池Executor去建立。如果需要返回值,則是呼叫Executor.submit獲取Future。但是多個執行緒存在依賴組合,我們又能怎麼辦?可使用同步元件CountDownLatch、CyclicBarrier等;其實有簡單的方法,就是用CompletableFuture

執行緒任務的建立執行緒任務的序列執行執行緒任務的並行執行處理任務結果和異常多工的簡單組合取消執行執行緒任務任務結果的獲取和完成與否判斷1、建立非同步執行緒任務根據supplier建立CompletableFuture任務
//使用內建執行緒ForkJoinPool.commonPool(),根據supplier構建執行任務public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)//指定自定義執行緒,根據supplier構建執行任務public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
根據runnable建立CompletableFuture任務
//使用內建執行緒ForkJoinPool.commonPool(),根據runnable構建執行任務public static CompletableFuture<Void> runAsync(Runnable runnable)//指定自定義執行緒,根據runnable構建執行任務public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
使用示例
ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<Void> rFuture = CompletableFuture        .runAsync(() -> System.out.println("hello siting"), executor);//supplyAsync的使用CompletableFuture<String> future = CompletableFuture        .supplyAsync(() -> {            System.out.print("hello ");            return "siting";        }, executor);//阻塞等待,runAsync 的future 無返回值,輸出nullSystem.out.println(rFuture.join());//阻塞等待String name = future.join();System.out.println(name);executor.shutdown(); // 執行緒池需要關閉--------輸出結果--------hello sitingnullhello siting
常量值作為CompletableFuture返回
//有時候是需要構建一個常量的CompletableFuturepublic static <U> CompletableFuture<U> completedFuture(U value)
2 、執行緒序列執行任務完成則執行action,不關心上一個任務的結果,無返回值
public CompletableFuture<Void> thenRun(Runnable action)public CompletableFuture<Void> thenRunAsync(Runnable action)public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
使用示例
CompletableFuture<Void> future = CompletableFuture        .supplyAsync(() -> "hello siting", executor)        .thenRunAsync(() -> System.out.println("OK"), executor);executor.shutdown();--------輸出結果--------OK
任務完成則執行action,依賴上一個任務的結果,無返回值
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
使用示例
ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<Void> future = CompletableFuture        .supplyAsync(() -> "hello siting", executor)        .thenAcceptAsync(System.out::println, executor);executor.shutdown();--------輸出結果--------hello siting
任務完成則執行fn,依賴上一個任務的結果,有返回值
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)        public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
使用示例
ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<String> future = CompletableFuture        .supplyAsync(() -> "hello world", executor)        .thenApplyAsync(data -> {            System.out.println(data); return "OK";        }, executor);System.out.println(future.join());executor.shutdown();--------輸出結果--------hello worldOK
thenCompose - 任務完成則執行fn,依賴上一個任務的結果,有返回值類似thenApply(區別是thenCompose的返回值是CompletionStage,thenApply則是返回 U),提供該方法為了和其他CompletableFuture任務更好地配套組合使用
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,  Executor executor)        
使用示例
//第一個非同步任務,常量任務CompletableFuture<String> f = CompletableFuture.completedFuture("OK");//第二個非同步任務ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<String> future = CompletableFuture        .supplyAsync(() -> "hello world", executor)        .thenComposeAsync(data -> {            System.out.println(data); return f; //使用第一個任務作為返回        }, executor);System.out.println(future.join());executor.shutdown();--------輸出結果--------hello worldOK
3 、執行緒並行執行兩個CompletableFuture[並行]執行完,然後執行action,不依賴上兩個任務的結果,無返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
使用示例
//第一個非同步任務,常量任務CompletableFuture<String> first = CompletableFuture.completedFuture("hello world");ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<Void> future = CompletableFuture        //第二個非同步任務        .supplyAsync(() -> "hello siting", executor)        // () -> System.out.println("OK") 是第三個任務        .runAfterBothAsync(first, () -> System.out.println("OK"), executor);executor.shutdown();--------輸出結果--------OK
兩個CompletableFuture[並行]執行完,然後執行action,依賴上兩個任務的結果,無返回值
//第一個任務完成再執行other,fn再依賴消費兩個任務的結果,無返回值public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,        BiConsumer<? super T, ? super U> action)//兩個任務非同步完成,fn再依賴消費兩個任務的結果,無返回值     public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,        BiConsumer<? super T, ? super U> action)  //兩個任務非同步完成(第二個任務用指定執行緒池執行),fn再依賴消費兩個任務的結果,無返回值                public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,        BiConsumer<? super T, ? super U> action, Executor executor) 
使用示例
//第一個非同步任務,常量任務CompletableFuture<String> first = CompletableFuture.completedFuture("hello world");ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<Void> future = CompletableFuture        //第二個非同步任務        .supplyAsync(() -> "hello siting", executor)        // (w, s) -> System.out.println(s) 是第三個任務        .thenAcceptBothAsync(first, (s, w) -> System.out.println(s), executor);executor.shutdown();--------輸出結果--------hello siting
兩個CompletableFuture[並行]執行完,然後執行action,依賴上兩個任務的結果,有返回值
//第一個任務完成再執行other,fn再依賴消費兩個任務的結果,有返回值public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,   BiFunction<? super T,? super U,? extends V> fn)//兩個任務非同步完成,fn再依賴消費兩個任務的結果,有返回值public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,        BiFunction<? super T,? super U,? extends V> fn)   //兩個任務非同步完成(第二個任務用指定執行緒池執行),fn再依賴消費兩個任務的結果,有返回值        public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,        BiFunction<? super T,? super U,? extends V> fn, Executor executor)         
使用示例
//第一個非同步任務,常量任務CompletableFuture<String> first = CompletableFuture.completedFuture("hello world");ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<String> future = CompletableFuture        //第二個非同步任務        .supplyAsync(() -> "hello siting", executor)        // (w, s) -> System.out.println(s) 是第三個任務        .thenCombineAsync(first, (s, w) -> {            System.out.println(s);            return "OK";        }, executor);System.out.println(future.join());executor.shutdown();--------輸出結果--------hello sitingOK
4 、執行緒並行執行,誰先執行完則誰觸發下一任務(二者選其最快)上一個任務或者other任務完成, 執行action,不依賴前一任務的結果,無返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)   public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,  Runnable action, Executor executor)
使用示例
//第一個非同步任務,休眠1秒,保證最晚執行晚CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{    try{ Thread.sleep(1000); }catch (Exception e){}    System.out.println("hello world");    return "hello world";});ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<Void> future = CompletableFuture        //第二個非同步任務        .supplyAsync(() ->{            System.out.println("hello siting");            return "hello siting";        } , executor)        //() ->  System.out.println("OK") 是第三個任務        .runAfterEitherAsync(first, () ->  System.out.println("OK") , executor);executor.shutdown();--------輸出結果--------hello sitingOK
上一個任務或者other任務完成, 執行action,依賴最先完成任務的結果,無返回值
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other,  Consumer<? super T> action)public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,  Consumer<? super T> action, Executor executor)       public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,  Consumer<? super T> action, Executor executor)     
使用示例
//第一個非同步任務,休眠1秒,保證最晚執行晚CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{    try{ Thread.sleep(1000);  }catch (Exception e){}    return "hello world";});ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<Void> future = CompletableFuture        //第二個非同步任務        .supplyAsync(() -> "hello siting", executor)        // data ->  System.out.println(data) 是第三個任務        .acceptEitherAsync(first, data ->  System.out.println(data) , executor);executor.shutdown();--------輸出結果--------hello siting        
上一個任務或者other任務完成, 執行fn,依賴最先完成任務的結果,有返回值
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other,  Function<? super T, U> fn) public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,  Function<? super T, U> fn)         public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,  Function<? super T, U> fn, Executor executor)         
使用示例
//第一個非同步任務,休眠1秒,保證最晚執行晚CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{    try{ Thread.sleep(1000);  }catch (Exception e){}    return "hello world";});ExecutorService executor = Executors.newSingleThreadExecutor();CompletableFuture<String> future = CompletableFuture        //第二個非同步任務        .supplyAsync(() -> "hello siting", executor)        // data ->  System.out.println(data) 是第三個任務        .applyToEitherAsync(first, data ->  {            System.out.println(data);            return "OK";        } , executor);System.out.println(future);executor.shutdown();--------輸出結果--------hello sitingOK
5 、處理任務結果或者異常exceptionally-處理異常
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
如果之前的處理環節有異常問題,則會觸發exceptionally的呼叫相當於 try...catch使用示例
CompletableFuture<Integer> first = CompletableFuture        .supplyAsync(() -> {            if (true) {                throw new RuntimeException("main error!");            }            return "hello world";        })        .thenApply(data -> 1)        .exceptionally(e -> {            e.printStackTrace(); // 異常捕捉處理,前面兩個處理環節的日常都能捕獲            return 0;        });
handle-任務完成或者異常時執行fn,返回值為fn的返回相比exceptionally而言,即可處理上一環節的異常也可以處理其正常返回值
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,   Executor executor)        
使用示例
CompletableFuture<Integer> first = CompletableFuture        .supplyAsync(() -> {            if (true) { throw new RuntimeException("main error!"); }            return "hello world";        })        .thenApply(data -> 1)        .handleAsync((data,e) -> {            e.printStackTrace(); // 異常捕捉處理            return data;        });System.out.println(first.join());--------輸出結果--------java.util.concurrent.CompletionException: java.lang.RuntimeException: main error! ... 5 morenull
whenComplete-任務完成或者異常時執行action,有返回值whenComplete與handle的區別在於,它不參與返回結果的處理,把它當成監聽器即可即使異常被處理,在CompletableFuture外層,異常也會再次復現使用whenCompleteAsync時,返回結果則需要考慮多執行緒操作問題,畢竟會出現兩個執行緒同時操作一個結果
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,  Executor executor)        
使用示例
CompletableFuture<AtomicBoolean> first = CompletableFuture        .supplyAsync(() -> {            if (true) {  throw new RuntimeException("main error!"); }            return "hello world";        })        .thenApply(data -> new AtomicBoolean(false))        .whenCompleteAsync((data,e) -> {            //異常捕捉處理, 但是異常還是會在外層復現            System.out.println(e.getMessage());        });first.join();--------輸出結果--------java.lang.RuntimeException: main error!Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: main error! ... 5 more
6 、多個任務的簡單組合
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

11
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 智慧合約安全系列文章反彙編·上篇