一页书

你永远不会过度准备

Stream流

什么是Stream流

Stream是流式思想,相当于工厂的流水线,对集合中的数据进行加工处理

Stream和IO流(InputStream/OutputStream)没有任何关系!

获取Stream流的两种方式

  • 所有的 Collection 集合都可以通过 stream 默认方法获取流;
  • Stream 接口的静态方法 of 可以获取数组对应的流。

根据Collection获取流

java.util.Collection 接口中加入了default方法 stream() 用来获取流,所以其所有实现类均可获取流。

源码

public interface Collection {
    default Stream<E> stream()
}

示例

List<String> list = new ArrayList<>();
Stream<String> stream1 = list.stream();

Set<String> set = new HashSet<>();
Stream<String> stream2 = set.stream();

Vector<String> vector = new Vector<>();
Stream<String> stream3 = vector.stream();

Stream中的静态方法of获取流

源码

public static<T> Stream<T> of(T... values) {
    return Arrays.stream(values);
}

of 方法的参数其实是一个可变参数,所以支持数组

示例

Stream<String> stream6 = Stream.of("aa", "bb", "cc");
String[] arr = {"aa", "bb", "cc"};
Stream<String> stream7 = Stream.of(arr);

Stream常用方法

方法名方法作用返回值类型方法种类
count统计个数long终结
forEach逐一处理void终结
filter过滤Stream函数拼接
limit取用前几个Stream函数拼接
skip跳过前几个Stream函数拼接
map映射Stream函数拼接
concat组合Stream函数拼接
sorted排序Stream函数排序
distinct去重Stream函数去重
allMatch是否全部匹配指定的条件boolean终结
anyMatch是否至少有一个匹配指定的条件boolean终结
noneMatch是否全部不匹配指定的条件boolean终结
findFirst找出第一个元素Optional<T>终结
findAny找出任意一个元素Optional<T>终结
max最大Optional<T>终结
min最小Optional<T>终结
reduce将所有数据归纳得到一个数据T终结
map类型转换Stream函数转换
mapToInt要将Stream中的Integer类型数据转成int类型IntStream函数转换
collect流中的结果到集合中Collector终结
toArray流中的结果到数组中Object[]终结

终结方法:返回值类型不再是 Stream 类型的方法,不再支持链式调用。

非终结方法:返回值类型仍然是 Stream 类型的方法,支持链式调用。(除了终结方法外,其余方法均为非终结
方法。)

  1. Stream只能操作 一次
  2. Stream方法返回的是 新的流
  3. Stream 不调用 终结方法,中间的操作 不会执行

Stream流的两种方式

串行流和并行流

以上使用的Stream流都是串行的,就是在一个线程上执行。

获取并行Stream流的两种方式

  1. 直接获取并行的流。(通过 .parallelStream()
  2. 将串行流转成并行流。(通过 .parallel()
ArrayList<Integer> list = new ArrayList<>();
// 直接获取并行的流
Stream<Integer> stream1 = list.parallelStream();
// 将串行流转成并行流
Stream<Integer> stream2 = list.stream().parallel();

parallelStream其实就是一个并行执行的流。它通过默认的ForkJoinPool,可能提高多线程任务的速度。

parallelStream存在线程安全问题

解决方法:

  1. 加锁
  2. 使用线程安全的集合
  3. 调用Stream的 toArray() / collect() 操作

Fork/Join框架介绍

parallelStream使用的是Fork/Join框架。Fork/Join框架自JDK 7引入。Fork/Join框架可以将一个大任务拆分为很多小
任务来异步执行。 Fork/Join框架主要包含三个模块:

  1. 线程池:ForkJoinPool
  2. 任务对象:ForkJoinTask
  3. 执行任务的线程:ForkJoinWorkerThread

ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法,
ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成
两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处
理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停
止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在
于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。

Fork/Join原理-工作窃取算法

Fork/Join最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的cpu,那么如何利用好这个空闲的
cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个Fork/Join框架的核心理念
Fork/Join工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

0%