【问题标题】:ThreadPoolExecutor unexpected errorThreadPoolExecutor 意外错误
【发布时间】:2018-07-18 20:15:43
【问题描述】:

在下面编写一个简单的测试程序,该程序应该并行执行某些任务。每次我们提交 6 个任务并等待完成。然后,又提交了一组任务。

 import java.util.concurrent.*;

public class ThreadExecutorTest {

  public static void main(String... args) {
    ThreadPoolExecutor ex   = new ThreadPoolExecutor(   15, 20, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(5));

    for (int i = 0; i< 200; i++) {
      submitTasks(ex);
    }
    System.out.println("Done");
  }

  private static void submitTasks(ThreadPoolExecutor ex) {
    Future f1 = ex.submit( new SampleTask());
    Future f2 = ex.submit( new SampleTask());
    Future f3 = ex.submit( new SampleTask());
    Future f4 = ex.submit( new SampleTask());
    Future f5 = ex.submit( new SampleTask());
    Future f6 = ex.submit( new SampleTask());

//    System.out.println("Max Pool Size " + ex.getMaximumPoolSize());
    System.out.println("Pool Size " + ex.getPoolSize());
//    System.out.println("Active count " + ex.getActiveCount());
//    System.out.println("Task Count " + ex.getTaskCount());
//    System.out.println("Queue length " + ex.getQueue().size());
//    System.out.println("Queue remainingCapacity " + ((ArrayBlockingQueue)ex.getQueue()).remainingCapacity());

    try {
      f1.get();
    } catch (ExecutionException eex) {
      System.out.println("ExecutionException reported later - " + eex.getMessage());
    }catch(Exception exp){
      System.out.println("Exception reported later - " + exp.getMessage());
    }
    try{
      f2.get();
    }catch(Exception exp){}
    try{
      f3.get();
    }catch(Exception exp){}
    try{
      f4.get();
    }catch(Exception exp){}
    try{
      f5.get();
    }catch(Exception exp){}
    try{
      f6.get();
    }catch(Exception exp){}

  }

  static class SampleTask implements Callable<Void> {
    @Override
    public Void call() throws Exception {
      try {
//        Thread.sleep(300);
      } catch (Exception e) {
        System.out.println("Exception reported");
      }
      return null;
    }
  }
}

但是,生成了以下我无法解释的异常。我假设 ThreadPoolExecutor 配置是正确的,可以随时处理 6 个任务。

Pool Size 6
Pool Size 12
Pool Size 15
Pool Size 16
Pool Size 17
Pool Size 18
Pool Size 19
Pool Size 20
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2328c243 rejected from java.util.concurrent.ThreadPoolExecutor@bebdb06[Running, pool size = 20, active threads = 0, queued tasks = 0, completed tasks = 53]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)

【问题讨论】:

  • 如果你的executor有队列,每一个新任务都会先放入队列。如果您有像示例中那样的突发执行,则队列会在任务出列并执行之前被填满。如果这些是现实模式,您将需要更大的队列。

标签: java exception threadpoolexecutor


【解决方案1】:

ThreadPoolExecutor.execute 有一条评论描述了它在提交新任务时的行为:

    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */

在您的情况下,当您一次提交 6 个任务的批次时,当池的当前大小小于核心大小时,这些提交会立即分派到新的工作线程(参见从 0 到 6 的跳转,和从 6 到 12)。

一旦您超过核心池大小但仍小于最大大小,只要队列未满,任务就会提交到队列,然后异步拉出以在现有工作线程上运行。由于这些任务都是背靠背提交的,因此很有可能所有六个任务都在任何被拉出队列之前被提交;因此,前五个将排队,其余的将进入上述过程的第 3 步:创建一个新的工作线程并立即运行该任务。 (这解释了后来从 15 到 16、16 到 17 等的跳跃。)

最终,这导致线程池拥有最大数量的工作线程,当达到上述过程的第 3 步时(如上一段),Executor 无法创建新的工作线程并拒绝该任务.从本质上讲,即使有可用的工作线程,您也没有给执行程序任何时间来将任务从队列中拉出以在队列过度填充之前对其执行。

【讨论】:

  • 请注意,在提交新任务之前,会先调用 Future get() 来获取任务结果。在这种情况下,在提交下一批 6 个任务之前,Executor 没有要处理的任务。
  • 但是在您提交 6 个任务之前,您不会调用它,其中只有 5 个可以放入您的队列。一旦你的线程池满了,如果你的队列中不能容纳一个传入的任务,它就会被拒绝(即使所有的工作线程都是空闲的!)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-05-27
  • 1970-01-01
  • 2012-10-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多