【问题标题】:Benefit of using forEachOrdered with Parallel streams将 forEachOrdered 与并行流一起使用的好处
【发布时间】:2018-04-30 09:56:55
【问题描述】:

Oracle 官方文档说:

请注意,如果您使用 像 forEachOrdered 这样的操作与并行流。 Oracle - Parallelism

如果我们失去了并行性,为什么会有人将forEachOrdered 与并行流一起使用?

【问题讨论】:

    标签: java java-8


    【解决方案1】:

    根据情况,使用ForEachOrdered 不会失去所有并行性的好处。

    假设我们有这样的东西:

    stringList.parallelStream().map(String::toUpperCase)
                               .forEachOrdered(System.out::println);
    

    在这种情况下,我们可以保证ForEachOrdered终端操作会按照遇到顺序打印出大写字符串但是我们不应该假设元素会被传递给@987654324 @ 中间操作以它们被挑选进行处理的相同顺序进行。 map 操作将由多个线程同时执行。因此,人们可能仍会从并行性中受益,但这只是我们没有充分利用并行性的潜力。总而言之,我们应该使用ForEachOrdered,以便按照流的遭遇顺序执行操作。

    根据您的评论进行编辑:

    当您跳过map 操作时会发生什么?我更感兴趣 forEachOrdered 紧跟在 parallelStream() 之后

    如果您指的是以下内容:

     stringList.parallelStream().forEachOrdered(action);
    

    这样做没有任何好处,我怀疑这就是设计者在决定创建该方法时的想法。在这种情况下,这样做会更有意义:

    stringList.stream().forEach(action);
    

    扩展您的问题“如果我们失去并行性,为什么会有人将 forEachOrdered 与并行流一起使用”,假设您想针对流遇到顺序对每个元素执行操作;在这种情况下您将需要使用 forEachOrdered,因为forEach 终端操作在并行使用时是非确定性,因此顺序有一个版本/em> 个流,一个专门用于并行个流。

    【讨论】:

    • 跳过地图操作会发生什么?我对parallelStream()之后的forEachOrdered 更感兴趣
    • @Kaunteya 变化不大,流都是由终端操作驱动的
    • 一个更有趣的例子是stringList.parallelStream().sorted().forEachOrdered(action)
    • @Eugene:对于有序流,并行工作人员必须在消费者处等待,直到遇到顺序中较早出现的所有元素都被消耗完,因此 map 步骤的并行处理(以及所有其他紧接在前面的无状态中间操作)非常有限。相反,sorted 步骤将并行执行整个流的排序,而不受后续forEachOrdered 的约束影响。这也适用于排序前的中间步骤,因为这些步骤也可以不受限制地同时运行。
    • @Kaunteya 可以使用stream().forEachOrdered(action),但理想情况下,最好使用stream().forEach(action),因为前者用于并行流。我可能错了,但我怀疑做stringList.parallelStream().forEachOrdered(action); 实际上会启动多个线程。 java doc 声明parallelStream() 返回一个可能 以这个集合作为源的并行流。允许此方法返回顺序流。。所以我在想stringList.parallelStream().forEachOrdered(action); 只会在一个线程中执行。
    【解决方案2】:

    我真的不明白这里的问题。 为什么? 因为你别无选择 - 你有这么多数据,并行流可以帮助你(这仍然需要证明);但是您仍然需要保留订单-因此forEachOrdered。请注意,文档说可能而不是肯定会丢失 - 您必须测量并查看。

    【讨论】:

      【解决方案3】:

      我发现 stream().forEachOrdered() 比它的并行版本快约 50%。再加上并行线程至少使用了来自公共 fork-join 线程池的一个线程,即 - 为在 JVM 中运行的其他并行流少了一个线程。

      public static void main(String[] args) { long start = System.currentTimeMillis(); IntStream.range(0,10000000).parallel().forEachOrdered(i -> { //System.out.println(Thread.currentThread().getName()); int p = 1 * 1 ; }); System.out.println((System.currentTimeMillis() - start)); }

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2013-01-02
        • 2012-03-16
        • 2021-12-25
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多