【问题标题】:Java 8 Streams interference inside forEach on a field字段上的 forEach 内部的 Java 8 Streams 干扰
【发布时间】:2017-06-16 11:41:55
【问题描述】:

考虑以下使用 java 8 流的愚蠢程序:

private int biggestInt;

private void run() {
    ExecutorService executor = Executors.newWorkStealingPool();

    List<Callable<Integer>> callables = new ArrayList<>();

    for (int i = 0; i<50; i++) {
        callables.add(randomInt());
    }

    try {
        executor.invokeAll(callables)
            .stream()
            .map(future -> {
                    try {
                        return future.get();
                    } catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                })
            .forEach(this::compareBiggestInt);
    } catch (InterruptedException e) { /* do nothing */ }
}

private Callable<Integer> randomInt() {
    return () -> {
        Random random = new Random(System.currentTimeMillis());
        return random.nextInt();
    };
}

private void compareBiggestInt(Integer in) {
    if (in > biggestInt)
        biggestInt = in;
}

我的问题是,forEach(this::compareBiggestInt) 是否并行执行,因此会在最大 Int 上引入竞争条件?

如果是这样,我怎样才能避免这种竞争条件? 例如,我可以像下面这样更改方法吗?

private synchronized void compareBiggestInt(Integer in) {[...]}

感谢任何帮助!

【问题讨论】:

    标签: java concurrency java-8 java-stream


    【解决方案1】:

    不,forEach 不是并行执行的。这将打破forEach 在与stream() 而非parallelStream() 一起使用时的预期行为的一般约定,并且不受您引入ExecutorService 这一事实的影响。

    invokeAll() 实际上返回了Future 实例中的List,这些实例已完成或超时。因此,在您与流交互时,并行部分已经完成。

    【讨论】:

      【解决方案2】:

      这里有一些问题。第一:

      return () -> {
          Random random = new Random(System.currentTimeMillis());
          return random.nextInt();
      };
      

      执行速度可能如此之快(我可以轻松重现),这将始终返回相同的值。

      我建议你至少删除 millis

      private static Callable<Integer> randomInt() {
          return () -> {
              Random random = new Random();
              int x = random.nextInt(100);
              System.out.println(x);
              return x;
          };
      }
      

      或者更好地使用ThreadLocalRandom.current().nextInt(100)

      我还更改了nextInt 以返回到[0.. 100] 的范围内,因为nextInt 可以返回一个负值并假设您返回50 个负值,然后您的最大值将是zero(默认值) biggestInt;这显然是错误的。

      然后你的流是sequential 并且在每个map 操作中你阻塞直到Future.get 完成。因此,您的 forEach 由单个线程执行。

      【讨论】:

      • 感谢您的回答。 Math.random() 呢?我也可以在这种快速执行环境中使用它吗? (我知道这会返回一个介于 0 和 1 之间的双精度值,目前还可以)
      • @Sven793 您可以使用System.nanoTime() 获得纳秒级精度,这将增加获得更好随机数的概率。
      • @Sven793 你的意思是你添加了parallel 并看到流是并行执行的?这几乎是意料之中的。让我重新表述我所说的:添加parallel 会显示不同的最大值而不是顺序流对于相同的数据
      • @Eugene,但请注意,System.nanoTime() 不一定会产生具有实际纳秒精度的图章。我记得有些 Windows 机器没有纳米精度的时钟,所以 nanoTime() 那里返回截断到系统时钟支持的毫精度的标记。
      • 咳咳,别忘了这些Callables 应该并行运行,因为它们被传递给invokeAll。因此,即使使用非常慢的机器和高精度计时器,当使用系统时间作为种子时,它们中的几个也可以产生相同的值。那么为什么不使用ThreadLocalRandom.current().nextInt(100),这将比为每个数字创建本地Random 实例更快(从Java 7 开始)...
      【解决方案3】:

      forEach 不在并行 流中执行。实际执行异步任务的是executorStream#map 操作将等到所有 Futures 完成。

      如果你想要一个操作在并行流中执行,你应该使用reduction operation:Stream#reduce。例如:

      biggestInt = executor.invokeAll(callables)
              .parallelStream()
              .map(...)// same with yours
              .reduce(BinaryOperator.maxBy(Comparator.naturalOrder()))
              .orElse(null);
      

      【讨论】:

      • 嗨,holi-java,你好吗?只是关于措辞的细节...... Stream.reduce 不是可变归约,因为中间结果不是在可变结构中累积的。流的可变归约操作是collect。
      • @FedericoPeraltaSchaffner 你好,我只是复制the documentation link。我认为这是对的。您可以在Collector 类的描述中看到它。
      • 但是您已经链接到包文档的 Reduction 部分,这与下面的 Mutable reduction 部分不同。 reduce归约collect可变归约
      • 我不知道“文档”应该是什么。您已链接到 java.util.stream 包文档,该文档描述了几个不同的概念。 Reduction 是其中之一,它描述了该概念和关联方法reduceMutable reduction 是另一个,在下一节中使用关联方法collect 进行描述. reduce 的方法文档清楚地链接到 Reduction,但是当然,如​​果您向下滚动到下一部分,您将到达 Mutable reduction 但如果您 阅读这两部分,你应该注意到它是新部分
      • 还有一件事,在BinaryOperator 中只有maxBy 需要Comparator,所以操作必须是.reduce(BinaryOperator.maxBy(Comparator.naturalOrder())),但您可以将其简化为Stream API 提供的.max(Comparator.naturalOrder()) ,这将在内部执行相同的操作。
      【解决方案4】:

      您不使用parallel 流,因此您的流是顺序的。如果您想确保您的流按顺序完成,请将 .sequential() 方法添加到您的流中。

      来自docs

      default Stream<E> stream()  
      Returns a sequential Stream with this collection as its source.
      

      【讨论】:

        【解决方案5】:

        假设您正在并行运行流(我将代码更改为使用“parallelStream”),您必须保护对共享可变变量的所有更改。

        例如,在下面的代码中,我在方法“compareBiggestInt”中使用“同步”来保护对变量“biggestInt”的所有访问。 (如果您删除“同步”并运行以下代码,您可以看到方法“compareBiggestInt”中确实存在竞争条件)

        import java.util.ArrayList;
        import java.util.List;
        import java.util.Random;
        import java.util.concurrent.Callable;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        
        
        public class ParallelStreamExample  {
        
            private volatile int biggestInt;
        
            public static void main(String[] args) {
                ParallelStreamExample parallelStreamExample = new ParallelStreamExample();
                parallelStreamExample.doTheWork();
            }
        
        
        
            private void doTheWork() {
                ExecutorService executor = Executors.newWorkStealingPool();
        
                List<Callable<Integer>> callables = new ArrayList<>();
        
                for (int i = 0; i < 5; i++) {
                    callables.add(randomInt());
                }
        
                try {
                    executor.invokeAll(callables)
                            .parallelStream()
                            .map(future -> {
                                try {
                                    return future.get();
                                } catch (Exception e) {
                                    throw new IllegalStateException(e);
                                }
                            })
                            .forEach(this::compareBiggestInt);
                } catch (InterruptedException e) { /* do nothing */ }
            }
        
            private Callable<Integer> randomInt() {
                return () -> {
                    Random random = new Random();
                    return random.nextInt(10);
                };
            }
        
            private synchronized void compareBiggestInt(Integer in)  {
                System.out.println("in:" + in + " - current biggestint = " + biggestInt);
                if (in > biggestInt) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    biggestInt = in;
                }
                System.out.println("in:" + in + " - current biggestint = " + biggestInt);
            }
        }
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2021-12-15
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多