什么是Stream流
Stream是流式思想,相当于工厂的流水线,对集合中的数据进行加工处理
Stream和IO流(InputStream/OutputStream)没有任何关系!
获取Stream流的两种方式
- 所有的
Collection
集合都可以通过stream
默认方法获取流; Stream
接口的静态方法of
可以获取数组对应的流。
根据Collection获取流
java.util.Collection 接口中加入了default方法 stream()
用来获取流,所以其所有实现类均可获取流。
源码
public interface Collection {
default Stream<E> stream()
}
示例
List<String> list = new ArrayList<>();
Stream<String> stream1 = list.stream();
Set<String> set = new HashSet<>();
Stream<String> stream2 = set.stream();
Vector<String> vector = new Vector<>();
Stream<String> stream3 = vector.stream();
Stream中的静态方法of获取流
源码
public static<T> Stream<T> of(T... values) {
return Arrays.stream(values);
}
of 方法的参数其实是一个可变参数,所以支持数组
示例
Stream<String> stream6 = Stream.of("aa", "bb", "cc");
String[] arr = {"aa", "bb", "cc"};
Stream<String> stream7 = Stream.of(arr);
Stream常用方法
方法名 | 方法作用 | 返回值类型 | 方法种类 |
---|---|---|---|
count | 统计个数 | long | 终结 |
forEach | 逐一处理 | void | 终结 |
filter | 过滤 | Stream | 函数拼接 |
limit | 取用前几个 | Stream | 函数拼接 |
skip | 跳过前几个 | Stream | 函数拼接 |
map | 映射 | Stream | 函数拼接 |
concat | 组合 | Stream | 函数拼接 |
sorted | 排序 | Stream | 函数排序 |
distinct | 去重 | Stream | 函数去重 |
allMatch | 是否全部匹配指定的条件 | boolean | 终结 |
anyMatch | 是否至少有一个匹配指定的条件 | boolean | 终结 |
noneMatch | 是否全部不匹配指定的条件 | boolean | 终结 |
findFirst | 找出第一个元素 | Optional<T> | 终结 |
findAny | 找出任意一个元素 | Optional<T> | 终结 |
max | 最大 | Optional<T> | 终结 |
min | 最小 | Optional<T> | 终结 |
reduce | 将所有数据归纳得到一个数据 | T | 终结 |
map | 类型转换 | Stream | 函数转换 |
mapToInt | 要将Stream中的Integer类型数据转成int类型 | IntStream | 函数转换 |
collect | 流中的结果到集合中 | Collector | 终结 |
toArray | 流中的结果到数组中 | Object[] | 终结 |
终结方法:返回值类型不再是 Stream 类型的方法,不再支持链式调用。
非终结方法:返回值类型仍然是 Stream 类型的方法,支持链式调用。(除了终结方法外,其余方法均为非终结
方法。)
- Stream只能操作 一次
- Stream方法返回的是 新的流
- Stream 不调用 终结方法,中间的操作 不会执行
Stream流的两种方式
串行流和并行流
以上使用的Stream流都是串行的,就是在一个线程上执行。
获取并行Stream流的两种方式
- 直接获取并行的流。(通过
.parallelStream()
) - 将串行流转成并行流。(通过
.parallel()
)
ArrayList<Integer> list = new ArrayList<>();
// 直接获取并行的流
Stream<Integer> stream1 = list.parallelStream();
// 将串行流转成并行流
Stream<Integer> stream2 = list.stream().parallel();
parallelStream其实就是一个并行执行的流。它通过默认的ForkJoinPool,可能提高多线程任务的速度。
parallelStream存在线程安全问题
解决方法:
- 加锁
- 使用线程安全的集合
- 调用Stream的 toArray() / collect() 操作
Fork/Join框架介绍
parallelStream使用的是Fork/Join框架。Fork/Join框架自JDK 7引入。Fork/Join框架可以将一个大任务拆分为很多小
任务来异步执行。 Fork/Join框架主要包含三个模块:
- 线程池:ForkJoinPool
- 任务对象:ForkJoinTask
- 执行任务的线程:ForkJoinWorkerThread
ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法,
ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成
两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处
理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停
止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在
于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。
Fork/Join原理-工作窃取算法
Fork/Join最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的cpu,那么如何利用好这个空闲的
cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个Fork/Join框架的核心理念
Fork/Join工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。