【问题标题】:Exception propagation in Java parallel streamsJava 并行流中的异常传播
【发布时间】:2019-08-17 19:29:53
【问题描述】:

在 Akka in Action 书中它说

例外是 几乎不可能在开箱即用的线程之间共享,除非你准备好了 建立大量的基础设施来处理这个问题。

据我所知,如果并行线程中发生异常,它将传播给调用者。如果这种机制是可能的,为什么不使用常规线程来实现呢?我错过了什么吗?

编辑: 我说的是这样的可能性:

public static void count() {
    long count = 0;
    try {
        count = IntStream.range(1, 10)
                .parallel()
                .filter(number -> f(number)).count();
    } catch(RuntimeException e) {
        /* handle */
    }
    System.out.println("Count - " + count);
}

public static boolean f(final int number) {
    if(Math.random() < 0.1) {
        throw new RuntimeException();
    }
    return true;
}

parallel() 产生多个线程,当其中任何一个引发 RuntimeException 时,该异常仍会在主线程上捕获,这似乎与书籍的观点相反。

编辑2:

【问题讨论】:

  • 您能否添加一个特定的代码示例来展示您的问题?我会说caller 通常在同一个线程中。
  • @second 我尝试使用示例进行更新,在这种情况下调用者是主线程,而工作线程抛出异常。但是异常应该是线程本地的,我错了吗?
  • 在您的示例中,异常被ForkJoinTask 捕获(因为parallel() 使用相关的ForkJoinPool),然后将异常报告给调用者。所以我会说在这种情况下有一个机制是地方。
  • 另外,您在书中的陈述似乎有点断章取义。作者似乎在谈论不同不相关ThreadGroups中发生的异常,例如JVM 在不同的机器上运行。
  • @second 他还谈到了不同线程组中的异常,但在此之前他给出了一个应用程序在单机上运行的示例。我在另一个编辑中附加了它的方案。

标签: java multithreading exception


【解决方案1】:

主要区别在于,虽然单个 Stream 中间体可以并行运行,但它们仅在遇到终端操作时才被评估;这使它成为一个虚拟连接点。

也就是说,类似的东西也可以

try {
    Thread concurrent = new Thread(runnable);
    concurrent.start();
    concurrent.join();
} catch (ExceptionThrownInThread ex) {}

但是,在一般情况下 - 这几乎是 Akka 的编程模型 - 你有

yourMessenger.registerCallbacks(callbacks);
new Thread(yourMessenger).start(); 

现在,回调最终将在您创建的线程中被调用,但没有结构可以将其作为一个整体执行;那么谁会捕捉到这个异常呢?

我对Akka了解不够,但是在projectreactor的Publishers中,可以注册一个错误处理程序,如

Mono<Result> mono = somethread.createResult().onError(errorHandler);

但同样,在一般情况下,这并不是微不足道的。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-03-18
    • 1970-01-01
    • 2012-06-28
    • 2015-12-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多