什么是 Stream?
Stream 的聚合操作与数据库 SQL 的聚合操作 sorted、filter、map 等类似。我们在应用层就可以高效地实现类似数据库 SQL 的聚合操作了,而在数据操作方面,Stream 不仅可以通过串行的方式实现数据操作,还可以通过并行的方式处理大批量数据,提高数据的处理效率。
一个简单的例子来体验下 Stream 的简洁与强大。
这个 Demo 的需求是过滤分组一所中学里身高在 160cm 以上的男女同学,我们先用传统的迭代方式来实现,代码如下:
Map<String, List<Student>> stuMap = new HashMap<String, List<Student>>();
for (Student stu: studentsList) {
if (stu.getHeight() > 160) {
//如果身高大于160
if (stuMap.get(stu.getSex()) == null) {
//该性别还没分类
List<Student> list = new ArrayList<Student>(); //新建该性别学生的列表
list.add(stu);//将学生放进去列表
stuMap.put(stu.getSex(), list);//将列表放到map中
} else {
//该性别分类已存在
stuMap.get(stu.getSex()).add(stu);//该性别分类已存在,则直接放进去即可
}
}
}
Map<String, List<Student>> stuMap = stuList.stream().filter((Student s) -> s.getHeight() > 160) .collect(Collectors.groupingBy(Student ::getSex));
Stream 如何优化遍历?
官方将 Stream 中的操作分为两大类:中间操作(Intermediate operations)和终结操作(Terminal operations)。中间操作只对操作进行了记录,即只会返回一个流,不会进行计算操作,而终结操作是实现了计算操作。
中间操作又可以分为无状态(Stateless)与有状态(Stateful)操作,前者是指元素的处理不受之前元素的影响,后者是指该操作只有拿到所有元素之后才能继续下去。
终结操作又可以分为短路(Short-circuiting)与非短路(Unshort-circuiting)操作,前者是指遇到某些符合条件的元素就可以得到最终结果,后者是指必须处理完所有元素才能得到最终结果。操作分类详情如下图所示:
Stream 源码实现
- BaseStream 和 Stream 为最顶端的接口类。BaseStream 主要定义了流的基本接口方法,例如,spliterator、isParallel 等;Stream 则定义了一些流的常用操作方法,例如,map、filter 等。
- ReferencePipeline 是一个结构类,他通过定义内部类组装了各种操作流。他定义了 Head、StatelessOp、StatefulOp 三个内部类,实现了 BaseStream 与 Stream 的接口方法。
- Sink 接口是定义每个 Stream 操作之间关系的协议,他包含 begin()、end()、cancellationRequested()、accpt() 四个方法。ReferencePipeline 最终会将整个 Stream 流操作组装成一个调用链,而这条调用链上的各个 Stream 操作的上下关系就是通过 Sink 接口协议来定义实现的。
List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");
String maxLenStartWithZ = names.stream()
.filter(name -> name.startsWith("张"))
.mapToInt(String::length)
.max()
.toString();
查找出一个长度最长,并且以张为姓氏的名字
内部实现
在串行处理操作中,Stream 在执行每一步中间操作时,并不会做实际的数据操作处理,而是将这些中间操作串联起来,最终由终结操作触发,生成一个数据处理链表,通过 Java8 中的 Spliterator 迭代器进行数据处理;此时,每执行一次迭代,就对所有的无状态的中间操作进行数据处理,而对有状态的中间操作,就需要迭代处理完所有的数据,再进行处理操作;最后就是进行终结操作的数据处理。
Stream 并行处理
并行处理指的是,Stream 结合了 ForkJoin 框架,对 Stream 处理进行了分片,Splititerator 中的 estimateSize 方法会估算出分片的数据量。
ForkJoin 框架和估算算法,在这里我就不具体讲解了,如果感兴趣,你可以深入源码分析下该算法的实现。
通过预估的数据量获取最小处理单元的阈值,如果当前分片大小大于最小处理单元的阈值,就继续切分集合。每个分片将会生成一个 Sink 链表,当所有的分片操作完成后,ForkJoin 框架将会合并分片任何结果集。
在循环迭代次数较少的情况下,常规的迭代方式性能反而更好;在单核 CPU 服务器配置环境中,也是常规迭代方式更有优势;而在大数据循环迭代中,如果服务器是多核 CPU 的情况下,Stream 的并行迭代优势明显。所以我们在平时处理大数据的集合时,应该尽量考虑将应用部署在多核 CPU 环境下,并且使用 Stream 的并行迭代方式进行处理。
//使用一个容器装载100个数字,通过Stream并行处理的方式将容器中为单数的数字转移到容器parallelList
List<Integer> integerList= new ArrayList<Integer>();
for (int i = 0; i <100; i++) {
integerList.add(i);
}
List<Integer> parallelList = new ArrayList<Integer>() ;
integerList.stream()
.parallel()
.filter(i->i%2==1)
.forEach(i->parallelList.add(i));
// 像这种就需要注意:在并行操作arraylist时,需要考虑线程安全问题,当然如果是并发安全的容器的话,就不用考虑这些了