【问题标题】:What's the logic behind thread selection from thread pools in the Java concurrent API?从 Java 并发 API 中的线程池中选择线程背后的逻辑是什么?
【发布时间】:2019-03-11 03:03:41
【问题描述】:

Java concurrent API 提供了许多有用的库和功能,可以更轻松地处理异步和多线程控制流。其中一项功能是Thread Pools

今天早些时候,我正在试验并发 API,并注意到一些关于从线程池中选择线程的奇怪模式。他们让我想知道线程选择背后的逻辑是什么。下面是一些示例代码,也许可以为您提供我正在谈论的示例。当程序记录每个tick 时,您会注意到线程名称中的模式(在[ ] 内)。虽然,该模式可能不会出现在 64 位 Windows 10 机器上的 Oracle JDK 1.8.0_161 以外的 JDK 版本上。

无论如何,我的问题与任何巧合的模式关系不大,而是关于从线程池中选择线程的过程。这个模式让我相信它不是完全随机的,那么这个选择背后的逻辑是什么?谢谢。 :)

public static void main(String[] args)
{
    // create a ScheduledExecutorService with a Thread Pool of 7 threads
    ScheduledExecutorService ses = Executors.newScheduledThreadPool(7);
    log("go");
    // starts a timer of 30 seconds, shutting down ses afterwards
    ses.schedule(() -> call(ses), 30, TimeUnit.SECONDS);
    // starts the ticker
    ses.schedule(() -> tick(ses, 1), 1, TimeUnit.SECONDS);
}

// ticks once per second, logging the current tick counter. (i.e, counts
// by 1 each second) ticking ends when ses is shutdown.
public static void tick(ScheduledExecutorService ses, int count)
{
    if (!ses.isShutdown())
    {
        log("tick %d", count);
        ses.schedule(() -> tick(ses, count + 1), 1, TimeUnit.SECONDS);
    }
}

// called when it's time to shutdown ses.
public static void call(ScheduledExecutorService ses)
{
    log("done");
    ses.shutdown();
}

// formats and logs the given message alongside a timestamp and the name
// of the executing thread
public static void log(String s, Object...args)
{
    SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS");
    String time = sdf.format(new Date(System.currentTimeMillis()));
    String thread = Thread.currentThread().getName();
    String message = String.format(s, args);
    String log = String.format("%s [%s] - %s", time, thread, message);
    System.out.println(log);
}

【问题讨论】:

标签: java multithreading concurrency threadpool java.util.concurrent


【解决方案1】:

您创建的ScheduledThreadPool 内部有一个DelayQueue。当您提交任务时,它总是会创建一个新的工作线程,该线程将继续从DelayQueue 获取任务。

提交任务时的代码,新线程不断接任务

//getTask() method calls DelayQueue#take()
while (task != null || (task = getTask()) != null) {
       ......
       task.run();
       ......
}

take()方法如果getDelay方法返回零则返回一个任务,否则工作线程将等待倒计时结束。时间到时唤醒工作线程获取任务。

DelayQueue#take()的代码

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element with an expired delay is available on this queue.
 *
 * @return the head of this queue
 * @throws InterruptedException {@inheritDoc}
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

当你提交 7 个任务时,有 7 个工作线程等待被唤醒。 所以线程选择依赖于唤醒策略,一般按照等待队列的顺序唤醒。

【讨论】:

    猜你喜欢
    • 2017-07-18
    • 2014-12-15
    • 2015-02-13
    • 2011-03-18
    • 1970-01-01
    • 2012-02-19
    • 1970-01-01
    • 2010-11-14
    • 1970-01-01
    相关资源
    最近更新 更多