【发布时间】:2014-08-20 18:56:04
【问题描述】:
上下文
我偶然发现了一个相当烦人的问题:我有一个程序有很多数据源,这些数据源能够流式传输相同类型的元素,我想“映射”程序中的每个可用元素(元素顺序没关系)。
因此,我尝试使用streamOfT = streamOfStreamOfT.reduce(Stream.empty(), Stream::concat); 将Stream<Stream<T>> streamOfStreamOfT; 简化为简单的Stream<T> streamOfT;
由于元素顺序对我来说并不重要,我尝试使用.parallel() 并行化reduce 操作:streamOfT = streamOfStreamOfT.parallel().reduce(Stream.empty(), Stream::concat); 但这会触发java.lang.IllegalStateException: stream has already been operated upon or closed
示例
要亲自体验,只需通过评论/取消评论 .parallel() 来玩以下 main (java 1.8u20)
public static void main(String[] args) {
// GIVEN
List<Stream<Integer>> listOfStreamOfInts = new ArrayList<>();
for (int j = 0; j < 10; j++) {
IntStream intStreamOf10Ints = IntStream.iterate(0, i -> i + 1)
.limit(10);
Stream<Integer> genericStreamOf10Ints = StreamSupport.stream(
intStreamOf10Ints.spliterator(), true);
listOfStreamOfInts.add(genericStreamOf10Ints);
}
Stream<Stream<Integer>> streamOfStreamOfInts = listOfStreamOfInts
.stream();
// WHEN
Stream<Integer> streamOfInts = streamOfStreamOfInts
// ////////////////
// PROBLEM
// |
// V
.parallel()
.reduce(Stream.empty(), Stream::concat);
// THEN
System.out.println(streamOfInts.map(String::valueOf).collect(
joining(", ")));
}
问题
有人能解释一下这个限制吗? / 找到一种更好的方法来处理流的并行减少
编辑 1
在@Smutje 和@LouisWasserman cmets 之后,.flatMap(Function.identity()) 似乎是容忍.parallel() 流的更好选择
【问题讨论】:
-
有一个列表列表,你可以
flatMap它使用流的一个大列表... -
你绝对应该在这里使用
flatMap。 -
@Smutje 好的,
flatMap完成了这项工作,但我仍然不明白reduce对并行化的“反应” -
关于“流顺序对我来说不重要”的部分无关紧要;无论如何,并行归约都会保留顺序(只要您的归约运算符是关联的。)
标签: java parallel-processing java-8 java-stream