【问题标题】:Stop ExecutorService on thread failure and exception handling在线程失败和异常处理时停止 ExecutorService
【发布时间】:2021-11-09 12:09:49
【问题描述】:

这是我为暴露我的问题所做的一个简化示例。我有一些任务doSomeWork(),我使用 ExecutorService 以多线程方式处理(一次最多 4 个线程)。但是,如果任何线程/任务产生异常,我想:

  1. 停止处理任何进一步的任务。

  2. 在主线程级别捕获异常。

    public static void main(String[] args) {
        final ExecutorService threadPool = Executors.newFixedThreadPool(4);
        final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<>(threadPool);
    
        try {
            for (int i = 0; i < 10; i++) {
                int b = i;
                    completionService.submit(() -> doSomeWork(b));
            }
    
            threadPool.shutdown();
            threadPool.awaitTermination(8, TimeUnit.HOURS);
    
            System.exit(0);
    
        } catch (Exception e) {
            System.out.println("Something wrong happened: " + e.getMessage());
        }
    
        System.exit(1);
    
    }
    
    //This function have 50% odds of throwing an exception
    public static Void doSomeWork(int i) throws Exception {
    
        Thread.sleep(500);
        if ((Math.random() > 0.5))
        {
            System.out.println("I have reached indice: " + i);
        }
        else
        {
            throw new Exception("I couldn't handle indice " + i);
        }
        return null;
    }
    

目前,执行会输出如下内容:

I have reached indice: 0
I have reached indice: 2
I have reached indice: 1
I have reached indice: 4
I have reached indice: 6
I have reached indice: 7
I have reached indice: 5
I have reached indice: 9

如您所见,indice 3 丢失了,但剩余线程的执行已完成。它也没有输出任何关于异常的信息。

我想要的输出是这样的:

I have reached indice: 0
I have reached indice: 2
I have reached indice: 1
Something wrong happened: I couldn't handle indice 3

我围绕这个问题找到的其他解决方案是使用带有未来但以阻塞方式的可调用对象。我不能在等待未来的同时阻止其他线程的执行,否则整个多线程是没有意义的。

【问题讨论】:

  • 请在发帖前检查您的问题,并始终添加所有相关标签。
  • 你提交给线程池的任务被封装成FutureTask,执行过程中出现的任何异常都会被FutureTask#run方法捕捉到,所以需要要么把异常捕捉到里面任务或从返回的 Future#get 中获取它。
  • "我不能在等待未来时阻止其他线程的执行......" 使用 future.get 不会阻止其他线程的执行。它可以以与您使用“awaitTermination”相同的方式使用。它会在其他线程正在执行时阻塞主线程。

标签: java multithreading exception


【解决方案1】:

您可以使用CompletableFuture 做到这一点。这是我测试的主要功能:

final ExecutorService executorService = Executors.newFixedThreadPool(4);
final List<CompletableFuture<Void>> all = new ArrayList<>();

try {
    for (int i = 0; i < 10; i++) {
        int b = i;
        CompletableFuture<Void> v = CompletableFuture.runAsync(() -> {
                    try {
                        doSomeWork(b);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                },
                executorService);
        all.add(v);
    }

    CompletableFuture<Void> placeholder = CompletableFuture.allOf(all.toArray(new CompletableFuture[0]));
    failFast(all, placeholder);

    System.out.println("All tasks ended");

} catch (Exception e) {
    System.out.println("Something wrong happened: " + e.getMessage());
} finally {
    executorService.shutdownNow();
}

一旦其中一个失败(或全部完成),使联合未来失败的实用功能:

private static <T> void failFast(List<CompletableFuture<T>> futures, CompletableFuture<T> joint) {
    while (true) {
        if (joint.isDone()) {
            return;
        }
        for (CompletableFuture<T> future : futures) {
            if (future.isCompletedExceptionally()) {
                return;
            }
        }
    }
}

这是我得到的输出:

I have reached indice: 1
I have reached indice: 7
I have reached indice: 5
I have reached indice: 4
Something wrong happened: java.lang.RuntimeException: java.lang.Exception: I couldn't handle indice 0

解释:

CompletableFuture.runAsync() 方法允许您提供Runnable(您的doSomeWork)和具有一定数量线程的执行程序。在这里,我传递了一个有 4 个线程的执行程序(就像您在示例中所做的那样)。

在 runnable 内部,我不仅运行 doSomeWork 函数,而且还捕获 Exception 并抛出 RuntimeException(需要这样做,因为 Lambda 不支持检查异常,所以我需要包装它进入运行时,但它仍会中断执行并被您的主程序捕获)。

每次我为具有给定索引i 的任务创建新的CompletableFuture&lt;Void&gt; 时,我都会将此结果存储到可完成期货的列表中。

for 循环不会执行任何操作,因为可完成的期货是异步运行的。

因此,我与CompletableFuture.allOf(...) 创建了一个联合可完成的未来,然后我在这个未来上使用实用函数failFast,以便在其中一项任务失败时立即停止(或继续直到所有任务都完成)完成)。

因此,基本上只要其中一个期货未能引发异常,联合期货就被认为已完成,因此会将句柄留给您的主线程,与此同时,被抛出的 RuntimeException 击中在 lambda 表达式中。

