【问题标题】:Interrupt all threads if exception occurs in any of the thread among the thread list如果线程列表中的任何一个线程发生异常,则中断所有线程
【发布时间】:2016-10-19 03:45:05
【问题描述】:

我正在使用invokeAll() 调用线程列表。 AFAIK invokeAll() 只会在所有线程完成其任务时返回。

ExecutorService threadExecutor = Executors.newFixedThreadPool(getThreadSize());
List<Future<Object>> future = w_threadExecutor.invokeAll(threadList);

当所有线程完成时调用它

for (Future<Object> w_inProgressThread : w_future)
{
//

它停止发生异常的线程,而不是剩余的线程。 如果任何线程抛出异常,有没有办法停止所有其他线程? 还是我必须提交每个任务而不是 invokeAll()??

我尝试在 invokeAll() 上使用 invokeAny() 但没有取消剩余任务 invokeAny() :如果其中一项任务完成(或引发异常),则取消其余的 Callable。 参考:http://tutorials.jenkov.com/java-util-concurrent/executorservice.html

更新:

CompletionService<Object> completionService = new ExecutorCompletionService<Object>(w_threadExecutor);
                List<Future<Object>> futures = new ArrayList<Future<Object>>();
                for(Thread w_mt : threadList)
                {
                 futures.add(completionService.submit(w_mt));
                }
                for (int numTaken = 0; numTaken < futures.size(); numTaken++) {
                    Future f = completionService.take();
                    try {
                      Object result = f.get();
                      System.out.println(result);  // do something with the normal result
                    } catch (Exception e) {
                      System.out.println("Catched ExecutionException, shutdown now!");
                      //threadExecutor.shutdownNow();
                      Thread.currentThread().interrupt();

                      for (Future<Object> inProgressThread : futures)
                         {
                             inProgressThread.cancel(true);
                         } 
                      break;
                    }

更新 1:

按照 waltersu 的建议,我试过了

ExecutorService threadExecutor = Executors.newFixedThreadPool(3);
              CompletionService<Object> completionService = new ExecutorCompletionService<Object>(threadExecutor);
              List<Future<Object>> futures = new ArrayList<Future<Object>>();
              futures.add(completionService.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    String s=null;
                //  Thread.sleep(1000);
                  for(int i=0; i < 1000000; i++){
                        int j =10 ;
                        if(i==100)
                        {

                        s.toString();
                        }

                        System.out.println("dazfczdsa :: " + i);
                    }
                  //throw new Exception("This is an expected Exception");
                return s;
                }
              }));
              futures.add(completionService.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    for(int i=0; i < 1000000; i++){
                        int j =0 ;
                        j= j+2;
                        System.out.println("dasa :: " + i);
                    }
                  Thread.sleep(3000);

                  return "My First Result";
                }
              }));

              while (futures.size() > 0) {
                Future f = completionService.take();
                futures.remove(f);
                try {
                  Object result = f.get();
                  System.out.println(result);  // do something with the normal result
                } catch (ExecutionException e) {
                  System.out.println("Caught exception from one task: " + e.getCause().getMessage() + ". shutdown now!");
                  f.cancel(true);
                  threadExecutor.shutdownNow();
                  break;
                }
              }
              System.out.println("Main exists");

发生异常时不会停止

【问题讨论】:

  • threadExecutor.notifyAll() 中断所有线程
  • @AkashLodha notifyAll() from Object "唤醒所有在这个对象的监视器上等待的线程"。线程没有在任何特定对象的监视器上等待,是吗?我错过了什么?
  • 我假设 threadList 是想要锁定同一对象的线程。
  • 如果您打算在一个线程出错时关闭所有工作,您可以future.cancel(true) 任务 - 如果任务正在运行,这将中断线程。
  • 如果发生异常,我无法在父线程中获得控制权。它只有在完成所有线程之后才会出现。

标签: java multithreading threadpool executorservice java.util.concurrent


