Java Stream 优雅编程

转载请注明出处❤️

作者:测试蔡坨坨

原文链接:caituotuo.top/d6148ffa.html


你好,我是测试蔡坨坨。

Stream 流式编程的出现,显著推进了Java对函数式编程的支持,它允许开发者可以用声明式的方式处理数据集合(比如列表、数组等),还能有效利用多核处理器进行并行操作,提升应用程序的性能,同时保持代码简洁易读。

本篇,我们将深入探讨Stream API。

Stream流初体验

为了体验这玩意到底有多爽,我们先来举个简单的栗子。

需求:

  1. 创建一个集合,存储多个字符串元素
  2. 把所有以“范”开头的元素存储到新的集合中
  3. 把所有以“范”开头,长度为3的元素存储到新的集合中
  4. 遍历打印最终结果

传统实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/9 14:48
* function: 不使用Stream流
*/
public class StreamDemo1 {
// 创建一个集合,存储多个字符串
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
list.add("范闲");
list.add("范建");
list.add("范思哲");
list.add("范若若");
list.add("林婉儿");

// 把所有以“范”开头的元素存储到新的集合中
ArrayList<String> newList = new ArrayList<>();

for (String name : list) {
if (name.startsWith("范")) {
newList.add(name);
}
}

System.out.println(newList);

// 把所有以“范”开头,长度为3的元素存储到新的集合中
ArrayList<String> newList2 = new ArrayList<>();

for (String name : newList) {
if (name.startsWith("范") && name.length() == 3) {
newList2.add(name);
}
}

System.out.println(newList2);

// 遍历打印最终结果
for (String name : newList2) {
System.out.println(name);
}
}
}

使用Stream流的方式实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/9 14:48
* function: Stream流初体验
*/
public class StreamDemo2 {
// 创建一个集合,存储多个字符串
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
list.add("范闲");
list.add("范建");
list.add("范思哲");
list.add("范若若");
list.add("林婉儿");

// 使用Stream流的方式实现(只需要一句代码)
list.stream().filter(name -> name.startsWith("范")).filter(name -> name.length() == 3).forEach(System.out::println);
}
}

Stream流的思想

经过以上简单的栗子,我们不难发现,使用 Stream流 可以让我们的代码变得非常简洁。

(:人生苦短,就不要浪费时间在重复造轮子这件事情上了。

下面,我们就开始正式地学习它。

首先,我们先来解释一下什么是

这个我们可以将它理解为工厂的流水线,假设有一条制作饮料的流水线,它将会是这样的:

检查瓶子 --> 消毒 --> 灌装 --> 密封 --> 包装

Java中的Stream流跟它类似,我们会把要操作的数据都放到流水线上,进行多轮过滤,最终输出剩余的数据:

创建流并将元素放到流中 --> 过滤操作(留下以范开头的) --> 过滤操作(留下长度为3的) --> 输出操作

Stream流的使用步骤

  1. 先得到一条Stream流(流水线),并把数据放上去
  2. 使用中间方法(intermediate operation)对流水线上的数据进行操作(比如:过滤、转换等,方法调用完毕之后,还可以调用其他的方法)
  3. 使用终端方法/终结方法(terminal operation)对流水线上的数据进行操作(比如:统计、打印等,终端方法是Stream流的最后一步,调用完毕之后,不能再调用其他方法)

同时,Stream流在使用的时候会结合Lambda表达式,简化集合、数组的操作。

获取Stream流

如何获取一条流水线,并把数据放上去?

流可以通过多种方式生产,每种方法适用于不同的场景。

从固定元素或数据结构中创建
获取方法 方法名 说明
单列集合(ArrayList、LinkedList、HashSet、TreeSet等) default Stream stream()
Collection中的默认方法,对于任何实现了Collection接口的集合都可以使用stream方法创建流
双列集合 无法直接使用Stream流,需要通过keySet()或者entrySet()先转成单列集合,再获取Stream流
数组 public static Stream stream(T[] array)
Arrays工具类中的静态方法Arrays.stream()
一堆零散的数据(没有放到集合或数组中的数据) public static Stream of(T… values)
Stream接口中的静态方法Stream.of(),但是这些数据需要是同种数据类型的
合并两个流 public static Stream concat(Stream<? extends T> a, Stream<? extends T> b)
合并两个流
单列集合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package top.caituotuo.intermediate.streamDemo.createStreamDemo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.stream.Stream;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 16:58
* function: 单列集合获取Stream流
*/
public class createStreamByCollection {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿");

// 获取到一条流水线,并把集合中的数据放到流水线上
// 由于ArrayList是Collection接口的实现类,所以可以直接使用Collection接口的stream()方法获取Stream流
list.stream().forEach(s -> System.out.println(s));
System.out.println("--------------------");
list.stream().forEach(System.out::println);
}
}
双列集合

keySet():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package top.caituotuo.intermediate.streamDemo.createStreamDemo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.stream.Stream;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 16:58
* function: 双列集合获取Stream流
*/
public class createStreamByMap {
public static void main(String[] args) {
// 创建双列集合
HashMap<String, Integer> hm = new HashMap<>();

// 添加元素
hm.put("范闲", 24);
hm.put("范建", 40);
hm.put("范思哲", 20);
hm.put("范若若", 23);
hm.put("林婉儿", 23);

// 获取Stream流
hm.keySet().stream().forEach(System.out::println);
hm.keySet().stream().forEach(s -> System.out.println(hm.get(s)));
}
}

