【问题标题】:Is it necessary to use AtomicInteger in a ThreadFactory?是否有必要在 ThreadFactory 中使用 AtomicInteger?
【发布时间】:2019-07-28 02:20:37
【问题描述】:

我认为有必要在 ThreadFactory 中使用AtomicInteger,但是当我试图向自己证明这一点时,我失败了。

    new ThreadFactory() {

        private int threadId = 0;   <---- AtomicInteger preferred

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("my-thread-" + (threadId++));   <--- dangerous code
            return t;
        }
    }

如果有多个请求出现,线程工厂将生成线程来处理它们,并且在生成过程中,可能会出现竞争条件潜入的间隙。

我尝试使用以下代码来证明我的理论,但对于 2_000 个核心线程根本没有发生。

@Slf4j
public class ThreadFactoryTest {

    private ConcurrentHashMap<String, Thread> threadIdThreadMap = new ConcurrentHashMap<>();
    private ThreadPoolExecutor myExecutor = new ThreadPoolExecutor(2000, 2000, 30, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100000), new ThreadFactory() {

        private int threadId = 0;

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("my-thread-" + (threadId++));
            if (threadIdThreadMap.contains(t.getName())) {
                log.error("already existed");
                System.err.println(myExecutor);
                myExecutor.shutdownNow();
            } else threadIdThreadMap.put(t.getName(), t);
            return t;
        }
    }, new ThreadPoolExecutor.AbortPolicy());

    @Test
    public void testThreadFactory() throws Exception {
        for (int i = 0; i < 100; ++i) {
            new Thread(() -> runOneHundredJobs()).start();
        }
        Thread.sleep(1000000);
        myExecutor.shutdown();
        myExecutor.awaitTermination(100, TimeUnit.MINUTES);
    }

    private void runOneHundredJobs() {
        log.info("{} starting to submit tasks", Thread.currentThread().getName());
        for (int i = 0; i < 100; ++i) {
            myExecutor.execute(() -> {
                while (100 < System.currentTimeMillis()) {
                    try {
                        Thread.sleep(1000);
                        if (Math.random() > 0.99) break;
                        System.out.println(Thread.currentThread().getName());
                        System.out.println(myExecutor);
                    } catch (Exception e) {

                    }
                }
            } );
        }
    }
}

看起来像一个愚蠢的问题,因为我一直都知道“很难为多线程竞争条件创建间隙”。

任何帮助/线索将不胜感激;)

更新

非常感谢@StephenC 和@Slaw 在此过程中提供的帮助。很抱歉我误解了那里的一些观点;(

所以newThread 应该以线程安全 的方式实现,然后在我的情况下,AtomicInteger 是必需的。我想引用 StephenC 的话:

未能证明竞争条件并不意味着它不存在。

【问题讨论】:

  • 启动一个线程是一个漫长的过程,因此很难得到精确的对齐。如何创建线程并让它们都从等待同一个锁开始,或者都等到下午 3 点等。这样,它们都将从同一个固定点开始运行,因此更有可能获得那种竞争条件。

标签: java multithreading java-8 threadpool atomicinteger


【解决方案1】:

是否需要在 ThreadFactory 中使用 AtomicInteger?

这取决于工厂对象的使用方式。

  • 如果您为ThreadPoolExecutor 的每个实例提供不同的工厂对象,那么工厂的(实际)并发要求将取决于执行程序如何使用它。在 javadocs 中没有语句的情况下,您需要检查源代码。我还没有检查,但我怀疑线程池的扩展(包括对newThread 的调用)发生在互斥体内部。如果我的怀疑是正确的,那么这个用例不需要工厂对象是线程安全的。

    更新 - 我现在已经检查过了,我的怀疑是不正确(对于 Java 8 和 12)。 newThread 调用是在创建新的 Worker 对象时进行的,而在持有互斥体时不会这样做。因此,您的 newThread 方法在这种情况下也需要是线程安全的。

  • 如果工厂对象与其他事物(例如另一个执行程序)共享,那么您是对的:您的 newThread 方法需要是线程安全的。


我没有查看您的代码来尝试显示竞争条件,但在我看来,这不是解决此问题的最佳方法。代码检查和推理是一种更好的方法。未能证明竞争条件并不意味着它不存在。

【讨论】:

  • 谢谢你,斯蒂芬,我喜欢你的话未能证明竞争条件并不意味着它不存在。.
  • 但我不明白你的想法:>>> 但我怀疑线程池的扩展(包括对 newThread 的调用)发生在互斥锁内。如果我的怀疑是正确的,那么这个用例不需要工厂对象是线程安全的。thread-safe?
  • 查看 Java 12 实现,在创建 Worker(其构造函数调用 ThreadFactory#newThread)时,ThreadPoolExecutor 不会在互斥锁上同步。它适用于其他事情,但不是那样。此外,我自己的测试(类似于 OP)失败:预期计数为 2000,但实际计数(非原子/同步时)最终为 1998 或 1999。而预期计数和实际计数 在使用时相等AtomicInteger.
  • @Hearen 如果我正确理解您的困惑,Stephen 的意思是 您的 ThreadFactory 实现 必须是线程安全的,而不是 ThreadPoolExecutor 在线程中调用 newThread -安全的方式。
  • 是的。我说的是方法,而不是对方法的调用。如果您阅读/理解我之前在答案中所说的话,我错误地假设该方法不需要是线程安全的。
【解决方案2】:

我正在简化测试,以使预期的结果从水下出现。

通过下面的测试,预期的线程大小为 1000 尖锐,而使用 int 会经常给出 less 大小(在我的 macOS 中为 994、996、999),并非总是如此时间>。

public class ThreadFactoryTest {
    private ConcurrentHashMap<String, Thread> threadIdThreadMap = new ConcurrentHashMap<>();
    private ThreadPoolExecutor myExecutor = new ThreadPoolExecutor(2000, 2000, 30, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100000), new ThreadFactory() {

        private int threadId = 0;
        private AtomicInteger atomicThreadId = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            t.setName("my-thread-" + (threadId++));
            // uncomment this line, the thread size will be less than 1000
            t.setName("my-thread-" + (atomicThreadId.getAndIncrement()));
            threadIdThreadMap.put(t.getName(), t);
            return t;
        }
    }, new ThreadPoolExecutor.AbortPolicy());



    @Test
    public void testThreadFactory() throws Exception {
        for (int i = 0; i < 50; ++i) {
            new Thread(() -> runOneHundredJobs()).start();
        }
        Thread.sleep(1000000);
        myExecutor.shutdown();
        myExecutor.awaitTermination(100, TimeUnit.MINUTES);
    }

    private void runOneHundredJobs() {
        for (int i = 0; i < 20; ++i) {
            myExecutor.execute(() -> {
                while (100 < System.currentTimeMillis()) {
                    try {
                        Thread.sleep(1000);
                        log.warn("count: {}", threadIdThreadMap.size());
                    } catch (Exception e) {

                    }
                }
            });
        }
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-04-25
    • 2022-01-24
    • 1970-01-01
    • 2023-01-16
    • 1970-01-01
    相关资源
    最近更新 更多