【问题标题】:ThreadPoolExecutor execute tasks in queue first after creating new threads when queue capacity is reached instead of executing the new incoming jobsThreadPoolExecutor 在达到队列容量时创建新线程后首先执行队列中的任务,而不是执行新的传入作业
【发布时间】:2021-07-21 07:52:06
【问题描述】:

我编写了一个简单的演示来模拟我面临的问题。

public abstract class ThreadPoolTest {

    private static final int CORE_POOL_SIZE = 5;
    private static final int QUEUE_CAPACITY = 15;
    private static final int MAX_POOL_SIZE = 50;

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 1l, TimeUnit.SECONDS, workQueue);
        execueInBatch(pool, 25);
        pool.shutdown();
    }

    @SuppressWarnings("boxing")
    private static void execueInBatch(ThreadPoolExecutor pool, int num) throws InterruptedException {
        for (int i = 1; i <= num; i++) {
            final Integer it = Integer.valueOf(i);
            try {
                System.out.println("About to start " + i);
                CompletableFuture.runAsync(() -> {
                    System.out.println("Started- " + it);
                    try {
                        Thread.sleep(5000l);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Completed- " + it);
                }, pool);
            } catch (Exception e) {
                System.err.println(e.getMessage() + " for-" + i);
            }
        }
    }
}

回复如下:
即将开始 1
即将开始 2
即将开始 3
开始- 1
即将开始 4
开始- 2
即将开始 5
开始- 3
即将开始 6
即将开始 7
即将开始 8
即将开始 9
开始- 4
即将开始 10
即将开始 11
即将开始 12
即将开始 13
即将开始 14
即将开始 15
即将开始 16
即将开始 17
开始 - 5
即将开始 18
即将开始 19
即将开始 20
即将开始 21
即将开始 22
即将开始 23
开始 - 21
开始 - 22
开始 - 23
即将开始 24
即将开始 25
开始 - 24
开始 - 25
已完成- 1
开始 - 6
已完成- 3
已完成- 2
已完成- 4
开始 - 8
开始 - 7
开始 - 9
已完成- 5
开始 - 10
已完成- 23
已完成- 21
开始 - 11
已完成- 22
开始 - 12
开始 - 13
已完成 - 25
已完成- 24
开始 - 14
开始 - 15
已完成- 6
开始 - 16
已完成- 8
已完成- 9
开始 - 17
已完成- 7
开始 - 18
开始 - 19
已完成- 10
开始 - 20
已完成 - 13
已完成- 11
已完成 - 12
已完成 - 15
已完成 - 14
已完成 - 16
已完成 - 17
已完成 - 18
已完成 - 19
已完成 - 20

我需要在启动任务 21 之前启动任务 6。 有没有办法实现?

【问题讨论】:

  • 当 N 个线程大约在同一时间拿起一个排队的项目时,除非 N=1 线程或者您在每个步骤开始时添加逻辑以等待特定计数器,否则您无法判断哪个将首先开始事件发生(所以除非事件 1 开始,否则 2 不会继续)。

标签: java queue threadpoolexecutor


【解决方案1】:

当池中有许多线程时,您无法保证哪个线程会首先实际进入可运行任务,除非池大小为 1 - 并且没有同时运行 - 或者如果您在每个任务中添加额外的屏障/检查以保证您在并行执行中的特定顺序启动顺序。

这个例子展示了对每个整数使用CountDownLatch。每个任务在倒计时它自己的锁存器之前检查前一个整数的CountDownLatch 是否已被触发,然后继续执行该任务项。

private static void execueInBatch(ThreadPoolExecutor pool, int num) throws InterruptedException {

    CountDownLatch prev = new CountDownLatch(0);

    for (int i = 1; i <= num; i++) {
        final Integer it = Integer.valueOf(i);
        final CountDownLatch thislatch = new CountDownLatch(1);
        final CountDownLatch prevlatch = prev;
        try {
            System.out.println("About to start " + i+" in "+Thread.currentThread());
            CompletableFuture.runAsync(() -> {
                try {
                    // This check is not needed, it just shows how often threads are running out of the add sequence:
                    if(!prevlatch.await(1, TimeUnit.NANOSECONDS))
                        System.out.println("Note: out of sequence run - " + it+ " in "+Thread.currentThread());

                    // Await event start of previous value of "it" to guarantee sequential start
                    prevlatch.await();

                    System.out.println("Started- " + it+ " in "+Thread.currentThread());

                    // Count down as this one is starting
                    thislatch.countDown();

                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Completed- " + it+ " in "+Thread.currentThread());
            }, pool);
        } catch (Exception e) {
            System.err.println(e.getMessage() + " for-" + i+ " in "+Thread.currentThread());
        }
        prev = thislatch;
    }
}

这不是一个理想的解决方案,因为只要首先处理无序值(例如您观察到的第 21 项),它就会阻塞某些池线程,并且那些后面的值线程将等待其他线程上的较早操作线程结束,然后其他池线程拾取其间的项目。

【讨论】:

  • 请注意,即使使用上面的锁存器来确保每个任务的进入是按顺序进行的,也不能保证每个任务的下一行将按照当有多个线程时定义的顺序运行在游泳池里。
猜你喜欢
  • 2019-07-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-08-16
  • 2019-08-02
  • 1970-01-01
  • 1970-01-01
  • 2019-02-14
相关资源
最近更新 更多