【解决方案1】:
for(Future<Object> fut : future){
    try {
            System.out.println(fut.get());
    } 
    catch (InterruptedException | ExecutionException e) {
        Thread.currentThread().interrupt();
        // or notify all here
    } 

【讨论】:

  • 如果一个任务异常退出,invokeAll仍然等待所有任务完成。
  • ya.. 我已经改变了答案
【解决方案2】:

您可以调用 Wrappers,它获取您的工作可运行文件和对它们将被调用的 Executor 的引用。在异常情况下,您可以在 run() 中立即关闭。

class ExceptionHandlingWrapper implements Runnable{
    private ExecutorService es;
    private Runnable childRunnable;

    // CTOR taking an ExecutorService and a Runnable
    public ExceptionHandlingWrapper ( ExecutorService es, Runnable work ){
         this.es = es;
         this.childRunnable = work;
    }

    @Override public void run(){
        try{
           childRunnable.run();
        }
        catch(Exception ex){
            // Todo: LOG IT!
            es.shutdownNow();
        }
    }
}

这当然也适用于 Callables。

【讨论】:

    【解决方案3】:

    你必须一一submit(),而不是invokeAll(),然后检查Future是否有Exception。 p>

    public static void main(String[] args) throws InterruptedException {
      ExecutorService threadExecutor = Executors.newFixedThreadPool(3);
      CompletionService<Object> completionService = new ExecutorCompletionService<>(threadExecutor);
      List<Future<Object>> futures = new ArrayList<>();
      futures.add(completionService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
          Thread.sleep(1000);
          throw new Exception("This is an expected Exception");
        }
      }));
      futures.add(completionService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
          Thread.sleep(3000);
          return "My First Result";
        }
      }));
    
      while (futures.size() > 0) {
        Future f = completionService.take();
        futures.remove(f);
        try {
          Object result = f.get();
          System.out.println(result);  // do something with the normal result
        } catch (ExecutionException e) {
          System.out.println("Caught exception from one task: " + e.getCause().getMessage() + ". shutdown now!");
          threadExecutor.shutdownNow();
          break;
        }
      }
      System.out.println("Main exists");
    }
    

    更新1:(回答op的更新1问题)

    那是因为你的任务有一个很长的循环,它不检查中断,这使得你的任务不可取消。那你怎么阻止呢?我认为您必须修改其他任务以使其可取消。正如the official doc 所说:

    如果一个线程很长时间没有调用一个抛出 InterruptedException 的方法怎么办?然后它必须定期调用 Thread.interrupted,如果收到中断则返回 true。例如:

    for (int i = 0; i < inputs.length; i++) {
        heavyCrunch(inputs[i]);
        if (Thread.interrupted()) {
            // We've been interrupted: no more crunching.
            return;
        }
    }
    

    如果您不想修改任务并希望它快速停止怎么办?有一种方法可以停止不可取消的线程。它是 Thread.stop()。但是,起初,如果不使用反射,您将无法从线程池中获取线程。此外,根据javadoc,它已被弃用,因为“它本质上是不安全的”。

    因此,最佳做法(我认为)是检查您的任务(或部分代码)中的中断,该中断既不可取消,又需要很长时间才能完成。

    【讨论】:

    • 更新:如果所有任务都成功完成,仍然需要调用threadExecutor.shutdown(),这样应用程序才能存在。因为stackoverflow.com/a/20057584/4493265
    • 请检查我的更新 1:有问题..如果发生异常但不会终止其他线程
    【解决方案4】:

    一些建议:

    1. 删除invokeAll并将所有任务提交给ExecutorService
    2. 在 Future 上从 get() 捕获异常并使用 shutdownshutdownNow,如这些问题中所述:

    How to stop next thread from running in a ScheduledThreadPoolExecutor

    How to forcefully shutdown java ExecutorService

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-07-19
      • 1970-01-01
      • 1970-01-01
      • 2022-07-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-10-05
      相关资源
      最近更新 更多