首頁>技術>

怎樣用Java 8優雅的開發業務

[TOC]

流式程式設計基本原理

Java中流式程式設計的基本原理有兩點。

構建流資料流轉(流水線)規約
IntStream.rangeClosed(1, 100) // 1. 構建流    .mapToObj(String::valueOf)// 2. 資料流轉(流水線)    .collect(joining());      // 3. 規約
案例英雄的主位置一共有幾類,分別是什麼
@Testfun t1() {    // 英雄的主位置一共有幾類,分別是什麼    // 對映    val roleMains = heroes.map(Hero::getRoleMain)        // 過濾為空的資料        .filter(Objects::nonNull)        // 去重        .distinct()    println(roleMains.size)    println(roleMains)}
@Testpublic void t1() {    // 英雄的主位置一共有幾類,分別是什麼    List<String> roleMains = heroes.stream()            // 對映            .map(Hero::getRoleMain)            // 過濾為空的資料            .filter(Objects::nonNull)            // 去重            .distinct()            // 收集列表            .collect(toList());    System.out.println(roleMains.size());    System.out.println(roleMains);}
英雄按主次位置分組後,輸出每個分組有多少英雄,其中:近戰英雄有多少位,遠端英雄有多少位
@Testfun t2() {    // 英雄按主次位置分組後,輸出每個分組有多少英雄,其中:近戰英雄有多少位,遠端英雄有多少位    // 主次位置分組的英雄數量    val groupHeroCount = heroes.groupingBy {        Pair.of(it.roleMain, it.roleAssist)    }.eachCount()    // 主次分組後,再按攻擊範圍分組的英雄數量    val groupThenGroupCount = heroes.groupBy {        Pair.of(it.roleMain, it.roleAssist)    }.map {        val value = it.value.groupingBy(Hero::getAttackRange).eachCount()        Pair.of(it.key, value)    }.associateBy({ it.left }, { it.value })    // 遍歷輸出    groupThenGroupCount.forEach { (groupKey, groupValue) ->        val groupingCount = groupHeroCount[groupKey]        print("英雄分組key為:$groupKey;英雄數量:$groupingCount;")        groupValue.forEach { (countKey, countValue) ->            print("英雄攻擊範圍:$countKey;英雄數量:$countValue;")        }        println()    }}
@Testpublic void t2() {    // 英雄按主次位置分組後,輸出每個分組有多少英雄,其中:近戰英雄有多少位,遠端英雄有多少位    // 主次位置分組的英雄數量    Map<Pair<String, String>, Long> groupHeroCount = heroes.stream()            .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()), counting()));    // 主次分組後,再按攻擊範圍分組的英雄數量    Map<Pair<String, String>, Map<String, Long>> groupThenGroupCount = heroes.stream()            .collect(groupingBy(hero -> Pair.of(hero.getRoleMain(), hero.getRoleAssist()),                    groupingBy(Hero::getAttackRange, counting())));    // 遍歷輸出    groupThenGroupCount.forEach((groupKey, groupValue) -> {        Long groupingCount = groupHeroCount.get(groupKey);        System.out.print("英雄分組key為:" + groupKey + ";英雄數量:" + groupingCount + ";");        groupValue.forEach((countKey, countValue) -> System.out.print("英雄攻擊範圍:" + countKey + ";英雄數量:" + countValue + ";"));        System.out.println();    });}
求近戰英雄HP初始值的加總
@Testfun t3() {    // 求近戰英雄HP初始值的加總    val sum = heroes.filter { "近戰" == it.attackRange }        .map(Hero::getHpStart)        .filter(Objects::nonNull)        .reduce(BigDecimal::add)    println("近戰英雄HP初始值的加總為:$sum")}
@Testpublic void t3() {    // 求近戰英雄HP初始值的加總    BigDecimal sum = heroes.stream()            .filter(hero -> "近戰".equals(hero.getAttackRange()))            .map(Hero::getHpStart)            .filter(Objects::nonNull)            .reduce(BigDecimal.ZERO, BigDecimal::add);    System.out.println("近戰英雄HP初始值的加總為:" + sum);}
透過最小列表收集器獲取最小列表
@Testpublic void t4() {    // 透過最小列表收集器獲取最小列表    List<BigDecimal> minAttackGrowth = heroes.stream()            .map(Hero::getAttackGrowth)            .collect(new MinListCollector<>());    System.out.println(minAttackGrowth);    List<Hero> minHero = heroes.stream()            .collect(new MinListCollector<>());    System.out.println(minHero);}
import java.util.*;import java.util.concurrent.atomic.AtomicReference;import java.util.function.BiConsumer;import java.util.function.BinaryOperator;import java.util.function.Function;import java.util.function.Supplier;import java.util.stream.Collector;import java.util.stream.Collectors;import static java.util.stream.Collector.Characteristics.*;/** * 最小列表收集器 * * @author switch * @since 2020/8/18 */public class MinListCollector<T extends Comparable<? super T>> implements Collector<T, List<T>, List<T>> {    /**     * 收集器的特性     *     * @see Characteristics     */    private final static Set<Characteristics> CHARACTERISTICS = Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH));    private final static int ZERO = 0;    /**     * 最小值     */    private final AtomicReference<T> min = new AtomicReference<>();    @Override    public Supplier<List<T>> supplier() {        // supplier引數用於生成結果容器,容器型別為A        return ArrayList::new;    }    @Override    public BiConsumer<List<T>, T> accumulator() {        // accumulator用於消費元素,也就是歸納元素,這裡的T就是元素,它會將流中的元素一個一個與結果容器A發生操作        return (list, element) -> {            // 獲取最小值            T minValue = min.get();            if (Objects.isNull(minValue)) {                // 第一次比較                list.add(element);                min.set(element);            } else if (element.compareTo(minValue) < ZERO) {                // 發現更小的值                list.clear();                list.add(element);                min.compareAndSet(minValue, element);            } else if (element.compareTo(minValue) == ZERO) {                // 與最小值相等                list.add(element);            }        };    }    @Override    public BinaryOperator<List<T>> combiner() {        // combiner用於兩個兩個合併並行執行的執行緒的執行結果,將其合併為一個最終結果A        return (left, right) -> {            // 最小值列表合併            List<T> leftList = getMinList(left);            List<T> rightList = getMinList(right);            leftList.addAll(rightList);            return leftList;        };    }    private List<T> getMinList(List<T> list) {        return list.stream()                .filter(element -> element.compareTo(min.get()) == ZERO)                .collect(Collectors.toList());    }    @Override    public Function<List<T>, List<T>> finisher() {        // finisher用於將之前整合完的結果R轉換成為A        return Function.identity();    }    @Override    public Set<Characteristics> characteristics() {        // characteristics表示當前Collector的特徵值,這是個不可變Set        return CHARACTERISTICS;    }}
優雅的空處理

import org.junit.Test;import java.util.Optional;/** * @author switch * @since 2020/8/18 */public class OptionalTests {    @Test    public void t1() {        // orElse        System.out.println(Optional.ofNullable(null).orElse("張三"));        System.out.println(Optional.ofNullable(null).orElseGet(() -> "李四"));        System.out.println(Optional.ofNullable("王五").orElseThrow(NullPointerException::new));    }    @Test    public void t2() {        // isPresent        Optional<String> name = Optional.ofNullable("張三");        if (name.isPresent()) {            System.out.println(name.get());        }    }    @Test    public void t3() {        // map        Optional<Integer> number = Optional.of("123456").map(Integer::valueOf);        if (number.isPresent()) {            System.out.println(number.get());        }    }    @Test    public void t4() {        // flatMap        Optional<Integer> number = Optional.of("123456").flatMap(s -> Optional.of(Integer.valueOf(s)));        if (number.isPresent()) {            System.out.println(number.get());        }    }    @Test    public void t5() {        // 過濾        String number = "123456";        String filterNumber = Optional.of(number).filter(s -> !s.equals(number)).orElse("654321");        System.out.println(filterNumber);    }}
新的併發工具類CompletableFuture

單機批處理多執行緒執行模型

該模型適用於百萬級量級的任務。超過千萬資料,可以考慮分組,多機器並行執行。基本流程:

從資料庫獲取Id列表拆分成n個子Id列表透過子Id列表獲取關聯資料(注意:都需要提供批次查詢介面)對映到需要處理的Model(提交到CompletableFuture)->處理資料->收整合list)(java 8流式處理)收集的list進行join操作收集list模型

模型原理:Stream+CompletableFuture+lambda

簡要解釋:

CompletableFuture是java8提供的一個工具類,主要是用於非同步處理流程編排的。Stream是java8提供的一個集合流式處理工具類,主要用於資料的流水線處理。lambda在java中是基於內部匿名類實現的,可以大幅減少重複程式碼。總結:在該模型中Stream用於集合流水線處理、CompletableFuture解決非同步編排問題(非阻塞)、lambda簡化程式碼。資料流動
List<List<String>> -> Stream<List<String>> -> Stream<List<Model>> -> Stream<CompletableFuture<List<Model>>> -> Stream<CompletableFuture<List<對映型別>>> -> List<CompletableFuture<Void>>
案例ThreadPoolUtil
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

public final class ThreadPoolUtil { public static ThreadPoolTaskExecutor getDefaultExecutor(Integer poolSize, Integer maxPoolSize, Integer queueCapacity) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setAllowCoreThreadTimeOut(true); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setCorePoolSize(poolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; }}

- `ThreadPoolConfig```` javaimport org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;@Configurationpublic class ThreadPoolConfig {    /**     * 計算規則:N(thread) = N(cpu) * U(cpu) * (1 + w/c)     * N(thread):執行緒池大小     * N(cpu):處理器核數     * U(cpu):期望CPU利用率(該值應該介於0和1之間)     * w/c:是等待時間與計算時間的比率,比如說IO操作即為等待時間,計算處理即為計算時間     */    private static final Integer TASK_POOL_SIZE = 50;    private static final Integer TASK_MAX_POOL_SIZE = 100;    private static final Integer TASK_QUEUE_CAPACITY = 1000;    @Bean("taskExecutor")    public ThreadPoolTaskExecutor taskExecutor() {        return ThreadPoolUtil.getDefaultExecutor(TASK_POOL_SIZE, TASK_MAX_POOL_SIZE, TASK_QUEUE_CAPACITY);    }}

#getFuturesStream

public Stream<CompletableFuture<List<Model>>> getFuturesStream(List<List<String>> idSubLists) {  return idSubLists.stream()      .map(ids ->           CompletableFuture.supplyAsync(() -> modelService.listByIds(ids), taskExecutor)      );}

#standardisation

其中:

$N_{CPU}$是處理器的核的數目,可以透過Runtime.getRuntime().availableProcessors()得到$U_{CPU}$是期望的CPU利用率(該值應該介於0和1之間)$\frac{W}{C}$是等待時間與計算時間的比率,比如說IO操作即為等待時間,計算處理即為計算時間並行——使用流還是CompletableFutures?

對集合進行平行計算有兩種方式:要麼將其轉化為並行流,利用map這樣的操作開展工作,要麼枚舉出集合中的每一個元素,建立新的執行緒,在CompletableFuture內對其進行操作。後者提供了更多的靈活性,可以調整執行緒池的大小,而這能幫助確保整體的計算不會因為執行緒都在等待I/O而發生阻塞。

使用這些API的建議如下:

如果進行的是計算密集型的操作,並且沒有I/O,那麼推薦使用Stream介面,因為實現簡單,同時效率也可能是最高的(如果所有的執行緒都是計算密集型的,那就沒有必要建立比處理器核數更多的執行緒)。反之,如果並行的工作單元還涉及等待I/O的操作(包括網路連線等待),那麼使用CompletableFuture靈活性更好,可以依據等待/計算,或者$\frac{W}{C}$的比率設定需要使用的執行緒數。這種情況不使用並行流的另一個原因是,處理流的流水線中如果發生I/O等待,流的延遲特性很難判斷到底什麼時候觸發了等待。日期和時間API

13
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • Day10 鴻蒙,Ability全家桶(二)如何後臺執行任務