entrySet():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package top.caituotuo.intermediate.streamDemo.createStreamDemo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.stream.Stream;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 16:58
* function: 双列集合获取Stream流
*/
public class createStreamByMap {
public static void main(String[] args) {
// 创建双列集合
HashMap<String, Integer> hm = new HashMap<>();

// 添加元素
hm.put("范闲", 24);
hm.put("范建", 40);
hm.put("范思哲", 20);
hm.put("范若若", 23);
hm.put("林婉儿", 23);

// entrySet 把所有的键值对对象放到Stream流中
hm.entrySet().stream().forEach(System.out::print); // 范建=40林婉儿=23范闲=24范思哲=20范若若=23 无序 HashMap的底层是哈希表 不能保证存取的顺序
}
}
数组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package top.caituotuo.intermediate.streamDemo.createStreamDemo;

import java.util.Arrays;
import java.util.HashMap;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 16:58
* function: 数组获取Stream流
*/
public class createStreamByArrays {
public static void main(String[] args) {
// 创建数组
// 基本数据类型 "范闲-男-24", "范建-男-40", "范思哲-男-20", "范若若-女-23", "林婉儿-女-23"
int[] array1 = {24, 40, 20, 23, 23};
// 引用数据类型
String[] array2 = {"范闲", "范建", "范思哲", "范若若", "林婉儿"};

// 获取Stream流
Arrays.stream(array1).forEach(System.out::println);
Arrays.stream(array2).forEach(System.out::println);
}
}

一堆零散的数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package top.caituotuo.intermediate.streamDemo.createStreamDemo;

import java.util.stream.Stream;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 10:27
* function: 一堆零散的数据(没有放到集合或数组中),使用of()方法创建Stream流
*/
public class CreateStreamByOf {
public static void main(String[] args) {
// 创建一个整数流
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).forEach(System.out::println);

// 创建一个字符串流
Stream.of("范闲", "范建", "范思哲", "范若若", "林婉儿").forEach(System.out::println);
}
}

注意点:

Stream.of方法的形参是一个可变参数,可变参数的底层是数组,所以可以传递一堆零散的数据,也可以传递数组:

1
2
String[] arr = {"范闲", "范建", "范思哲", "范若若", "林婉儿"};
Stream.of(arr).forEach(System.out::println);

但是,数组必须是引用数据类型,如果传递的是基本数据类型,是会把整个数组当做一个元素,放到Stream当中:

1
2
int[] arr = {24, 40, 20, 23, 23};
Stream.of(arr).forEach(System.out::println); // [I@15aeb7ab 打印出来是地址
合并两个流

两个流的数据类型尽量保持一致,如果不一致,合并后流的数据类型就是它两共同的父类,相当于对数据类型进行了提升,就会导致无法使用子类里面的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package top.caituotuo.intermediate.streamDemo.createStreamDemo;

import java.util.ArrayList;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
* author: 测试蔡坨坨
* datetime: 2024/6/9 14:48
* function: concat合并两个流
*/
public class ConcatStream {
public static void main(String[] args) {
Stream<String> stream1 = Stream.of("a", "b", "c");
Stream<String> stream2 = Stream.of("d", "e", "f");
// 合并两个流
Stream.concat(stream1, stream2).forEach(System.out::println);
}
}
动态构建

以上栗子都是从固定元素或数据结构中创建的,元素数量和内容在创建时就已经确定,如果我们想动态构建一个流,比如根据特定条件动态的决定是否将元素加入流中,我们可以使用StreamBuilder流构建器来添加元素和构建流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package top.caituotuo.intermediate.streamDemo.createStreamDemo;

import java.util.stream.Stream;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 20:09
* function: StreamBuilder动态构建流
*/
public class StreamBuilderDemo {
public static void main(String[] args) {
// 创建一个StreamBuilder对象
Stream.Builder<String> streamBuilder = Stream.builder();

// 通过add方法添加元素到流中
streamBuilder.add("测试");
streamBuilder.add("蔡坨坨");

// 根据条件动态添加,表示这里有50%的概率
if (Math.random() > 0.5) {
streamBuilder.add("小趴蔡");
}

// 通过builder方法来生成Stream对象
// 一旦调用了build方法,StreamBuilder对象就不能再添加元素
Stream<String> stream = streamBuilder.build();
// streamBuilder.add("caituotuo.top"); // IllegalStateException
stream.forEach(System.out::println);
}
}
从文件创建流

从文件创建流也是一个非常实用的功能,特别适合于文本分析,日志文件处理等场景。

我们可以通过Java的Files类的lines方法来实现:

caituotuo.txt

