【问题标题】:Handling exceptions from Java ExecutorService tasks处理来自 Java ExecutorService 任务的异常
【发布时间】:2011-01-15 22:17:56
【问题描述】:

我正在尝试使用 Java 的 ThreadPoolExecutor 类以固定数量的线程运行大量重量级任务。每个任务都有很多地方可能会因为异常而失败。

我已经继承了ThreadPoolExecutor 并且我已经覆盖了afterExecute 方法,该方法应该提供在运行任务时遇到的任何未捕获的异常。但是,我似乎无法让它工作。

例如:

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

这个程序的输出是“一切正常——情况正常!”即使提交给线程池的唯一 Runnable 抛出异常。有什么线索知道这里发生了什么吗?

谢谢!

【问题讨论】:

  • 你从来没有问过任务的未来,那里发生了什么。整个服务执行器或程序不会崩溃。异常被捕获并包装在 ExecutionException 下。如果你调用future.get(),他会重新抛出。 PS:future.isDone() [请阅读真正的 api 名称] 将返回 true,即使可运行对象错误地完成。因为任务是真实完成的。

标签: java multithreading exception executorservice threadpoolexecutor


【解决方案1】:

警告:需要注意的是这种解决方案会阻塞调用线程。


如果你想处理任务抛出的异常,那么通常使用Callable而不是Runnable更好。

Callable.call() 被允许抛出检查异常,这些异常被传播回调用线程:

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

如果Callable.call() 抛出异常,这将被包裹在ExecutionException 中并由Future.get() 抛出。

这可能比继承ThreadPoolExecutor 更可取。如果异常是可恢复的,它还使您有机会重新提交任务。

【讨论】:

  • > Callable.call() 允许抛出已检查异常,这些异常会传播回调用线程: 请注意,抛出的异常只会传播到调用线程如果 future.get() 或其重载版本被调用。
  • 很完美,但是如果我并行运行任务并且不想阻塞执行怎么办?
  • 不要使用这个解决方案,因为它破坏了使用 ExecutorService 的整个目的。 ExecutorService 是一种异步执行机制,能够在后台执行任务。如果你在执行后立即调用future.get(),它将阻塞调用线程,直到任务完成。
  • 这个解决方案的评价不应该那么高。 Future.get() 同步工作,并将充当阻止程序,直到 Runnable 或 Callable 被执行,并且如上所述破坏了使用 Executor Service 的目的
  • 正如#nhylated 所指出的,这值得一个jdk BUG。如果未调用 Future.get(),则任何来自 Callable 的未捕获异常都会被静默忽略。非常糟糕的设计.... 只花了 1 天以上的时间来找出一个使用它的库,而 jdk 默默地忽略了异常。而且,这在 jdk12 中仍然存在。
【解决方案2】:

来自docs

注意:当动作包含在 任务(例如 FutureTask) 显式地或通过诸如 提交,这些任务对象捕获并 维护计算异常,以及 所以它们不会导致突然 终止,和内部 异常不会传递给这个 方法。

当您提交 Runnable 时,它​​会被包裹在 Future 中。

你的 afterExecute 应该是这样的:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}

【讨论】:

  • 谢谢,我最终使用了这个解决方案。此外,如果有人感兴趣:其他人建议不要继承 ExecutorService,但我还是这样做了,因为我想在任务完成时监控它们,而不是等待所有任务终止,然后在所有返回的 Futures 上调用 get() .
  • 另一种子类化执行器的方法是继承 FutureTask 并覆盖其“完成”方法
  • Tom >> 您能否发布您的示例 sn-p 代码,其中您将 ExecutorService 子类化以在任务完成时监控它们...
  • 如果您使用 ComplableFuture.runAsync,此答案将不起作用,因为 afterExecute 将包含一个包私有的对象,并且无法访问 throwable。我通过结束通话来解决它。请参阅下面的答案。
  • 我们是否必须使用future.isDone()检查未来是否完成?由于afterExecuteRunnable 完成后运行,我假设future.isDone() 总是返回true
