【问题标题】:Fixed Thread Pool threads blocking, when enough tasks submitted修复了线程池线程阻塞,当提交足够多的任务时
【发布时间】:2016-03-15 17:42:53
【问题描述】:

我有一个进程需要并行计算许多小任务,然后按照任务的自然顺序处理结果。为此,我有以下设置:

一个简单的 ExecutorService 和一个阻塞队列,当 Callable 提交给执行器时,我将使用它来保持返回的 Future 对象:

ExecutorService exec = Executors.newFixedThreadPool(15);
LinkedBlockingQueue<Future<MyTask>> futures = new LinkedBlockingQueue<Future<MyTask>>(15 * 64);

一些调试代码,用于统计提交的数量和处理的任务数量并定期写出来(注意processed在任务代码本身的末尾递增):

AtomicLong processed = new AtomicLong(0);
AtomicLong submitted = new AtomicLong(0);

Timer statusTimer = new Timer();
statusTimer.schedule(new TimerTask() {
      @Override
      public void run() {
        l.info("Futures: " + futures.size() + "; Submitted: " + submitted.get() + "; Processed: " + processed.get() + "; Diff: " + (submitted.get() - processed.get())));
      }             
}, 60 * 1000, 60 * 1000);

一个线程从队列(实际上是一个生成器)中获取任务并将它们提交给执行器,将生成的 Future 放入 futures 队列中(这是我确保我不会提交太多任务的方式内存不足):

Thread submitThread = new Thread(() ->
{
    MyTask task;
    try {
        while ((task = taskQueue.poll()) != null) {
            futures.put(exec.submit(task));
            submitted.incrementAndGet();
        }
    } catch (Exception e) {l .error("Unexpected Exception", e);}
}, "SubmitTasks");
submitThread.start();

当前线程然后take-s 完成了futures 队列外的任务并处理结果:

while (!futures.isEmpty() || submitThread.isAlive()) {
    MyTask task = futures.take().get();
    //process result
}

当我在具有 8 个内核的服务器上运行此程序时(请注意,代码当前使用 15 个线程),CPU 利用率峰值仅约为 60%。我看到我的调试输出是这样的:

INFO : Futures: 960; Submitted: 1709710114; Processed: 1709709167; Diff: 947
INFO : Futures: 945; Submitted: 1717159751; Processed: 1717158862; Diff: 889
INFO : Futures: 868; Submitted: 1724597808; Processed: 1724596954; Diff: 853
INFO : Futures: 940; Submitted: 1732030120; Processed: 1732029252; Diff: 871
INFO : Futures: 960; Submitted: 1739538576; Processed: 1739537758; Diff: 818
INFO : Futures: 960; Submitted: 1746965761; Processed: 1746964811; Diff: 950

线程转储显示许多线程池线程像这样阻塞:

"pool-1-thread-14" #30 prio=5 os_prio=0 tid=0x00007f25c802c800 nid=0x10b2 waiting on condition [0x00007f26151d5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f2fbb0001b0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:897)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

我对调试输出的解释是,在任何给定时间点,我至少有数百个任务已提交给执行器服务,但尚未处理(我也可以在堆栈跟踪中确认SubmitTasks 线程在LinkedBlockingQueue.put 上被阻塞)。然而,堆栈跟踪(和服务器利用率统计信息)向我显示,Executor Service 在 LinkedBlockingQueue.take 上被阻止(我假设内部任务队列为空)。

我读错了什么?

【问题讨论】:

    标签: java multithreading threadpool


    【解决方案1】:

    2.5 年后,我看到这个问题已经收到了一些意见,并想我会提供一个跟进。

    经过多次更改和测试后,我最终将任务分组为每组 10000 个(也就是说,每个 Future 负责一组 10000 个 MyTask 任务,而不仅仅是 1 个)。这样ExecutorService 每秒执行大约 10-20 个任务(而不是我“要求”它执行的相当高的 100000-200000。这种方法显着提高了速度并导致 CPU 利用率达到 100%。

    事后看来,每秒执行超过 10 万个任务似乎“不合理”。我的阅读是在并发管理/锁定开销和上下文切换(一个猜想)上花费了太多时间。

    【讨论】:

      【解决方案2】:

      涉及BlockingQueues 的线程总是很棘手。只需查看您的代码,而无需按照您所做的规模运行。我有一些建议。像 Jessica Kerr 这样的业内许多专家的建议是,你永远不应该永远阻止。您可以做的是在 LinkedBlockingQueue 中使用带有超时的方法。

      Thread submitThread = new Thread(() ->
      {
          MyTask task;
          try {
              while ((task = taskQueue.peek()) != null) {
                  boolean success = futures.offer(exec.submit(task), 1000, TimeUnit.MILLISECONDS);
                  if(success) {
                      submitted.incrementAndGet();
                      taskQueue.remove(task);
                  }
              }
          } catch (Exception e) {l .error("Unexpected Exception", e);}
      }, "SubmitTasks");
      submitThread.start();
      

      这里也是。

      while (!futures.isEmpty() || submitThread.isAlive()) {
          Future<MyTask> f = futures.poll(1000, TimeUnit.MILLISECONDS);
          if(f != null) {
              MyTask task = f.get();
          }
          //process result
      }
      

      观看 Jessica Kerr 在Concurrency tools in JVM 上的视频

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-09-26
        • 2016-06-24
        • 1970-01-01
        • 1970-01-01
        • 2021-08-08
        • 1970-01-01
        • 2019-04-06
        • 1970-01-01
        相关资源
        最近更新 更多