1
2
3
4
5
6
范闲
范思哲
范建
陈萍萍
林婉儿
叶轻眉
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package top.caituotuo.intermediate.streamDemo.createStreamDemo;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 20:18
* function: 通过文件创建流
*/
public class CreateStreamByFile {
public static void main(String[] args) {
// 获取文件路径
Path path = Paths.get("src/main/java/top/caituotuo/intermediate/streamDemo/createStreamDemo/caituotuo.txt");

// 通过Files.lines打开指定路径的文件
// 它会逐行读取文件内容,每行文本都会被仿作一行字符串处理,然后将其作为元素放入流中
try (Stream<String> lines = Files.lines(path)) {
lines.forEach(System.out::println);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

运行结果:

基本类型创建流

此外,对于基本类型的处理,Stream API 专门提供了IntStream、LongStream、DoubleStream来分别处理int、long和double类型,通过使用range和rangeClosed等方法,可以方便地创建这些基本数据类型的流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package top.caituotuo.intermediate.streamDemo.createStreamDemo;

import java.util.Random;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 20:35
* function: 创建基本数据类型的流
*/
public class CreateStreamByPrimitiveData {
public static void main(String[] args) {
IntStream intStream = IntStream.of(1, 2, 3, 4, 5);
intStream.forEach(System.out::println);

// 使用range方法创建一个指定范围的int流
IntStream intStream2 = IntStream.range(1, 6); // 包含1到5 不包含6
intStream2.forEach(System.out::println);

// 使用rangeClosed方法创建一个包含两个指定值的int流
IntStream intStream3 = IntStream.rangeClosed(1, 6); // 包含1到6
intStream3.forEach(System.out::println);

// 使用Random类生成生成一个包含随机整数的流
new Random().ints(5).forEach(System.out::println);

// 把基本类型的流转化成对象流
// 通过在基本类型上调用boxed
IntStream intStream4 = IntStream.of(1, 2, 3, 4, 5);
Stream<Integer> stream = intStream4.boxed();
stream.forEach(System.out::println);
}
}
无限流

无限流,顾名思义就是没有固定大小的流,它可以无限的生成元素,我们可以通过generate和iterate这两个方法来创建。处理无限流时需要谨慎,防止无限循环的发生。因此,通常会结合limit等操作来限制流的元素数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package top.caituotuo.intermediate.streamDemo.createStreamDemo;

import java.util.stream.Stream;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 21:07
* function: 创建无限流
*/
public class CreateStreamByGenerateAndIterate {
public static void main(String[] args) {
// 使用generate方法创建一个无限流
// 它主要用于生成重复的值或者生成随机数据
// 生成重复的值
Stream.generate(() -> "caituotuo")
.limit(5) // 限制流的大小为5
.forEach(System.out::println); // 打印流中的元素
// 生成随机数据
Stream.generate(Math::random)
.limit(5) // 限制流的大小为5
.forEach(System.out::println); // 打印流中的元素

// 使用iterate方法创建一个无限流
// 它主要用于生成数学的序列或实现迭代算法
// 创建一个无限流,从0开始,每次增加1(等差数列)
Stream.iterate(0, n -> n + 1)
.limit(10) // 限制流的大小为10
.forEach(System.out::println); // 打印流中的元素
}
}

中间方法intermediate operations

中间操作用于对流中的元素进行处理,比如筛选、映射、排序等。

方法名 说明
Stream filter(Predicate<? super T> predicate)
过滤
Stream limit(long maxSize)
获取前几个元素
Stream skip(long n)
跳过前几个元素
Stream distinct()
元素去重,依赖hashCode和equals方法(如果流的元素是自定义对象时,需要重写hashCode和equals方法)
Stream map(Function<? super T, ? extends R> mapper)
转换流中的数据类型
排序

注意:

  1. 中间方法,返回新的Stream流,原来的Stream流只能使用一次,建议使用链式编程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    ArrayList<String> list = new ArrayList<>();
    Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿");

    // 假设不使用链式编程
    Stream<String> stream1 = list.stream().filter(s -> s.startsWith("范"));
    Stream<String> stream2 = stream1.filter(s -> s.length() == 3);
    stream2.forEach(System.out::println);

    // 再次对stream1进行操作,会报错 stream has already been operated upon or closed
    // Stream<String> stream3 = stream1.filter(s -> s.length() == 3);

  2. 修改Stream流中的数据,不会影响原来集合或数组中的数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    ArrayList<String> list = new ArrayList<>();
    Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿");

    // 对流中的数据进行两次过滤
    Stream<String> stream1 = list.stream().filter(s -> s.startsWith("范"));
    Stream<String> stream2 = stream1.filter(s -> s.length() == 3);
    stream2.forEach(System.out::println);
    // 在打印原集合,原集合还是原来的数据
    System.out.println("原集合:" + list); // 原集合:[范闲, 范建, 范思哲, 范若若, 林婉儿]

根据操作性质,中间操作可以分为以下几个功能类别:

  • 筛选和切片(Filtering and Slicing):用于过滤或缩减流中的元素数量,包括filter、distinct、limit、skip
  • 映射(Mapping):用于转换流中的元素或提取元素的特定属性,包括map、flatMap、mapToInt等
  • 排序(Sorting):用于对流中的元素进行排序,包括sorted
筛选和切片
filter()

filter()方法的形参Predicate是一个FunctionalInterface函数式接口,所以可以使用匿名内部类或Lambda表达式实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package top.caituotuo.intermediate.streamDemo.intermediate;

import java.util.ArrayList;
import java.util.Collections;
import java.util.function.Predicate;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 21:42
* function: filter过滤
*/
public class intermediateDemo1 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿");

// 过滤 把“范”开头的留下,其余的不要
// 匿名内部类实现
list.stream().filter(new Predicate<String>() {
@Override
public boolean test(String s) {
// 如果返回值为true,表示当前数据留下
// 如果返回值为false,表示当前数据不要
return s.startsWith("范");
}
}).forEach(s -> System.out.println(s));

// 改成Lambda表达式
// () -> {}
// 只有一个参数,参数类型可以省略 s
// 方法体为 s.startsWith("范")
// s -> s.startsWith("范")
list.stream().filter(s -> s.startsWith("范")).forEach(System.out::println);
}
}
limit()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package top.caituotuo.intermediate.streamDemo.intermediate;

import java.util.ArrayList;
import java.util.Collections;
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 21:42
* function: limit(N)获取前N个元素
*/
public class intermediateDemo2 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿");

list.stream().limit(3).forEach(System.out::println);
/*
运行结果:打印前3个
范闲
范建
范思哲
*/
}
}
skip()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package top.caituotuo.intermediate.streamDemo.intermediate;

