【问题标题】:Is ExecutorService (specifically ThreadPoolExecutor) thread safe?ExecutorService(特别是 ThreadPoolExecutor)线程安全吗?
【发布时间】:2010-12-14 17:09:36
【问题描述】:

ExecutorService 是否保证线程安全?

我将来自不同线程的作业提交到同一个 ThreadPoolExecutor,我是否必须在交互/提交任务之前同步对执行器的访问?

【问题讨论】:

    标签: java multithreading executorservice


    【解决方案1】:

    您的问题相当开放:@​​987654321@ 接口所做的只是保证某处的某个线程将处理提交的RunnableCallable 实例。

    如果提交的Runnable / Callable 引用了可从其他Runnable / Callables 实例访问的共享数据结构(可能由不同线程同时处理),则由您负责 以确保跨此数据结构的线程安全。

    要回答您问题的第二部分,是的,您可以在提交任何任务之前访问 ThreadPoolExecutor;例如

    BlockingQueue<Runnable> workQ = new LinkedBlockingQueue<Runnable>();
    ExecutorService execService = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.SECONDS, workQ);
    ...
    execService.submit(new Callable(...));
    

    编辑

    根据 Brian 的评论,以防我误解了您的问题:从多个生产者线程向 ExecutorService 提交任务通常是线程安全的(尽管在接口的 API 中没有明确提及)告诉)。任何不提供线程安全的实现在多线程环境中都是无用的(因为多个生产者/多个消费者是一个相当常见的范例),这就是ExecutorService(以及java.util.concurrent 的其余部分)的具体内容专为。

    【讨论】:

    • 他问的不是 submission 是线程安全的吗?即他可以从不同的线程提交
    • 是的,我问从多个线程向同一个 ThreadPoolExecutor 实例提交任务是否安全。更新了问题,因为一个重要的“同步”词消失了:|
    • “任何不提供线程安全的实现在多线程环境中都是无用的”:假设的 ExecutorService 提供非线程安全的实现并非完全不可信,因为生产者是一种很常见的模式。 (但对于一般用途的 ThreadPoolExecutor,该评论当然成立)
    【解决方案2】:

    确实,所讨论的 JDK 类似乎并没有明确保证线程安全的任务提交。但是,在实践中,库中的所有 ExecutorService 实现确实是线程安全的。我认为依靠这一点是合理的。由于实现这些功能的所有代码都放在公共域中,因此任何人都绝对没有动机以不同的方式完全重写它。

    【讨论】:

    • “放在公共领域”真的吗?我认为它使用 GPL。
    • JDK 有,但 Doug Lea 没有。
    • 对线程安全的任务提交做出了充分的保证:参见 interface ExecutorService 的 javadoc 底部,ThreadPoolExecutor 也必须遵守。 (我最近更新的答案中有更多详细信息。)
    • 如果你有一个 executor 有一个线程,并且在那个线程中,你想向那个 executor 提交工作,等待它完成,就会出现提交工作不会的死锁问题永远运行。使用同步块,当您进入等待模式时,锁定将被删除。考虑某人正在等待您的任务完成的情况,并且在该任务中,您可能会根据某些标准安排更多工作要做。然后,您将不得不等待它们完成,以便在实际完成工作时向原始调用者发出信号。
    • 这适用于任何规模的执行者。
    【解决方案3】:

    对于ThreadPoolExecutor,答案很简单ExecutorService 确实 要求或以其他方式保证所有实现都是线程安全的,并且它不能因为它是一个接口。这些类型的契约超出了 Java 接口的范围。但是,ThreadPoolExecutor 两者都被明确记录为线程安全的。此外,ThreadPoolExecutor 使用java.util.concurrent.BlockingQueue 管理其作业队列,java.util.concurrent.BlockingQueue 是一个要求所有实现都是线程安全的接口。 BlockingQueue 的任何 java.util.concurrent.* 实现都可以安全地假定为线程安全的。任何非标准的实现都可能不会,尽管如果有人要提供一个不是线程安全的BlockingQueue 实现队列,那将是完全愚蠢的。

    所以你的标题问题的答案显然是。您问题后续正文的答案是可能,因为两者之间存在一些差异。

    【讨论】:

    • 接口可以并且确实要求线程安全的实现。线程安全是一个记录在案的契约,就像任何其他类型的行为(例如List.hashCode())一样。 javadocs 说“BlockingQueue 实现是线程安全的”(因此非线程安全的 BlockingQueue 不仅愚蠢而且有问题),但没有关于 ThreadPoolExecutor 或其实现的任何接口的此类文档。
    • 您能否参考清楚说明ThreadPoolExecutor 是线程安全的文档?
    【解决方案4】:

    (与其他答案相反)记录了线程安全合同:查看interface javadocs(与方法的javadoc相反)。例如,在ExecutorService javadoc 的底部,您会发现:

    内存一致性效果:线程中的动作之前 向 ExecutorService 提交 Runnable 或 Callable 任务 happen-before 该任务采取的任何行动,这反过来 happen-before 通过 Future.get() 检索结果。

    这足以回答这个问题:

    “我必须在交互/提交任务之前同步对执行器的访问吗?”

    不,你没有。无需外部同步即可构造和提交作业到任何(正确实现的)ExecutorService。这是主要的设计目标之一。

    ExecutorService 是一个并发 实用程序,也就是说,它旨在最大程度地运行而不需要同步,以提高性能。 (同步会导致线程争用,这会降低多线程效率 - 特别是在扩展到大量线程时。)

    无法保证任务将在未来的什么时间执行或完成(有些甚至可能在提交它们的同一线程上立即执行),但是保证工作线程已经看到提交线程执行的所有效果到提交点。因此(运行的线程)您的任务也可以安全地读取为其使用而创建的任何数据,而无需同步、线程安全类或任何其他形式的“安全发布”。提交任务的行为本身就足以将输入数据“安全发布”到任务。您只需要确保在任务运行时不会以任何方式修改输入数据。

    同样,当您通过Future.get() 取回任务的结果时,检索线程将保证看到执行器的工作线程产生的所有效果(在返回的结果中,加上任何副作用都会改变工作线程-thread 可能已经做了)。

    这个契约也暗示了任务本身提交更多的任务是可以的。

    “ExecutorService 是否保证线程安全?”

    现在这部分问题更为笼统。例如,找不到任何关于方法 shutdownAndAwaitTermination 的线程安全合同声明 - 尽管我注意到 Javadoc 中的代码示例不使用同步。 (虽然可能有一个隐藏的假设,即关闭是由创建 Executor 的同一线程发起的,而不是例如工作线程?)

    顺便说一句,我推荐《Java Concurrency In Practice》一书,作为并发编程领域的良好基础。

    【讨论】:

    • 这些不是完整的线程安全保证;他们只在特定情况下建立可见性顺序。例如,没有明确记录的保证从多个线程调用 execute() 是安全的(在执行器上运行的任务的上下文之外)。
    • @Miles 经过几年的更多经验:-) ...我不同意。之前发生的关系是 Java 5 中引入的(开创性的)Java 内存模型中的一个基本概念,它反过来形成了定义并发(相对于同步)线程安全合约的基本构建块。 (我再次支持我原来的答案,尽管希望现在通过一些编辑更清楚。)
    【解决方案5】:

    Luke Usherwood 声称的答案相反,文档并未暗示ExecutorService 实现保证是线程安全的。至于ThreadPoolExecutor的问题具体看其他答案。

    是的,指定了 happens-before 关系,但这并不意味着方法本身的线程安全性,正如Miles 所评论的那样。在Luke Usherwood的回答中指出前者足以证明后者,但没有提出实际论证。

    “线程安全”可以表示多种含义,但这里有一个简单的反例Executor(不是ExecutorService,但没有区别),它微不足道地满足了所需的发生之前 em> 关系,但不是线程安全的,因为对 count 字段的访问不同步。

    class CountingDirectExecutor implements Executor {
    
        private int count = 0;
    
        public int getExecutedTaskCount() {
            return count;
        }
    
        public void execute(Runnable command) {
            command.run();
        }
    }
    

    免责声明:我不是专家,我发现这个问题是因为我自己在寻找答案。

    【讨论】:

    • 你说的都是真的,但问题特别问“我是否必须同步对执行程序的访问” - 所以我在这种情况下读到“线程安全”只是在谈论线程 -执行器(内部的状态/数据)的安全性,以及调用其方法的操作。
    • 如何使提交的任务本身具有“线程安全的副作用”是一个更大的话题! (如果他们不这样做,这会更容易很多。就像,如果一些不可变的计算结果可以被传回。当他们确实触及可变共享状态时,那么肯定:你需要注意定义并了解线程边界并考虑线程安全、死锁、活锁等)
    【解决方案6】:

    对于 ThreadPoolExecutor,它的提交是线程安全的。 jdk8中可以看到源码。添加新任务时,它使用一个 mainLock 来确保线程安全。

    private boolean addWorker(Runnable firstTask, boolean core) {
                retry:
                for (;;) {
                    int c = ctl.get();
                    int rs = runStateOf(c);
    
                    // Check if queue empty only if necessary.
                    if (rs >= SHUTDOWN &&
                        ! (rs == SHUTDOWN &&
                           firstTask == null &&
                           ! workQueue.isEmpty()))
                        return false;
    
                    for (;;) {
                        int wc = workerCountOf(c);
                        if (wc >= CAPACITY ||
                            wc >= (core ? corePoolSize : maximumPoolSize))
                            return false;
                        if (compareAndIncrementWorkerCount(c))
                            break retry;
                        c = ctl.get();  // Re-read ctl
                        if (runStateOf(c) != rs)
                            continue retry;
                        // else CAS failed due to workerCount change; retry inner loop
                    }
                }
    
                boolean workerStarted = false;
                boolean workerAdded = false;
                Worker w = null;
                try {
                    w = new Worker(firstTask);
                    final Thread t = w.thread;
                    if (t != null) {
                        final ReentrantLock mainLock = this.mainLock;
                        mainLock.lock();
                        try {
                            // Recheck while holding lock.
                            // Back out on ThreadFactory failure or if
                            // shut down before lock acquired.
                            int rs = runStateOf(ctl.get());
    
                            if (rs < SHUTDOWN ||
                                (rs == SHUTDOWN && firstTask == null)) {
                                if (t.isAlive()) // precheck that t is startable
                                    throw new IllegalThreadStateException();
                                workers.add(w);
                                int s = workers.size();
                                if (s > largestPoolSize)
                                    largestPoolSize = s;
                                workerAdded = true;
                            }
                        } finally {
                            mainLock.unlock();
                        }
                        if (workerAdded) {
                            t.start();
                            workerStarted = true;
                        }
                    }
                } finally {
                    if (! workerStarted)
                        addWorkerFailed(w);
                }
                return workerStarted;
            }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-04-07
      • 2021-10-12
      • 2015-04-18
      相关资源
      最近更新 更多