【解决方案3】:

这种行为的解释就在javadoc for afterExecute

注意:当动作包含在 任务(例如 FutureTask) 显式地或通过诸如 提交,这些任务对象捕获并 维护计算异常,以及 所以它们不会导致突然 终止,和内部 异常不会传递给这个 方法。

【讨论】:

    【解决方案4】:

    我通过包装提交给执行程序的提供的可运行文件来解决它。

    CompletableFuture.runAsync(() -> {
            try {
                  runnable.run();
            } catch (Throwable e) {
                  Log.info(Concurrency.class, "runAsync", e);
            }
    }, executorService);
    

    【讨论】:

    • 可以使用CompletableFuturewhenComplete()方法提高可读性。
    • @EduardWirch 这行得通,但你不能从 whenComplete() 中抛出异常
    【解决方案5】:

    我正在使用来自jcabi-logVerboseRunnable 类,它会吞下所有异常并记录它们。很方便,例如:

    import com.jcabi.log.VerboseRunnable;
    scheduler.scheduleWithFixedDelay(
      new VerboseRunnable(
        Runnable() {
          public void run() { 
            // the code, which may throw
          }
        },
        true // it means that all exceptions will be swallowed and logged
      ),
      1, 1, TimeUnit.MILLISECONDS
    );
    

    【讨论】:

      【解决方案6】:

      另一个解决方案是使用 ManagedTaskManagedTaskListener

      您需要一个 CallableRunnable 来实现接口 ManagedTask

      方法getManagedTaskListener返回你想要的实例。

      public ManagedTaskListener getManagedTaskListener() {
      

      您在 ManagedTaskListener 中实现 taskDone 方法:

      @Override
      public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
          if (exception != null) {
              LOGGER.log(Level.SEVERE, exception.getMessage());
          }
      }
      

      更多关于managed task lifecycle and listener的细节。

      【讨论】:

        【解决方案7】:

        这行得通

        • 它是从 SingleThreadExecutor 派生的,但你可以很容易地适应它
        • Java 8 lamdas 代码,但易于修复

        它会创建一个单线程的Executor,可以处理很多任务;并将等待当前一个结束执行以开始下一个

        如果发生未捕获的错误或异常,uncaughtExceptionHandler 将捕获它

        公共最终类 SingleThreadExecutorWithExceptions { 公共静态 ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { ThreadFactory 工厂 = (Runnable runnable) -> { 最终线程 newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions"); newThread.setUncaughtExceptionHandler((final Thread cugthThread,final Throwable throwable) -> { uncaughtExceptionHandler.uncaughtException(caughthThread, throwable); }); 返回新线程; }; 返回新的 FinalizableDelegatedExecutorService (新的线程池执行器(1, 1, 0L,时间单位.毫秒, 新的 LinkedBlockingQueue(), 工厂){ protected void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable); if (throwable == null && runnable instanceof Future) { 尝试 { 未来的未来=(未来)可运行; 如果 (future.isDone()) { 未来.get(); } } 捕捉(CancellationException ce){ 可投掷 = ce; } 捕捉(ExecutionException ee){ throwable = ee.getCause(); } 捕捉(InterruptedException 即){ Thread.currentThread().interrupt(); // 忽略/重置 } } if (throwable != null) { uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable); } } }); } 私有静态类 FinalizableDelegatedExecutorService 扩展 DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService 执行者) { 超级(执行者); } 受保护的无效finalize(){ super.shutdown(); } } /** * 仅公开 ExecutorService 方法的包装类 * 的 ExecutorService 实现。 */ 私有静态类 DelegatedExecutorService 扩展 AbstractExecutorService { 私有最终 ExecutorService e; DelegatedExecutorService(ExecutorService 执行者) { e = 执行者; } 公共无效执行(可运行命令){ e.execute(命令); } 公共无效关机(){ e.shutdown(); } 公共列表 shutdownNow() { 返回 e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) 抛出 InterruptedException { 返回 e.awaitTermination(超时,单位); } 公共未来提交(可运行任务){ 返回 e.submit(task); } 公共未来提交(可调用任务){ 返回 e.submit(task); } 公共未来提交(可运行任务,T结果){ 返回 e.submit(任务,结果); } 公共列表>invokeAll(集合>任务) 抛出 InterruptedException { 返回 e.invokeAll(任务); } 公共列表>调用所有(集合>任务, 长超时,TimeUnit 单位) 抛出 InterruptedException { return e.invokeAll(tasks, timeout, unit); } 公共T调用Any(集合>任务) 抛出 InterruptedException, ExecutionException { 返回 e.invokeAny(tasks); } 公共 T 调用任何(集合>任务, 长超时,TimeUnit 单位) 抛出 InterruptedException、ExecutionException、TimeoutException { 返回 e.invokeAny(任务,超时,单位); } } 私有 SingleThreadExecutorWithExceptions() {} }

        【讨论】:

        • 不幸的是,使用 finalize 有点不稳定,因为它只会在“稍后在垃圾收集器收集它时”被调用(或者在线程的情况下可能不会,不知道)......跨度>
        【解决方案8】:

        如果您想监控任务的执行,您可以旋转 1 或 2 个线程(可能更多取决于负载)并使用它们从 ExecutionCompletionService 包装器中获取任务。

        【讨论】:

          【解决方案9】:

          如果您的 ExecutorService 来自外部源(即,不能继承 ThreadPoolExecutor 并覆盖 afterExecute()),您可以使用动态代理来实现所需的行为:

          public static ExecutorService errorAware(final ExecutorService executor) {
              return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                      new Class[] {ExecutorService.class},
                      (proxy, method, args) -> {
                          if (method.getName().equals("submit")) {
                              final Object arg0 = args[0];
                              if (arg0 instanceof Runnable) {
                                  args[0] = new Runnable() {
                                      @Override
                                      public void run() {
                                          final Runnable task = (Runnable) arg0;
                                          try {
                                              task.run();
                                              if (task instanceof Future<?>) {
                                                  final Future<?> future = (Future<?>) task;
          
                                                  if (future.isDone()) {
                                                      try {
                                                          future.get();
                                                      } catch (final CancellationException ce) {
                                                          // Your error-handling code here
                                                          ce.printStackTrace();
                                                      } catch (final ExecutionException ee) {
                                                          // Your error-handling code here
                                                          ee.getCause().printStackTrace();
                                                      } catch (final InterruptedException ie) {
                                                          Thread.currentThread().interrupt();
                                                      }
                                                  }
                                              }
                                          } catch (final RuntimeException re) {
                                              // Your error-handling code here
                                              re.printStackTrace();
                                              throw re;
                                          } catch (final Error e) {
                                              // Your error-handling code here
                                              e.printStackTrace();
                                              throw e;
                                          }
                                      }
                                  };
                              } else if (arg0 instanceof Callable<?>) {
                                  args[0] = new Callable<Object>() {
                                      @Override
                                      public Object call() throws Exception {
                                          final Callable<?> task = (Callable<?>) arg0;
                                          try {
                                              return task.call();
                                          } catch (final Exception e) {
                                              // Your error-handling code here
                                              e.printStackTrace();
                                              throw e;
                                          } catch (final Error e) {
                                              // Your error-handling code here
                                              e.printStackTrace();
                                              throw e;
                                          }
                                      }
                                  };
                              }
                          }
                          return method.invoke(executor, args);
                      });
          }
          

          【讨论】:

            【解决方案10】:

            这是因为AbstractExecutorService :: submit 将您的runnable 包装成RunnableFuture(除了FutureTask),如下所示

            AbstractExecutorService.java
            
            public Future<?> submit(Runnable task) {
                if (task == null) throw new NullPointerException();
                RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
                execute(ftask);
                return ftask;
            }
            

            然后execute 将它传递给WorkerWorker.run() 将调用下面。

            ThreadPoolExecutor.java
            
            final void runWorker(Worker w) {
                Thread wt = Thread.currentThread();
                Runnable task = w.firstTask;
                w.firstTask = null;
                w.unlock(); // allow interrupts
                boolean completedAbruptly = true;
                try {
                    while (task != null || (task = getTask()) != null) {
                        w.lock();
                        // If pool is stopping, ensure thread is interrupted;
                        // if not, ensure thread is not interrupted.  This
                        // requires a recheck in second case to deal with
                        // shutdownNow race while clearing interrupt
                        if ((runStateAtLeast(ctl.get(), STOP) ||
                             (Thread.interrupted() &&
                              runStateAtLeast(ctl.get(), STOP))) &&
                            !wt.isInterrupted())
                            wt.interrupt();
                        try {
                            beforeExecute(wt, task);
                            Throwable thrown = null;
                            try {
                                task.run();           /////////HERE////////
                            } catch (RuntimeException x) {
                                thrown = x; throw x;
                            } catch (Error x) {
                                thrown = x; throw x;
                            } catch (Throwable x) {
                                thrown = x; throw new Error(x);
                            } finally {
                                afterExecute(task, thrown);
                            }
                        } finally {
                            task = null;
                            w.completedTasks++;
                            w.unlock();
                        }
                    }
                    completedAbruptly = false;
                } finally {
                    processWorkerExit(w, completedAbruptly);
                }
            }
            

            最后task.run();在上面的代码调用中会调用 FutureTask.run()。这是异常处理程序代码,因为 这你没有得到预期的异常。

            class FutureTask<V> implements RunnableFuture<V>
            
            public void run() {
                if (state != NEW ||
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                                 null, Thread.currentThread()))
                    return;
                try {
                    Callable<V> c = callable;
                    if (c != null && state == NEW) {
                        V result;
                        boolean ran;
                        try {
                            result = c.call();
                            ran = true;
                        } catch (Throwable ex) {   /////////HERE////////
                            result = null;
                            ran = false;
                            setException(ex);
                        }
                        if (ran)
                            set(result);
                    }
                } finally {
                    // runner must be non-null until state is settled to
                    // prevent concurrent calls to run()
                    runner = null;
                    // state must be re-read after nulling runner to prevent
                    // leaked interrupts
                    int s = state;
                    if (s >= INTERRUPTING)
                        handlePossibleCancellationInterrupt(s);
                }
            }
            

            【讨论】:

              【解决方案11】:

              这类似于 mmm 的解决方案,但更容易理解。让您的任务扩展一个包含 run() 方法的抽象类。

              public abstract Task implements Runnable {
              
                  public abstract void execute();
              
                  public void run() {
                    try {
                      execute();
                    } catch (Throwable t) {
                      // handle it  
                    }
                  }
              }
              
              
              public MySampleTask extends Task {
                  public void execute() {
                      // heavy, error-prone code here
                  }
              }
              

              【讨论】:

                【解决方案12】:

                我会为它提供一个 ThreadFactory 实例来创建新线程并为它们提供一个 UncaughtExceptionHandler,而不是子类化 ThreadPoolExecutor

                【讨论】:

                • 我也试过这个,但似乎从未调用 uncaughtException 方法。我相信这是因为 ThreadPoolExecutor 类中的工作线程正在捕获异常。
                • 未调用 uncaughtException 方法,因为 ExecutorService 的 submit 方法将 Callable/Runnable 包装在 Future 中;异常正在那里被捕获。
                • 如果你使用 execute(): void,而不是 submit():Future,它应该可以工作。
                猜你喜欢
                • 1970-01-01
                • 2018-07-19
                • 1970-01-01
                • 2018-01-15
                • 2015-05-03
                • 1970-01-01
                • 1970-01-01
                • 2016-06-27
                • 1970-01-01
                相关资源
                最近更新 更多