import java.util.ArrayList;
import java.util.Collections;
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 21:42
* function: skip(N)跳过前N个元素
*/
public class intermediateDemo2 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿");

list.stream().skip(3).forEach(System.out::println);
/*
运行结果:
范若若
林婉儿
*/
}
}
distinct()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package top.caituotuo.intermediate.streamDemo.intermediate;

import java.util.ArrayList;
import java.util.Collections;
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 21:42
* function: distinct去重
*/
public class intermediateDemo2 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿", "范闲");

// 去除重复的元素
list.stream().distinct().forEach(System.out::println);
}
}

distinct的源码非常的复杂,可以简单的看一下重点部分,先是调用了makeRef方法,makeRef方法里面可以看到调用了HashSet,所以distinct的底层其实是调用了HashSet进行去重的,而HashSet在存储自定义元素的时候会重写hashCode和equals方法:

映射
map()

映射本质上是一个数据转换的过程,map方法可以通过提供的函数,将流中的每个元素转换成新的元素,最后生成一个新元素构成的流。

map的形参是Function,也是一个函数式接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package top.caituotuo.intermediate.streamDemo.intermediate;

import java.util.ArrayList;
import java.util.Collections;
import java.util.function.Function;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 21:42
* function: map类型转换
*/
public class intermediateDemo3 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲-24", "范建-40", "范思哲-20", "范若若-23", "林婉儿-23");

// 只获取集合里面的年龄 String --> int
// Function第一个泛型表示原本的数据类型,第二个泛型表示转换后的数据类型
// apply方法的形参s:依次表示流里面的每个数据
// 返回值:表示转换后的数据
list.stream()
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return Integer.parseInt(s.split("-")[1]);
}
})
.forEach(s -> System.out.println(s));

System.out.println("--------------------------------");

list.stream()
.map(s -> Integer.parseInt(s.split("-")[1]))
.forEach(System.out::println);
}
}
flatMap()

map方法适用于单层结构的流,进行元素一对一的转换,例如更改数据类型或提取信息,但对于嵌套的数组或其他多层结构的数据处理不够灵活,在这些情况下,flatMap成为更合适的选择,它不仅能够执行map的转换功能,还能够扁平化多层数据结构,将它们转换合并成一个单层流。

处理嵌套列表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package top.caituotuo.intermediate.streamDemo.intermediate;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 22:59
* function: 处理嵌套列表
*/
public class FlatMapExample {
public static void main(String[] args) {
List<List<String>> nestedList = Arrays.asList(
Arrays.asList("a", "b", "c"),
Arrays.asList("d", "e"),
Arrays.asList("f", "g", "h")
);

List<String> flatList = nestedList.stream()
.flatMap(List::stream)
.collect(Collectors.toList());

System.out.println(flatList); // 输出: [a, b, c, d, e, f, g, h]
}
}

处理嵌套数组:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package top.caituotuo.intermediate.streamDemo.intermediate;

import java.util.Arrays;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 23:04
* function: 处理嵌套数组
*/
public class FlatMapArrayExample {
public static void main(String[] args) {
String[][] nestedArray = {
{"a", "b", "c"},
{"d", "e"},
{"f", "g", "h"}
};

String[] flatArray = Arrays.stream(nestedArray)
.flatMap(Arrays::stream)
.toArray(String[]::new);

System.out.println(Arrays.toString(flatArray)); // 输出: [a, b, c, d, e, f, g, h]
}
}

处理对象流:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class Person {
private String name;
private List<String> hobbies;

public Person(String name, List<String> hobbies) {
this.name = name;
this.hobbies = hobbies;
}

public List<String> getHobbies() {
return hobbies;
}
}

public class FlatMapObjectExample {
public static void main(String[] args) {
List<Person> people = Arrays.asList(
new Person("Joker", Arrays.asList("Reading", "Hiking")),
new Person("Jane", Arrays.asList("Cooking", "Running")),
new Person("Jack", Arrays.asList("Swimming", "Cycling"))
);

List<String> hobbies = people.stream()
.flatMap(person -> person.getHobbies().stream())
.collect(Collectors.toList());

System.out.println(hobbies); // 输出: [Reading, Hiking, Cooking, Running, Swimming, Cycling]
}
}

简单来说,map方法用于元素一对一的转换,而flatMap不仅可以用于转换,还用于将多层的流合并为一个单层流,另外我们也可以通过mapToInt、mapToLong、mapToDouble等方法将流转换位对应的数值流。

排序

排序分为自然排序和自定义排序,当流中的元素类型实现了Comparable接口时,比如字符串或包装类型的数字,如Integer、Long等,可以直接调用无参数的sorted方法,按照自然顺序进行排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package top.caituotuo.intermediate.streamDemo.intermediate;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 21:42
* function:
*/
public class intermediateDemo4 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "cherry", "apple", "blueberry", "pear");

// 排序
list.stream().sorted().forEach(System.out::println);

System.out.println("--------------------");
// 自定义排序规则 长度从短到长排序
list.stream().sorted((s1, s2) -> s1.length() - s2.length()).forEach(System.out::println);

