【问题标题】:How do Java 8 parallel streams behave on a thrown exception?Java 8 并行流如何处理抛出的异常?
【发布时间】:2017-03-28 04:10:09
【问题描述】:

Java 8 并行流如何处理消费子句中抛出的异常,例如在forEach 处理中?例如以下代码:

final AtomicBoolean throwException = new AtomicBoolean(true);
IntStream.range(0, 1000)
    .parallel()
    .forEach(i -> {
        // Throw only on one of the threads.
        if (throwException.compareAndSet(true, false)) {
            throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
        });

它会立即停止处理的元素吗?它是否等待已经开始的元素完成?它是否等待所有流完成?抛出异常后是否开始处理流元素?

什么时候回来?异常后立即?在所有/部分元素都由消费者处理之后?

在并行流引发异常后是否继续处理元素? (找到一个发生这种情况的案例)。

这里有一般规则吗?

编辑(15-11-2016)

尝试判断并行流是否提前返回,发现不确定:

@Test
public void testParallelStreamWithException() {
    AtomicInteger overallCount = new AtomicInteger(0);
    AtomicInteger afterExceptionCount = new AtomicInteger(0);
    AtomicBoolean throwException = new AtomicBoolean(true);

    try {
        IntStream.range(0, 1000)
            .parallel()
            .forEach(i -> {
                overallCount.incrementAndGet();
                afterExceptionCount.incrementAndGet();
                try {
                    System.out.println(i + " Sleeping...");
                    Thread.sleep(1000);
                    System.out.println(i + " After Sleeping.");
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Throw only on one of the threads and not on main thread.
                if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {
                    System.out.println("Throwing exception - " + i);
                    throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
                }
            });
        Assert.fail("Should not get here.");
    }
    catch (Exception e) {
        System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
        afterExceptionCount.set(0);
    }
    System.out.println("Overall count: " + overallCount.get());
    System.out.println("After exception count: " + afterExceptionCount.get());
}

延迟返回,不是从主线程抛出。这导致在抛出异常后需要处理许多 new 元素。在我的机器上,抛出异常后处理了大约 200 个元素。但是,并非所有 1000 个元素都被处理。那么这里的规则是什么?为什么抛出异常却处理了更多的元素?

提前返回,去掉not (!)符号,导致主线程抛出异常。只有已经开始的元素完成了处理,没有新的元素被处理。早点回来就是这里的情况。与之前的行为不一致。

我在这里错过了什么?

【问题讨论】:

标签: exception exception-handling parallel-processing java-8 java-stream


【解决方案1】:

当某个阶段抛出异常时,它不等待其他操作完成,异常被重新抛出给调用者。 这就是 ForkJoinPool 的处理方式。

相比之下 findFirst 例如当并行运行时,只有在所有操作完成处理后才会将结果呈现给调用者(即使在需要完成所有操作之前知道结果)。

换句话说:它会提前返回,但会让所有正在运行的任务完成。

编辑以回答最后一条评论

Holger 的回答(cmets 中的链接)很好地解释了这一点,但这里有一些细节。

1) 当杀死所有的主线程时,你也杀死了所有应该由这些线程处理的任务。所以这个数字应该实际上更多的是 250 左右,因为有 1000 个任务和 4 个线程,我假设这返回 3?:

int result = ForkJoinPool.getCommonPoolParallelism();

理论上有 1000 个任务,有 4 个线程,每个线程应该处理 250 个任务,然后你杀死其中 3 个意味着 750 个任务丢失。 剩下 250 个任务要执行,ForkJoinPool 将跨越 3 个新线程来执行这 250 个剩余任务。

您可以尝试一些事情,像这样更改您的流(使流没有大小):

IntStream.generate(random::nextInt).limit(1000).parallel().forEach

这一次,将会有更多的操作结束,因为初始拆分索引是未知的并且是由其他策略选择的。你也可以尝试改变这个:

 if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {

到这里:

 if (!Thread.currentThread().getName().equals("main")) {

这一次你总是会杀死除 main 之外的所有线程,直到某个点,ForkJoinPool 不会创建新线程,因为任务太小而无法拆分,因此不需要其他线程。在这种情况下,完成的任务会更少。

2)您的第二个示例,当您实际杀死主线程时,就像代码一样,您将看不到其他线程的实际运行。改变它:

    } catch (Exception e) {
        System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
        afterExceptionCount.set(0);
    }

    // give some time for other threads to finish their work. You could play commenting and de-commenting this line to see a big difference in results. 
    TimeUnit.SECONDS.sleep(60);

    System.out.println("Overall count: " + overallCount.get());
    System.out.println("After exception count: " + afterExceptionCount.get());

【讨论】:

  • 你能根据一些文档吗?
  • @AlikElzin-kilaka 不是真的,我认为这没有记录。我通过阅读其他一些提到此错误的 SO 问题来记住这一点:bugs.openjdk.java.net/browse/JDK-8164690
  • @AlikElzin-kilaka core-libs-dev 邮件列表上也有this 讨论线程,它导致了 Eugene 提到的 JBS 错误。
  • here 是较旧的参考。
  • 我添加了一个导致并行流无法提前返回的异常的代码示例。
猜你喜欢
  • 1970-01-01
  • 2018-04-29
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-06-26
  • 2019-07-19
  • 2019-03-20
相关资源
最近更新 更多