转载请注明出处❤️
作者:测试蔡坨坨
原文链接:caituotuo.top/7665c844.html
你好,我是测试蔡坨坨。
在上一篇文章(Java Stream 优雅编程)中,我们详细介绍了Java Stream流的工作原理以及实现步骤,相信大家应该已经对流的具体使用方法有了一定的了解。另外,目前为止所有示例都是基于对顺序流
的操作,它是单线程顺序执行的,Stream API 还提供了一种更高效的解决方案,那就是并行流
,它能够借助多核处理器的并行计算能力,加速数据处理,特别适合大型数据集
,或计算密集型任务
。
所以,本篇我们就来学习一下Parallel Streams(并行流)。
Parallel Streams核心原理
并行流的核心工作原理:
并行流在开始时,分割迭代器Spliterator
会将数据分割成多个片段,分割过程通常采用递归
的方式动态进行,以平衡子任务的工作负载,提高资源利用率;然后Fork/Join框架
会将这些数据片段分配到多个线程和处理器核心上进行并行处理;处理完成后,结果将会被汇总合并,其核心是任务的分解Fork
和结果的合并Join
。
在操作上,无论是并行流还是顺序流,两者都提供了相同的中间操作和终端操作。这意味着你可以用几乎相同的方式进行数据处理和结果收集。
forEachOrdered
由于并行流是多线程操作,可能存在处理结果顺序问题,我们可以通过forEachOrdered()这个方法来进行处理。forEachOrdered之所以能够保持结果与输入相同的出现顺序,这归功于Spliterator和Fork/Join框架的协作,其工作原理是这样的:
在处理并行流时,对于有序数据源,比如List,Spliterator会对数据源进行递归分割,分隔通常是基于逻辑上的,而非物理上的复制数据,通过划分数据源的索引范围
来实现,每次分割都会产生一个新的Spliterator实例,该实例内部维护了指向源数据的索引范围,这种分割机制可以让数据的出现顺序得以保持;然后Fork/Join框架会将分割后的数据块分配给不同的子任务执行,对于forEachOrdered的操作,Fork/Join框架会依据Spliterator维护的顺序信息来调度方法的执行顺序,这意味着即使某个子任务较早的完成,如果其关联的方法执行顺序还未到来,那么系统将会缓存顺序并暂停执行该方法,直到所有前序任务都已完成,并执行了各自的相关方法,这种机制确保了即使在并行处理的情况下,每个数据也会按照原始数据的出现顺序执行,当然这么做也可能会牺牲一些并行执行的效率。
另外,值得注意的是forEachOrdered只专注于按出现顺序执行指定操作,不需要对数据进行合并和收集。
而对于forEach,尽管Spliterator分割策略相同,依旧保持着顺序信息,但Fork/Join框架执行时会忽略这些顺序信息
,因此执行不保证遵循原始顺序,但能够提供更高的执行效率。
forEach操作会在不同的线程上独立执行,这意味着如果操作的是共享资源,那么必须确保这些操作是线程安全的。如果没有适当的同步措施,可能会引发线程安全问题,因此,在并行流中,forEach更适合执行无状态操作或处理资源独立的场景。
1 2 3 4 5
| list.parallelStream() .map(String::toLowerCase) .forEach(item -> { System.out.println("Item: " + item + "--> Thread: " + Thread.currentThread().getName()); });
|
运行结果:可以看到在不同的线程上执行
collect的顺序问题
接下来,我们来看一下并行流在收集数据时的顺序问题。
例如我们将数据收集到List中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package top.caituotuo.intermediate.streamDemo;
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.stream.Collectors;
public class ParallelStreamsDemo2 { public static void main(String[] args) { ArrayList<String> list = new ArrayList<>(); Collections.addAll(list, "C", "S", "C", "T", "T", "A");
List<String> list1 = list.parallelStream() .map(String::toLowerCase) .collect(Collectors.toList()); System.out.println(list1); } }
|
从运行结果可以看到收集到List中的顺序和原始数据的顺序是相同的。
其实,收集到List和forEachOrdered操作很相似,尤其是在数据的分割和并行处理阶段,主要的区别在于collect操作需要在最终合并收集数据
,而forEachOrdered则不需要:
每个并行执行的任务在完成处理后,会将其结果存储到一个临时数据结构中,Fork/Join框架会利用Spliterator提供的区段顺序信息,引导这些临时结果按顺序的合并。同样,即使某个逻辑上靠后的数据段先处理完成,合并时也不会让这个结果前置,整个合并过程递归进行,直至所有的结果都合并完毕。
我们可以自定义一个收集器,来实现toList方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package top.caituotuo.intermediate.streamDemo;
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.stream.Collector; import java.util.stream.Collectors;
public class ParallelStreamsDemo2 { public static void main(String[] args) { ArrayList<String> arrayList = new ArrayList<>(); Collections.addAll(arrayList, "C", "S", "C", "T", "T", "A");
List<String> collect = arrayList.parallelStream() .map(String::toLowerCase) .collect(Collector.of( () -> { System.out.println("Supplier(new ArrayList): " + " Thead: " + Thread.currentThread().getName()); return new ArrayList<>(); }, (ArrayList<String> list, String item) -> { System.out.println("Accumulator(add): " + item + " Thead: " + Thread.currentThread().getName()); list.add(item); }, (ArrayList<String> left, ArrayList<String> right) -> { System.out.println("Combiner(addAll): " + left + "+" + right + " Thead: " + Thread.currentThread().getName()); left.addAll(right); return left; }, Collector.Characteristics.IDENTITY_FINISH )); System.out.println(collect); } }
|
当我们将toList操作改成toSet操作,结果就会呈现无序,并且还有做去重操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package top.caituotuo.intermediate.streamDemo;
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.stream.Collector; import java.util.stream.Collectors;
public class ParallelStreamsDemo3 { public static void main(String[] args) { ArrayList<String> arrayList = new ArrayList<>(); Collections.addAll(arrayList, "C", "S", "C", "T", "T", "A");
Set<String> collect = arrayList.parallelStream() .map(String::toLowerCase) .collect(Collectors.toSet()); System.out.println(collect); } }
|
这是因为Set数据结构的性质导致的,而不是并行处理本身导致的。
需要强调的是,无论采用toList还是toSet收集数据,整个处理过程,包括Spliterator的分割策略,Fork/Join框架的执行和合并策略都没变。因为这些策略是由数据源的性质决定的,而数据源是同一个List,唯一不同的是合并前存储局部结果的临时数据结构
,一个是List,一个是Set。
同样我们也来自定义toSet的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package top.caituotuo.intermediate.streamDemo;
import java.util.*; import java.util.stream.Collector; import java.util.stream.Collectors;
public class ParallelStreamsDemo3 { public static void main(String[] args) { ArrayList<String> arrayList = new ArrayList<>(); Collections.addAll(arrayList, "C", "S", "C", "T", "T", "A");
Set<String> collect = arrayList.parallelStream() .map(String::toLowerCase) .collect(Collector.of( () -> { System.out.println("Supplier(new HashSet): " + " Thead: " + Thread.currentThread().getName()); return new HashSet<>(); }, (HashSet<String> list, String item) -> { System.out.println("Accumulator(add): " + item + " Thead: " + Thread.currentThread().getName()); list.add(item); }, (HashSet<String> left, HashSet<String> right) -> { System.out.println("Combiner(addAll): " + left + "+" + right + " Thead: " + Thread.currentThread().getName()); left.addAll(right); return left; }, Collector.Characteristics.IDENTITY_FINISH )); System.out.println(collect); } }
|
Characteristics
UNORDERED
另外,关于Characteristics中的UNORDERED和CONCURRENT两个特性也经常会引起一些误解和混淆。
特别是UNORDERED,许多人可能会误以为一旦设置了UNORDERED,处理结果就会呈现无序状态,然而结果往往还是有序的:
1
| Collector.Characteristics.UNORDERED
|
可以看到,即便加了UNORDERED,结果仍然是顺序的。
这是因为即便Collector被标记为UNORDERED,如果数据源或流操作本身是有序的,系统的执行策略通常仍会保持这些元素的出现顺序,只有在特定场景下,系统才会针对那些被标记为UNORDERED的流进行优化,从而打破约束。
CONCURRENT
在标准的并行流处理中,每个线程处理数据的一个子集维护自己的局部结果容器,在所有的结果处理完成后,这些局部结果会用过一个Combiner的函数合并成一个最终结果。
使用CONCURRENT特性后,所有线程将共享一个结果容器,而不是维护独立的局部结果,从而减少了合并的需要,这通常会带来性能上的提升,特别是当结果容器较大,或合并操作较为复杂时。这也就意味着供应函数只会被调用一次,只创建一个结果容器,而且这个容器必须是线程安全的,例如ConcurrentHashMap,此外合并函数将不会再执行。
由于ArrayList不是线程安全的,即使被标记为CONCURRENT,也不会生效,所以我们将数据源换成线程安全的ConcurrentHashMap:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package top.caituotuo.intermediate.streamDemo;
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collector;
public class ParallelStreamsDemo4 { public static void main(String[] args) { ArrayList<String> arrayList = new ArrayList<>(); Collections.addAll(arrayList, "C", "S", "C", "T", "T", "A");
ConcurrentHashMap<String, String> collect = arrayList.parallelStream() .map(String::toLowerCase) .collect(Collector.of( () -> { System.out.println("Supplier(new ConcurrentHashMap): " + " Thead: " + Thread.currentThread().getName()); return new ConcurrentHashMap<>(); }, (ConcurrentHashMap<String, String> map, String item) -> { System.out.println("Accumulator(add): " + item + " Thead: " + Thread.currentThread().getName()); map.put(item.toUpperCase(), item.toLowerCase()); }, (ConcurrentHashMap<String, String> left, ConcurrentHashMap<String, String> right) -> { System.out.println("Combiner(addAll): " + left + "+" + right + " Thead: " + Thread.currentThread().getName()); left.putAll(right); return left; }, Collector.Characteristics.IDENTITY_FINISH, Collector.Characteristics.CONCURRENT )); System.out.println(collect); } }
|
运行结果:
运行后发现Supplier函数仍被多次调用,依旧会按照默认策略在多个线程中创建ConcurrentHashMap实例,这是因为在处理有序流的情况下,如果多个线程并发更新同一个共享的累加器容器,那么元素更新的顺序将变得不确定,为了避免这种情况,框架通常会忽略有序源的CONCURRENT特性,除非同时还指定了UNORDERED特性。
1 2 3
| Collector.Characteristics.IDENTITY_FINISH, Collector.Characteristics.CONCURRENT, Collector.Characteristics.UNORDERED
|
运行结果:
我们可以看到Supplier只执行了一次,只new了一个ConcurrentHashMap,Combiner就没有执行,也就是说这个时候CONCURRENT特性生效了。
我们再把UNORDERED删除,把数据源List改成Set:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package top.caituotuo.intermediate.streamDemo;
import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collector;
public class ParallelStreamsDemo5 { public static void main(String[] args) { HashSet<String> set = new HashSet<>(); Collections.addAll(set, "C", "S", "C", "T", "T", "A");
ConcurrentHashMap<String, String> collect = set.parallelStream() .map(String::toLowerCase) .collect(Collector.of( () -> { System.out.println("Supplier(new ConcurrentHashMap): " + " Thead: " + Thread.currentThread().getName()); return new ConcurrentHashMap<>(); }, (ConcurrentHashMap<String, String> map, String item) -> { System.out.println("Accumulator(add): " + item + " Thead: " + Thread.currentThread().getName()); map.put(item.toUpperCase(), item.toLowerCase()); }, (ConcurrentHashMap<String, String> left, ConcurrentHashMap<String, String> right) -> { System.out.println("Combiner(addAll): " + left + "+" + right + " Thead: " + Thread.currentThread().getName()); left.putAll(right); return left; }, Collector.Characteristics.IDENTITY_FINISH, Collector.Characteristics.CONCURRENT )); System.out.println(collect); } }
|
运行结果:
结果也是只实例化了一个容器,也就是说CONCURRENT生效了,对于无序数据源不需要加UNORDERED。
并行流和顺序流的一致性问题
通过以上示例,我们不难发现,并行流和顺序流的API基本都是通用的。
但是,由于顺序流是单线程的,并行流是多线程的
,对于多线程操作,很多时候我们会担心是否会导致与顺序流在结果上的不一致,以及哪些情况可能会引起这种差异。
实际上,对于并行流,通过系统内部精确的执行策略,绝大数的终端操作都能产生与顺序流一致的结果。比如count、min、max、sum、average这些聚合操作并不依赖于元素的出现顺序,只需要将各个子任务的计算结果合并最终结果,系统的策略是先分片计算再合并计算;还有比如anyMatch、allMatch、noneMatch、findFirst、findAny这些短路操作,不仅能够保证结果的一致性,而且在某些情况下还显著提高了执行效率,当一个子任务发现另一个子任务已满足或不满足给定条件时,它将立即终止处理,返回结果,同时通知其它尚未完成的子任务停止执行;即便是涉及distinct、sorted这两个有状态的中间操作,也不影响最终结果的一致性,系统会对每个分片的任务结果进行单独排序或去重,然后在合并结果的过程中再次进行排序或去重处理,以此类推,在进行多次合并处理,完成最终的排序或去重,不过这种方式也会带来额外的性能开销。
然而,并非所有的操作都能保持一致的结果,比如forEach和某些形式的reduce,reduce操作结果是否一致取决于操作是否关联,比如以下栗子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package top.caituotuo.intermediate.streamDemo.ParallelStreamsDemo;
import java.util.stream.Stream;
public class ParallelStreamDemo6 { public static void main(String[] args) { Integer sum1 = Stream.of(1, 2, 3, 4, 5).parallel().reduce(0, (a, b) -> a + b); Integer sum2 = Stream.of(1, 2, 3, 4, 5).reduce(0, Integer::sum); System.out.println(sum1); System.out.println(sum2);
Integer sub1 = Stream.of(1, 2, 3, 4, 5).parallel().reduce(0, (a, b) -> a - b); Integer sub2 = Stream.of(1, 2, 3, 4, 5).reduce(0, (a, b) -> b - a); System.out.println(sub1); System.out.println(sub2); } }
|
总而言之,并行流能够充分发挥多核处理的优势,特别处理大数据量和计算密集型任务,然而,对于数据量规模较小或涉及IO操作的情况,顺序流可能会更合适,这是因为并行处理涉及线程管理和协调的额外开销,这些开销可能会抵消甚至超过了并行执行带来的性能提升,所以在是否使用并行流之前,应该先评估任务的性质、数据的规模以及预期的性能收益,以做出最合适的选择。