System.out.println("--------------------");

// Comparator接口的实现类 长度从短到长排序
list.stream()
.sorted(Comparator.comparingInt(String::length))
.forEach(System.out::println);

System.out.println("--------------------");

// Comparator接口的实现类 长度从长到短排序
list.stream()
.sorted(Comparator.comparingInt(String::length).reversed())
.forEach(System.out::println);
}
}

终端方法terminal operation

常见的终端方法有以下四个:

名称 说明
void forEach(Consumer<? super T> action); 遍历(返回值是void,所以forEach结束后不能再调用流里面的其他方法)
long count(); 统计(返回值是long类型的整数,所以也不能在调用流里面的其他方法)
Object[] toArray(); 收集流中的数据,放到数组中
<R, A> R collect(Collector<? super T, A, R> collector); 收集流中的数据,放到集合中

根据操作性质,终端操作可以分为以下几个功能类别:

  • 查找与匹配(Search and Match):这类操作也属于短路操作,当这些操作在找到符合条件的元素后,会立即结束处理返回结果,而不需要处理整个流,有效短路了流的遍历,提高了处理效率,特别适用在快速筛选或验证数据的场景中,包括anyMatch、noneMatch、allMatch、findFirst、findAny
  • 聚合操作(Aggregation):count、max、min、average、sum
  • 规约操作(Reduction):reduce
  • 收集操作(Collection):collect
  • 迭代操作(Iteration):forEach
forEach()

可以看到forEach的形参是一个Consumer<? super T> action

Consumer是一个函数式接口,所以可以使用匿名内部类或Lambda表达式实现:

使用匿名内部类实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Consumer;

/**
* author: 测试蔡坨坨
* datetime: 2024/6/9 14:48
* function: 终端方法forEach 匿名内部类
*/
public class StreamDemo9 {
public static void main(String[] args) {
// 创建一个集合,存储多个字符串
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿");

// 遍历获取流里面的每一个元素并打印
// 匿名内部类实现
// Consumer的泛型:表示流中数据的类型,这里是String类型
// accept方法的形参:表示对流中的每一个元素,跟泛型里面的保持一致,也是String类型
// accept方法体:对每一个数据的处理操作
list.stream().forEach(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});
}
}

运行结果:

使用Lambda表达式实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.function.Consumer;

/**
* author: 测试蔡坨坨
* datetime: 2024/6/9 14:48
* function: 终端方法forEach Lambda表达式
*/
public class StreamDemo10 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿");

// 遍历获取流里面的每一个元素并打印
list.stream().forEach(name -> System.out.println(name));
}
}

运行结果:

count()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;
import java.util.Collections;

/**
* author: 测试蔡坨坨
* datetime: 2024/6/9 14:48
* function: 终端方法 count
*/
public class StreamDemo11 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿");

// 当然这里可以直接用list.size()来获取集合的长度,使用count就显得有点蠢了
long count = list.stream().count();
System.out.println(count);

// count主要用于统计筛选后的结果有多少个
long count2 = list.stream().filter(name -> name.startsWith("范")).count();
System.out.println(count2);
}
}

运行结果:

toArray()

toArray有两个方法,一个是返回Object对象,另一个是返回具体的类型:

空参,返回Object对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.IntFunction;

/**
* author: 测试蔡坨坨
* datetime: 2024/6/9 14:48
* function: 终端方法 toArray 空参 返回Object数组
*/
public class StreamDemo12 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿");

// 空参 返回Object数组
Object[] array = list.stream().toArray();
System.out.println(Arrays.toString(array)); // [范闲, 范建, 范思哲, 范若若, 林婉儿]
}
}

传参,指定返回的数组类型:形参是IntFunction

IntFunction也是一个函数式接口

匿名内部类实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.IntFunction;

/**
* author: 测试蔡坨坨
* datetime: 2024/6/9 14:48
* function: 终端方法 toArray 指定数组类型 匿名内部类实现
*/
public class StreamDemo12 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿");

// 非空参 返回指定类型数组
// 匿名内部类实现
// IntFunction的泛型:具体类型的数组,这里就是String类型
// apply的形参:表示流中数据的个数,要跟数组的长度保持一致
// apply的返回值:表示数组的类型,要跟泛型一致
// apply的方法体:创建数组

// toArray方法的参数的作用:创建一个指定类型的数组
// toArray方法的底层:会依次得到流里面的每一个数据,并把数据放到数组中
// toArray方法的返回值:装着流里面所有数据的数组
String[] array2 = list.stream().toArray(new IntFunction<String[]>() {
@Override
public String[] apply(int value) {
return new String[value];
}
});
System.out.println(Arrays.toString(array2)); // [范闲, 范建, 范思哲, 范若若, 林婉儿]
}
}

Lambda表达式实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.IntFunction;

/**
* author: 测试蔡坨坨
* datetime: 2024/6/9 14:48
* function: 终端方法 toArray lambda表达式实现
*/
public class StreamDemo13 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲", "范建", "范思哲", "范若若", "林婉儿");

// 非空参 返回指定类型数组
// lambda表达式实现
// lambda表达式格式:() -> {}
// 形参只有一个,数据类型可以省略,再根据指定的个数value创建数组 value -> new String[value]
String[] array = list.stream().toArray(value -> new String[value]);
System.out.println(Arrays.toString(array)); // [范闲, 范建, 范思哲, 范若若, 林婉儿]);
}
}
collect()

