【问题标题】:ThreadPoolExecutor Block When Queue Is Full?队列满时ThreadPoolExecutor阻塞?
【发布时间】:2011-03-27 15:17:44
【问题描述】:

我正在尝试使用 ThreadPoolExecutor 执行大量任务。下面是一个假设的例子:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)

问题是我很快就得到了java.util.concurrent.RejectedExecutionException,因为任务数超过了工作队列的大小。但是,我正在寻找的期望行为是让主线程阻塞,直到队列中有空间。实现这一目标的最佳方法是什么?

【问题讨论】:

  • This answer to another question建议使用自定义BlockingQueue子类,通过委托给put()来阻止offer()。我认为最终的工作方式与调用getQueue().put()RejectedExecutionHandler 大致相同。
  • 直接放入队列是不正确的,正如这个答案stackoverflow.com/a/3518588/585903
  • @SumitJain 仔细阅读该答案;该答案中提出的三个反对意见中只有一个适用于@Robert Tupelo-Schneck 评论中建议的方法。通过从队列本身中调用put(),您不会通过getQueue() 访问队列(反对#3),并且如果需要,您放入的对象已经正确包装(反对#2)。如果您的所有线程在项目离开队列之前都死了,您仍然面临死锁的风险,但这可能是大多数寻求此特定解决方案的人愿意承担的风险。

标签: java multithreading concurrency executorservice executor


【解决方案1】:

在一些非常狭窄的情况下,您可以实现一个 java.util.concurrent.RejectedExecutionHandler 来满足您的需要。

RejectedExecutionHandler block = new RejectedExecutionHandler() {
  rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     executor.getQueue().put( r );
  }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

现在。这是一个非常糟糕的主意,原因如下

  • 它很容易出现死锁,因为池中的所有线程都可能在您放入队列中的东西可见之前就死了。通过设置合理的保持活动时间来缓解此问题。
  • 任务未按照您的 Executor 预期的方式包装。许多执行器实现在执行之前将它们的任务包装在某种跟踪对象中。看看你的来源。
  • API 强烈反对通过 getQueue() 添加,并且在某些时候可能会被禁止。

一个几乎总是更好的策略是安装 ThreadPoolExecutor.CallerRunsPolicy,它将通过在调用 execute() 的线程上运行任务来限制您的应用程序。

但是,有时阻止策略及其所有固有风险确实是您想要的。我会说在这些条件下

  • 只有一个线程调用 execute()
  • 您必须(或希望)有一个非常小的队列长度
  • 您绝对需要限制运行此工作的线程数(通常是出于外部原因),而调用者运行策略会破坏这一点。
  • 您的任务大小无法预测,因此如果池暂时忙于 4 个短任务并且您的一个线程调用 execute 被一个大任务卡住,调用者运行可能会导致饥饿。

所以,正如我所说。它很少需要,而且可能很危险,但你去吧。

祝你好运。

【讨论】:

  • 一个深思熟虑的回应。我对你的情况有一个小问题,即 > “你必须(或想要)有一个非常小的队列长度。”您可能无法预测给定作业将排队的任务数量。也许您正在运行一项处理来自某个数据库的数据的日常工作,周一有 500 条记录要处理,但周二有 50,000 条。你必须在队列上设置一个上限,这样当一个大工作完成时你就不会搞砸你的堆。在这种情况下,等待一些任务完成后再排队是没有害处的。
  • “它很容易出现死锁,因为池中的所有线程都可能在你放入队列中的东西可见之前就死了。通过设置合理的保持活动时间来缓解这种情况。”不能通过将最小池大小设置为大于零的值来完全避免死锁吗?其他所有原因都是 Java 没有内置支持阻塞 put 到 executor 队列的后果。这很有趣,因为这似乎是一个非常合理的策略。我想知道这是什么原理。
  • 也许阻塞策略的另一个条件是执行顺序很重要。 CallerRunsPolicy 将意味着被拒绝的任务可能会在执行程序中的其他待处理项目之前执行。
  • @TimPote 从 java 8 开始,execute() 的当前实现也会处理这种情况。如果一个任务可以成功排队,那么我们仍然需要 * 仔细检查我们是否应该添加一个线程 * (因为现有的线程自上次检查后就死了)或者 * 池在进入此方法后关闭。因此,我们 * 重新检查状态,如果 * 停止,则在必要时回滚入队,如果没有,则启动新线程。 Darren,您是否也看到这种方法与 java 8 的任何问题?
【解决方案2】:

在这种情况下,这是我的代码 sn-p:

public void executeBlocking( Runnable command ) {
    if ( threadPool == null ) {
        logger.error( "Thread pool '{}' not initialized.", threadPoolName );
        return;
    }
    ThreadPool threadPoolMonitor = this;
    boolean accepted = false;
    do {
        try {
            threadPool.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    }
                    // to make sure that the monitor is freed on exit
                    finally {
                        // Notify all the threads waiting for the resource, if any.
                        synchronized ( threadPoolMonitor ) {
                            threadPoolMonitor.notifyAll();
                        }
                    }
                }
            } );
            accepted = true;
        }
        catch ( RejectedExecutionException e ) {
            // Thread pool is full
            try {
                // Block until one of the threads finishes its job and exits.
                synchronized ( threadPoolMonitor ) {
                    threadPoolMonitor.wait();
                }
            }
            catch ( InterruptedException ignored ) {
                // return immediately
                break;
            }
        }
    } while ( !accepted );
}

