【发布时间】:2017-03-28 04:10:09
【问题描述】:
Java 8 并行流如何处理消费子句中抛出的异常,例如在forEach 处理中?例如以下代码:
final AtomicBoolean throwException = new AtomicBoolean(true);
IntStream.range(0, 1000)
.parallel()
.forEach(i -> {
// Throw only on one of the threads.
if (throwException.compareAndSet(true, false)) {
throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
});
它会立即停止处理的元素吗?它是否等待已经开始的元素完成?它是否等待所有流完成?抛出异常后是否开始处理流元素?
什么时候回来?异常后立即?在所有/部分元素都由消费者处理之后?
在并行流引发异常后是否继续处理元素? (找到一个发生这种情况的案例)。
这里有一般规则吗?
编辑(15-11-2016)
尝试判断并行流是否提前返回,发现不确定:
@Test
public void testParallelStreamWithException() {
AtomicInteger overallCount = new AtomicInteger(0);
AtomicInteger afterExceptionCount = new AtomicInteger(0);
AtomicBoolean throwException = new AtomicBoolean(true);
try {
IntStream.range(0, 1000)
.parallel()
.forEach(i -> {
overallCount.incrementAndGet();
afterExceptionCount.incrementAndGet();
try {
System.out.println(i + " Sleeping...");
Thread.sleep(1000);
System.out.println(i + " After Sleeping.");
}
catch (InterruptedException e) {
e.printStackTrace();
}
// Throw only on one of the threads and not on main thread.
if (!Thread.currentThread().getName().equals("main") && throwException.compareAndSet(true, false)) {
System.out.println("Throwing exception - " + i);
throw new RuntimeException("One of the tasks threw an exception. Index: " + i);
}
});
Assert.fail("Should not get here.");
}
catch (Exception e) {
System.out.println("Cought Exception. Resetting the afterExceptionCount to zero - 0.");
afterExceptionCount.set(0);
}
System.out.println("Overall count: " + overallCount.get());
System.out.println("After exception count: " + afterExceptionCount.get());
}
延迟返回,不是从主线程抛出。这导致在抛出异常后需要处理许多 new 元素。在我的机器上,抛出异常后处理了大约 200 个元素。但是,并非所有 1000 个元素都被处理。那么这里的规则是什么?为什么抛出异常却处理了更多的元素?
提前返回,去掉not (!)符号,导致主线程抛出异常。只有已经开始的元素完成了处理,没有新的元素被处理。早点回来就是这里的情况。与之前的行为不一致。
我在这里错过了什么?
【问题讨论】:
标签: exception exception-handling parallel-processing java-8 java-stream