收集流中的数据,放到集合中(List、Set、Map)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package top.caituotuo.intermediate.streamDemo;

import java.util.*;
import java.util.stream.Collectors;

/**
* author: 测试蔡坨坨
* datetime: 2024/6/9 14:48
* function: 终端方法 collect (toList、toSet)
*/
public class StreamDemo14 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲-男-24", "范建-男-40", "范思哲-男-20", "范若若-女-23", "林婉儿-女-23", "范闲-男-24");

// 收集到List集合中 ArrayList
// 收集所有的男性
List<String> newList = list.stream()
.filter(s -> "男".equals(s.split("-")[1]))
.collect(Collectors.toList());
System.out.println(newList); // [范闲-男-24, 范建-男-40, 范思哲-男-20, 范闲-男-24]

// 收集到Set集合中 HashSet
Set<String> newSet = list.stream()
.filter(s -> "男".equals(s.split("-")[1]))
.collect(Collectors.toSet());
System.out.println(newSet); // [范建-男-40, 范闲-男-24, 范思哲-男-20]
}
}

收集到LIst和收集到Set的区别:

  • ArrayList是有序的,且不会去重
  • HashSet是无序的,且会进行去重
  • Set的查询效率是O(1),LIst的查询效率是O(N)

匿名内部类实现收集到Map集合中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package top.caituotuo.intermediate.streamDemo;

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* author: 测试蔡坨坨
* datetime: 2024/6/9 14:48
* function: 终端方法 collect Map
*/
public class StreamDemo15 {
public static void main(String[] args) {
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "范闲-男-24", "范建-男-40", "范思哲-男-20", "范若若-女-23", "林婉儿-女-23");

// 收集到Map集合中
// 键值对
// 查找所有男性 -> 收集到Map集合中 键:姓名 值:年龄
/*
toMap:
第一个参数:键的生成规则
第二个参数:值的生成规则
参数1:
Function泛型1:表示流中每个数据的类型
泛型2:表示Map集合中键的数据类型
apply方法的形参:依次表示流里面的每一个数据
方法体:生成键的逻辑
返回值:已经生成的键

参数2:
Function泛型1:表示流中每个数据的类型
泛型2:表示Map集合中值的数据类型
apply方法的形参:依次表示流里面的每一个数据
方法体:生成值的逻辑
返回值:已经生成的值

注意:收集到Map集合中,键不能重复,否则会报错
*/
Map<String, Integer> newMap = list.stream()
.filter(s -> "男".equals(s.split("-")[1]))
.collect(Collectors.toMap(
new Function<String, String>() {
@Override
public String apply(String s) {
return s.split("-")[0];
}
},
new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return Integer.parseInt(s.split("-")[2]);
}
}));

System.out.println(newMap);
}
}
reduce()

对流中的元素进行聚合操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ReduceExample1 {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

Optional<Integer> sum = numbers.stream()
.reduce((a, b) -> a + b);

sum.ifPresent(System.out::println); // 输出: 15
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
import java.util.Arrays;
import java.util.List;

public class ReduceExample2 {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

int sum = numbers.stream()
.reduce(0, (a, b) -> a + b);

System.out.println(sum); // 输出: 15
}
}

并行流Parallel Streams

核心原理

到这里,大家应该已经对流的具体使用方法有了一定的了解,另外,目前展示的所有示例都是基于对顺序流的操作,它是单线程顺序执行的,Stream API 还提供了一种更高效的解决方案,那就是并行流。它能够借助多核处理器的并行计算能力,加速数据处理,特别适合大型数据集,或计算密集型任务。

其核心工作原理:

并行流在开始时,分割迭代器Spliterator会将数据分割成多个片段,分割过程通常采用递归的方式动态进行,以平衡子任务的工作负载,提高资源利用率;然后Fork/Join框架会将这些数据片段分配到多个线程和处理器核心上进行并行处理;处理完成后,结果将会被汇总合并,其核心是任务的分解Fork结果的合并Join

在操作上,无论是并行流还是顺序流,两者都提供了相同的中间操作和终端操作。这意味着你可以用几乎相同的方式进行数据处理和结果收集。

forEachOrdered

forEachOrdered()之所以能够保持这个出现顺序,这归功于Spliterator和Fork/Join框架的协作,其工作原理是这样的:

在处理并行流时,对于有序数据源,比如List,Spliterator会对数据源进行递归分割,分隔通常是基于逻辑上的,而非物理上的复制数据,通过划分数据源的索引范围来实现,每次分割都会产生一个新的Spliterator实例,该实例内部维护了指向原数据的索引范围,这种分割机制可以让数据的出现顺序得以保持;然后Fork/Join框架接手,将分割后的数据块分配给不同的子任务执行,对于forEachOrdered的操作,Fork/Join框架会依据Spliterator维护的顺序信息来调度方法的执行顺序,这意味着即使某个子任务较早的完成,如果其关联的方法执行顺序还未到来,那么系统将会缓存顺序并暂停执行该方法,直到所有前序任务都已完成,并执行了各自的相关方法,这种机制确保了即使在并行处理的情况下,每个数据也会按照原始数据的出现顺序执行,当然,这么做也可能会牺牲一些并行执行的效率。

另外,值得注意的是forEachOrdered只专注于按出现顺序执行指定操作,不需要对数据进行合并和收集。

而对于forEach,尽管Spliterator分割策略相同,依旧保持着顺序信息,但Fork/Join框架执行时会忽略这些顺序信息,因此执行不保证循序原始顺序,但能够提供更高的执行效率。