注意:感谢 Thomas 的评论,我已更新代码以使用 ExecutorService 而不是简单的 Executor。这使您可以在捕获异常后在 finally 块内调用 shutdownNow()。 同样,正如 Thomas 所建议的,您可以直接在 doSomeWork 函数中抛出 RuntimeException,这样您就不需要在 lambda 表达式中捕获和包装。

其他说明: @matt 让我注意到了一些我不知道的事情。 .allOf() 未来将在 ALL 未来完成时完成,无论成功与否。 因此,正如他所指出的,我的解决方案不会按原样工作。我再次编辑了答案以考虑他的评论,感谢@matt 让我注意到。

【讨论】:

  • 我完全忘记了CompletableFuture,为此+1。不过,我要添加两件事:1) executor 应该是 ExecutorService 类型,以便您可以在 finally 块中调用 shutdownNow()。否则它将使应用程序保持活动状态。 2) 如果可能,OP 可以直接抛出 RuntimeException,这将简化 lambda - 应该指出这一点。
  • @Thomas 这些确实是好话,谢谢。如果您愿意,请随时编辑答案,因此我会在午休后添加它们:)
  • 我会让你改变你的风格。不想弄乱你的答案:)
  • @matt 我这样做是因为 lambda 不能抛出检查异常,这是 lambda 的一个已知限制。因此,无论在 lambda 中抛出什么,都必须包装到运行时异常(或其子异常)中,以便从 lambda 本身流出。不幸的是,没有其他方法可以在 lambda 表达式中引发异常,但它仍然是可以捕获和处理的异常。
  • @matt allOf 未来是一个未来,当所有这些都成功时,它就会成功,而一旦其中一个失败,它就会失败。如果你里面有10个future,其中一个异常完成,则不需要等待其他future——allOf已经确定异常完成了。
【解决方案2】:

听起来您基于不正确的假设排除了执行此操作的正确方法。保持你的未来。

List<Future<?>> futures = new ArrayList<>();

然后当你提交时。

futures.add( completionService.submit( () -> doSomeWork(b) ) );

现在,您可以在主线程中检查期货状态。

for(Future<?> f: futures){
    try{
        f.get();
    } catch( ExecutionException e){
        //execution exception handled on the main thread.
        completionService.shutdownNow();
    } catch( InterruptedException ie){
        //what should happen here.
    }
}

这样,shutdownNow 会被调用,因此所有未启动的任务都会返回并且不会启动。

您可以使用get 的超时来检查每个任务,因为有些任务将并行运行。

这是一个完整的可编译示例。

import java.util.concurrent.*;
import java.util.*;

public class ExecutorJunk{
    static int count = 0;
    static void task(){
        int z = count++;
        
        if(z == 3){
            throw new RuntimeException("z is 3");
        }
        
        System.out.println("z: " + z);
        try{ Thread.sleep(1500);} catch(InterruptedException e){};
    }
    
    public static void main(String[] args){
        ExecutorService service = Executors.newFixedThreadPool(4);
        
        List<Future<?>> all = new ArrayList<>();
        
        for(int i = 0; i<10; i++){
            all.add( service.submit(ExecutorJunk::task) );
        }
        service.shutdown();
        
        try{
            while(!service.isTerminated()){
                for(Future f: all){
                    try{
                      f.get(1, TimeUnit.MILLISECONDS);
                    } catch( TimeoutException toe){
                      //pass.
                    }
                }
            }
        } catch(Exception e){
            System.out.println( service.shutdownNow().size() + " tasks not started");
            e.printStackTrace();
        } 
    }
}

当我运行它时,我得到了。

z: 0
z: 1
z: 2
z: 4
5 个任务未开始
java.util.concurrent.ExecutionException: java.lang.RuntimeException: z 是 3
...

它可能会做得更聪明一点。比如在获取成功时清除期货列表,而不是使用超时,只需检查期货是否已完成,然后执行 future.get。

【讨论】:

  • 这样做的问题是您使用 .get() 同步等待每个结果。这意味着您可能正在等待任务 X 的结果,而任务 X+4 已经失败,并且在您向前推进列表中的 4 个元素之前您不会看到它。
  • 感谢您的建议。我可能会做出不正确的假设。但是,我无法理解您在此处提出的 sn-ps 的工作流程。 3 sn-p 究竟如何与其余代码相匹配?我的意思是在完成所有线程的执行之前,您如何能够定期检查“未来”数组?第三个 sn-p 不应该坐在自己的线程中以使其正常工作吗?如果您能在此处提供完整的图片,那将非常有帮助。
  • @Mit94 是的,第三个 sn-p 将坐在它的“自己的线程”上,它恰好是主线程。它代替了您的“completionService.awaitTermination”。
  • @matt 抱歉,我错过了。但是,即使您定义了超时,您仍然依赖于对每个未来的顺序检查,而想法是让它们并行运行(仅限制每个执行程序的线程数)并在发生故障时立即通知这样所有其他待处理的任务都可以关闭
  • @MatteoNNZ allOf 执行什么样的检查?如果它正在执行检查,则应记录在案。
猜你喜欢
  • 2015-07-18
  • 1970-01-01
  • 2016-08-31
  • 1970-01-01
  • 2011-08-22
  • 2013-01-10
  • 1970-01-01
  • 1970-01-01
  • 2021-03-08
相关资源
最近更新 更多