threadPool 是已经初始化的 java.util.concurrent.ExecutorService 的本地实例。

【讨论】:

    【解决方案3】:

    我使用自定义的 RejectedExecutionHandler 解决了这个问题,它只是将调用线程阻塞了一会儿,然后再次尝试提交任务:

    public class BlockWhenQueueFull implements RejectedExecutionHandler {
    
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    
            // The pool is full. Wait, then try again.
            try {
                long waitMs = 250;
                Thread.sleep(waitMs);
            } catch (InterruptedException interruptedException) {}
    
            executor.execute(r);
        }
    }
    

    这个类可以像任何其他类一样在线程池执行器中用作 RejectedExecutionHandler。在这个例子中:

    executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())
    

    我看到的唯一缺点是调用线程被锁定的时间可能比严格要求的时间稍长(最多 250 毫秒)。对于许多短期运行的任务,可能会将等待时间减少到 10 毫秒左右。此外,由于此执行程序实际上是递归调用的,因此等待线程可用的时间过长(数小时)可能会导致堆栈溢出。

    不过,我个人喜欢这种方法。它结构紧凑、易于理解且运行良好。我错过了什么重要的东西吗?

    【讨论】:

      【解决方案4】:

      您可以使用semaphore 阻止线程进入池。

      ExecutorService service = new ThreadPoolExecutor(
          3, 
          3, 
          1, 
          TimeUnit.HOURS, 
          new ArrayBlockingQueue<>(6, false)
      );
      
      Semaphore lock = new Semaphore(6); // equal to queue capacity
      
      for (int i = 0; i < 100000; i++ ) {
          try {
              lock.acquire();
              service.submit(() -> {
                  try {
                    task.run();
                  } finally {
                    lock.release();
                  }
              });
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          }
      }
      

      一些陷阱

      • 仅将此模式与固定线程池一起使用。队列不太可能经常被填满,因此不会创建新线程。查看 ThreadPoolExecutor 上的 java 文档以了解更多详细信息:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html 有一种解决方法,但它超出了此答案的范围。
      • 队列大小应大于核心线程数。如果我们将队列大小设为 3,最终会发生什么:

        • T0:三个线程都在工作,队列为空,没有可用的许可。
        • T1:线程 1 完成,释放许可证。
        • T2:线程 1 轮询队列中的新工作,没有找到,然后等待
        • T3:主线程将工作提交到池中,线程 1 开始工作。

        上面的例子将主线程阻塞线程 1 转换为线程。它可能看起来像一个小周期,但现在将频率乘以天和月。突然之间,短时间加起来浪费了大量时间。

      【讨论】:

      • 线程 1 在时间 T2 发现队列为空时已被阻塞。我不确定我是否理解您关于主线程阻塞该线程的观点。
      • @asgs "线程 1 在时间 T2 发现队列为空时已被阻塞。"对,而且因为将工作放入队列是主线程的责任,所以可以推断出主线程正在阻塞线程 1。
      【解决方案5】:

      您需要做的是将您的 ThreadPoolExecutor 包装到 Executor 中,从而明确限制其中并发执行的操作数量:

       private static class BlockingExecutor implements Executor {
      
          final Semaphore semaphore;
          final Executor delegate;
      
          private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
              semaphore = new Semaphore(concurrentTasksLimit);
              this.delegate = delegate;
          }
      
          @Override
          public void execute(final Runnable command) {
              try {
                  semaphore.acquire();
              } catch (InterruptedException e) {
                  e.printStackTrace();
                  return;
              }
      
              final Runnable wrapped = () -> {
                  try {
                      command.run();
                  } finally {
                      semaphore.release();
                  }
              };
      
              delegate.execute(wrapped);
      
          }
      }
      

      您可以将 concurrentTasksLimit 调整为您的委托执行程序的 threadPoolSize + queueSize,它几乎可以解决您的问题

      【讨论】:

      • 漂亮流畅。谢谢!
      【解决方案6】:

      这就是我最终做的:

      int NUM_THREADS = 6;
      Semaphore lock = new Semaphore(NUM_THREADS);
      ExecutorService pool = Executors.newCachedThreadPool();
      
      for (int i = 0; i < 100000; i++) {
          try {
              lock.acquire();
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          }
          pool.execute(() -> {
              try {
                  // Task logic
              } finally {
                  lock.release();
              }
          });
      }
      

      【讨论】:

        【解决方案7】:

        一个相当简单的选择是在调用offer(..) 时使用调用put(..) 的实现包装您的BlockingQueue

        public class BlockOnOfferAdapter<T> implements BlockingQueue<T> {
        
        (..)
        
          public boolean offer(E o) {
                try {
                    delegate.put(o);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                return true;
          }
        
        (.. implement all other methods simply by delegating ..)
        
        }
        

        这是可行的,因为默认情况下put(..) 等到队列中的容量已满时,see

            /**
             * Inserts the specified element into this queue, waiting if necessary
             * for space to become available.
             *
             * @param e the element to add
             * @throws InterruptedException if interrupted while waiting
             * @throws ClassCastException if the class of the specified element
             *         prevents it from being added to this queue
             * @throws NullPointerException if the specified element is null
             * @throws IllegalArgumentException if some property of the specified
             *         element prevents it from being added to this queue
             */
            void put(E e) throws InterruptedException;
        

        无需捕获RejectedExecutionException 或复杂的锁定。

        【讨论】:

          【解决方案8】:

          好的,旧线程,但这是我在搜索阻塞线程执行程序时发现的。当任务提交到任务队列时,我的代码尝试获取信号量。如果没有剩余信号量,这将阻塞。一旦任务完成,装饰器就会释放信号量。可怕的部分是可能会丢失信号量,但这可以通过定时作业来解决,例如定时清除信号量。

          所以这是我的解决方案:

          class BlockingThreadPoolTaskExecutor(concurrency: Int) : ThreadPoolTaskExecutor() {
              companion object {
                  lateinit var semaphore: Semaphore
              }
          
              init {
                  semaphore = Semaphore(concurrency)
                  val semaphoreTaskDecorator = SemaphoreTaskDecorator()
                  this.setTaskDecorator(semaphoreTaskDecorator)
              }
          
              override fun <T> submit(task: Callable<T>): Future<T> {
                  log.debug("submit")
                  semaphore.acquire()
                  return super.submit(task)
              }
          }
          
          private class SemaphoreTaskDecorator : TaskDecorator {
              override fun decorate(runnable: Runnable): Runnable {
                  log.debug("decorate")
                  return Runnable {
                      try {
                          runnable.run()
                      } finally {
                          log.debug("decorate done")
                          semaphore.release()
                      }
                  }
              }
          }
          

          【讨论】:

            猜你喜欢
            • 2014-03-12
            • 2014-10-16
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多