forEach操作会在不同的线程上独立执行,这意味着如果操作的是共享资源,那么必须确保这些操作是线程安全的。如果没有适当的同步措施,可能会引发线程安全问题,因此在并行流中,forEach更适合执行无状态操作或处理资源独立的场景。

1
2
3
4
5
list.parallelStream()
.map(String::toLowerCase)
.forEach(item -> {
System.out.println("Item: " + item + "--> Thread: " + Thread.currentThread().getName());
});

运行结果:可以看到在不同的线程上执行

收集数据的顺序问题

接下来,我们来看一下并行流在收集数据时的顺序问题。

比如我们将数据收集到列表List中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 13:15
* function:
*/
public class ParallelStreamsDemo2 {
public static void main(String[] args) {
// 创建由6个大写字母组成的list
ArrayList<String> list = new ArrayList<>();
Collections.addAll(list, "C", "S", "C", "T", "T", "A");

List<String> list1 = list.parallelStream()
.map(String::toLowerCase)
.collect(Collectors.toList());
System.out.println(list1); // [c, s, c, t, t, a]
}
}

可以看到收集到List中的顺序和原始数据的顺序是相同的。

其实,收集到List和forEachOrdered操作很相似,尤其是在数据的分割和并行处理阶段,主要的区别在于collect操作需要在最终合并收集数据,而forEachOrdered则不需要:

每个并行执行的任务在完成处理后,会将其结果存储到一个临时数据结构中,Fork/Join框架会利用Spliterator提供的区段顺序信息,引导这些临时结果按顺序的合并。同样,即使某个逻辑上靠后的数据段先处理完成,合并时也不会让这个结果前置,整个合并过程递归进行,直至所有的结果合并完毕。

我们可以自定义一个收集器,来实现toList方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 13:15
* function:
*/
public class ParallelStreamsDemo2 {
public static void main(String[] args) {
ArrayList<String> arrayList = new ArrayList<>();
Collections.addAll(arrayList, "C", "S", "C", "T", "T", "A");

List<String> collect = arrayList.parallelStream()
.map(String::toLowerCase)
.collect(Collector.of(
// 定义一个spliterator分割迭代器,创建一个新的List实例,作为数据容器
() -> {
// 监控创建过程
System.out.println("Supplier(new ArrayList): " + " Thead: " + Thread.currentThread().getName());
return new ArrayList<>();
},
// Accumulator累加器
(ArrayList<String> list, String item) -> {
// 监控累加过程
System.out.println("Accumulator(add): " + item + " Thead: " + Thread.currentThread().getName());
// 累加操作,将元素添加到list中
list.add(item);
},
// Combiner组合器 顺序流和并行流的区别之一是并行流需要合并步骤
(ArrayList<String> left, ArrayList<String> right) -> {
// 监控合并过程
System.out.println("Combiner(addAll): " + left + "+" + right + " Thead: " + Thread.currentThread().getName());
// 合并操作,将list2中的元素添加到list1中
left.addAll(right);
return left;
},
// Characteristics characteristics 定义流操作的特性
// IDENTITY_FINISH 表示流操作的最终结果与输入参数相同,即最终结果是ArrayList<String>
Collector.Characteristics.IDENTITY_FINISH
));
System.out.println(collect); // [c, s, c, t, t, a]
}
}

当我们将toList操作改成toSet操作,结果就会呈现无序,并且还有做去重操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 13:15
* function:
*/
public class ParallelStreamsDemo3 {
public static void main(String[] args) {
ArrayList<String> arrayList = new ArrayList<>();
Collections.addAll(arrayList, "C", "S", "C", "T", "T", "A");

Set<String> collect = arrayList.parallelStream()
.map(String::toLowerCase)
.collect(Collectors.toSet());
System.out.println(collect); // [a, c, s, t]
}
}

这是因为Set数据结构的性质导致的,而不是并行处理本身导致的。

需要强调的是,无论采用toList还是toSet收集数据,整个处理过程,包括Spliterator的分割策略,Fork/Join框架的执行和合并策略都没变。因为这些策略是由数据源的性质决定的,而数据源是同一个List,唯一不同的是合并前存储局部结果的临时数据结构,一个是List,一个是Set。

同样我们也来自定义toSet的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package top.caituotuo.intermediate.streamDemo;

import java.util.*;
import java.util.stream.Collector;
import java.util.stream.Collectors;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 13:15
* function:
*/
public class ParallelStreamsDemo3 {
public static void main(String[] args) {
ArrayList<String> arrayList = new ArrayList<>();
Collections.addAll(arrayList, "C", "S", "C", "T", "T", "A");

Set<String> collect = arrayList.parallelStream()
.map(String::toLowerCase)
.collect(Collector.of(
// 定义一个spliterator分割迭代器,创建一个新的List实例,作为数据容器
() -> {
// 监控创建过程
System.out.println("Supplier(new HashSet): " + " Thead: " + Thread.currentThread().getName());
return new HashSet<>();
},
// Accumulator累加器
(HashSet<String> list, String item) -> {
// 监控累加过程
System.out.println("Accumulator(add): " + item + " Thead: " + Thread.currentThread().getName());
// 累加操作,将元素添加到list中
list.add(item);
},
// Combiner组合器 顺序流和并行流的区别之一是并行流需要合并步骤
(HashSet<String> left, HashSet<String> right) -> {
// 监控合并过程
System.out.println("Combiner(addAll): " + left + "+" + right + " Thead: " + Thread.currentThread().getName());
// 合并操作,将list2中的元素添加到list1中
left.addAll(right);
return left;
},
// Characteristics characteristics 定义流操作的特性
// IDENTITY_FINISH 表示流操作的最终结果与输入参数相同,即最终结果是HashSet<String>
Collector.Characteristics.IDENTITY_FINISH
));
System.out.println(collect); // [a, c, s, t]
}
}

Characteristics

UNORDERED

另外,关于Characteristics中的UNORDERED和CONCURRENT两个特性也经常会引起一些误解和混淆。

特别是UNORDERED,许多人可能会误以为一旦设置了UNORDERED,处理结果就会呈现无序状态,然而结果往往还是有序的:

1
Collector.Characteristics.UNORDERED

可以看到,即便加了UNORDERED,结果仍然是顺序的。

这是因为即便Collector被标记为UNORDERED,如果数据源或流操作本身是有序的,系统的执行策略通常仍会保持这些元素的出现顺序,只有在特定场景下,系统才会针对那些被标记为UNORDERED的流进行优化,从而打破约束(比如数据源本身无序,UNORDERED会进一步确认处理过程不需要维护顺序,允许更多的优化,以提高性能)。

CONCURRENT

在标准的并行流处理中,每个线程处理数据的一个子集维护自己的局部结果容器,在所有的结果处理完成后,这些局部结果会用过一个Combiner的函数合并成一个最终结果。

使用CONCURRENT特性后,所有线程将共享一个结果容器,而不是维护独立的局部结果,从而减少了合并的需要,这通常会带来性能上的提升,特别是当结果容器较大,或合并操作较为复杂时。这也就意味着供应函数只会被调用一次,只创建一个结果容器,而且这个容器必须是线程安全的,例如ConcurrentHashMap,此外合并函数将不会再执行。

由于ArrayList不是线程安全的,即使被标记为CONCURRENT,也不会生效,所以我们将数据源换成线程安全的ConcurrentHashMap:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collector;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 13:15
* function:
*/
public class ParallelStreamsDemo4 {
public static void main(String[] args) {
ArrayList<String> arrayList = new ArrayList<>();
Collections.addAll(arrayList, "C", "S", "C", "T", "T", "A");

ConcurrentHashMap<String, String> collect = arrayList.parallelStream()
.map(String::toLowerCase)
.collect(Collector.of(
() -> {
System.out.println("Supplier(new ConcurrentHashMap): " + " Thead: " + Thread.currentThread().getName());
return new ConcurrentHashMap<>();
},
(ConcurrentHashMap<String, String> map, String item) -> {
System.out.println("Accumulator(add): " + item + " Thead: " + Thread.currentThread().getName());
map.put(item.toUpperCase(), item.toLowerCase());
},
(ConcurrentHashMap<String, String> left, ConcurrentHashMap<String, String> right) -> {
System.out.println("Combiner(addAll): " + left + "+" + right + " Thead: " + Thread.currentThread().getName());
left.putAll(right);
return left;
},
Collector.Characteristics.IDENTITY_FINISH,
Collector.Characteristics.CONCURRENT
));
System.out.println(collect);
}
}

运行结果:

运行后发现Supplier函数仍被多次调用,依旧会按照默认策略在多个线程中创建ConcurrentHashMap实例,这是因为在处理有序流的情况下,如果多个线程并发更新同一个共享的累加器容器,那么元素更新的顺序将变得不确定,为了避免这种情况,框架通常会忽略有序源的CONCURRENT特性,除非同时还指定了UNORDERED特性。

1
2
3
Collector.Characteristics.IDENTITY_FINISH,
Collector.Characteristics.CONCURRENT,
Collector.Characteristics.UNORDERED

运行结果:

我们可以看到Supplier只执行了一次,只new了一个ConcurrentHashMap,Combiner就没有执行,也就是说这个时候CONCURRENT特性生效了。

我们再把UNORDERED删除,把数据源List改成Set:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package top.caituotuo.intermediate.streamDemo;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collector;

/**
* author: 测试蔡坨坨 caituotuo.top
* datetime: 2024/6/10 13:15
* function:
*/
public class ParallelStreamsDemo5 {
public static void main(String[] args) {
HashSet<String> set = new HashSet<>();
Collections.addAll(set, "C", "S", "C", "T", "T", "A");

ConcurrentHashMap<String, String> collect = set.parallelStream()
.map(String::toLowerCase)
.collect(Collector.of(
() -> {
System.out.println("Supplier(new ConcurrentHashMap): " + " Thead: " + Thread.currentThread().getName());
return new ConcurrentHashMap<>();
},
(ConcurrentHashMap<String, String> map, String item) -> {
System.out.println("Accumulator(add): " + item + " Thead: " + Thread.currentThread().getName());
map.put(item.toUpperCase(), item.toLowerCase());
},
(ConcurrentHashMap<String, String> left, ConcurrentHashMap<String, String> right) -> {
System.out.println("Combiner(addAll): " + left + "+" + right + " Thead: " + Thread.currentThread().getName());
left.putAll(right);
return left;
},
Collector.Characteristics.IDENTITY_FINISH,
Collector.Characteristics.CONCURRENT
// Collector.Characteristics.UNORDERED
));
System.out.println(collect);
}
}

运行结果:

结果也是只实例化了一个容器,也就是说CONCURRENT生效了,对于无序数据源不需要加UNORDERED。