【问题标题】:ExecutorService's surprising performance break-even point --- rules of thumb?ExecutorService 惊人的性能盈亏平衡点——经验法则?
【发布时间】:2010-12-11 12:05:57
【问题描述】:

我正在尝试弄清楚如何正确使用 Java 的 Executors。我意识到向ExecutorService 提交任务有其自身的开销。但是,看到它这么高,我很惊讶。

我的程序需要以尽可能低的延迟处理大量数据(股市数据)。大多数计算都是相当简单的算术运算。

我尝试测试一些非常简单的东西:“Math.random() * Math.random()

最简单的测试在一个简单的循环中运行此计算。第二个测试在匿名 Runnable 中执行相同的计算(这应该衡量创建新对象的成本)。第三个测试将Runnable 传递给ExecutorService(这衡量了引入执行者的成本)。

我在我的小笔记本电脑(2 cpu,1.5 gig ram)上运行了测试:

(in milliseconds)
simpleCompuation:47
computationWithObjCreation:62
computationWithObjCreationAndExecutors:422

(大约有四次运行,前两个数字最终相等)

请注意,执行程序比在单个线程上执行花费的时间要多得多。线程池大小在 1 到 8 之间的数字大致相同。

问题:我是否遗漏了一些明显的东西或者这些结果是预期的?这些结果告诉我,我传递给执行程序的任何任务都必须进行一些重要的计算。如果我正在处理数百万条消息,并且我需要对每条消息执行非常简单(且成本低廉)的转换,我仍然可能无法使用执行器......尝试将计算分布在多个 CPU 上可能最终会比仅仅花费更多在一个线程中执行它们。设计决策变得比我最初想象的要复杂得多。有什么想法吗?


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

 private static int count = 100000;

 public static void main(String[] args) throws InterruptedException {

  //warmup
  simpleCompuation();
  computationWithObjCreation();
  computationWithObjCreationAndExecutors();

  long start = System.currentTimeMillis();
  simpleCompuation();
  long stop = System.currentTimeMillis();
  System.out.println("simpleCompuation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreation();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreationAndExecutors();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreationAndExecutors:"+(stop-start));


 }

 private static void computationWithObjCreation() {
  for(int i=0;i<count;i++){
   new Runnable(){

    @Override
    public void run() {
     double x = Math.random()*Math.random();
    }

   }.run();
  }

 }

 private static void simpleCompuation() {
  for(int i=0;i<count;i++){
   double x = Math.random()*Math.random();
  }

 }

 private static void computationWithObjCreationAndExecutors()
   throws InterruptedException {

  ExecutorService es = Executors.newFixedThreadPool(1);
  for(int i=0;i<count;i++){
   es.submit(new Runnable() {
    @Override
    public void run() {
     double x = Math.random()*Math.random();     
    }
   });
  }
  es.shutdown();
  es.awaitTermination(10, TimeUnit.SECONDS);
 }
}

【问题讨论】:

  • 哇,预览格式化的代码比最终结果好得多。我该如何解决这个问题?
  • 我刚刚重新格式化了,看起来更好吗?
  • 感谢 ZZ Coder,代码现在看起来应该是这样的
  • 是的,我没有运行任何这些代码示例,但我强烈怀疑上面运行的 ExecutorService 中几乎所有时间都来自 ExecutorService 的创建,甚至可能在为其工作产生一个新线程。
  • 不,服务和线程的创建是微不足道的。时间是由于锁定了 Math.random。

标签: java performance executorservice


【解决方案1】:

由于以下原因,这不是对线程池的公平测试,

  1. 您根本没有利用池化,因为您只有 1 个线程。
  2. 作业太简单了,无法证明池开销是合理的。使用 FPP 在 CPU 上进行乘法只需要几个周期。

考虑到线程池除了创建对象和运行作业之外还必须执行的额外步骤,

  1. 将作业放入队列中
  2. 从队列中删除作业
  3. 从池中获取线程并执行作业
  4. 将线程返回到池中

当你有一个真正的工作和多个线程时,线程池的好处将是显而易见的。

【讨论】:

  • 我第二个ZZ Coder;根据我的经验,当你的线程池更大时,好处会变得更加明显。
  • 执行者不必“获取”和“返回”线程。它创建一个内部工作线程来轮询()任务队列。此外,鉴于任务的低时间复杂度,只使用一个线程可能是一个优势,否则,BlockingQueue 中的锁可能会被争用并导致将工作线程移入和移出 Runnable 状态的问题.实际成本?去内核创建一个线程并在等待线程终止时调用阻塞操作。 10万不是很多。但吸取的教训是,性能调优需要测试。
  • 我确实尝试了 1 到 8 之间的线程池大小,它们都返回了大致相同的数字。我专注于 1 的池大小,因为我想测量执行器框架的开销。您的评论确实强化了我需要进一步研究框架的内部结构。
【解决方案2】:

首先,微基准测试存在一些问题。你做热身,这很好。但是,最好多次运行测试,这应该可以感觉到它是否真的热身以及结果的差异。在单独运行中对每个算法进行测试也往往会更好,否则当算法发生变化时,您可能会导致去优化。

任务非常小,虽然我不完全确定有多小。所以快多少倍是毫无意义的。在多线程情况下,它将触及相同的易失性位置,因此线程可能会导致非常糟糕的性能(每个线程使用Random 实例)。此外,47 毫秒的运行时间有点短。

当然,为了一个微小的操作转到另一个线程不会很快。如果可能,将任务分成更大的尺寸。 JDK7 看起来好像会有一个 fork-join 框架,它试图通过优先在同一线程上按顺序执行任务来支持分治算法中的精细任务,而更大的任务则由空闲线程拉出。

【讨论】:

  • 多次运行测试的好处。实际上我确实运行了很多次,我只是粘贴了一个结果。我明白你关于改进基准的观点。
【解决方案3】:
  1. 使用执行程序是关于利用 CPU 和/或 CPU 内核,因此,如果您创建的线程池最多可以利用 CPU 数量,那么您必须拥有与 CPU/内核一样多的线程。
  2. 你说得对,创建新对象的成本太高了。所以减少费用的一种方法是使用批次。如果您知道要进行的计算的种类和数量,您就可以创建批次。因此,请考虑在一项已执行任务中完成的数千次计算。您为每个线程创建批次。计算完成后 (java.util.concurrent.Future),您将创建下一批。甚至新批次的创建也可以并行完成(4 个 CPU -> 3 个用于计算的线程,1 个用于批量供应的线程)。最终,您可能会获得更高的吞吐量,但对内存的需求(批处理、配置)也更高。

编辑:我更改了您的示例,让它在我的小型双核 x200 笔记本电脑上运行。

provisioned 2 batches to be executed
simpleCompuation:14
computationWithObjCreation:17
computationWithObjCreationAndExecutors:9

正如您在源代码中看到的,我也将批量配置和执行程序生命周期排除在测量之外。这比其他两种方法更公平。

自己看结果...

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

    private static int count = 100000;

    public static void main( String[] args ) throws InterruptedException {

        final int cpus = Runtime.getRuntime().availableProcessors();

        final ExecutorService es = Executors.newFixedThreadPool( cpus );

        final Vector< Batch > batches = new Vector< Batch >( cpus );

        final int batchComputations = count / cpus;

        for ( int i = 0; i < cpus; i++ ) {
            batches.add( new Batch( batchComputations ) );
        }

        System.out.println( "provisioned " + cpus + " batches to be executed" );

        // warmup
        simpleCompuation();
        computationWithObjCreation();
        computationWithObjCreationAndExecutors( es, batches );

        long start = System.currentTimeMillis();
        simpleCompuation();
        long stop = System.currentTimeMillis();
        System.out.println( "simpleCompuation:" + ( stop - start ) );

        start = System.currentTimeMillis();
        computationWithObjCreation();
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreation:" + ( stop - start ) );

        // Executor

        start = System.currentTimeMillis();
        computationWithObjCreationAndExecutors( es, batches );    
        es.shutdown();
        es.awaitTermination( 10, TimeUnit.SECONDS );
        // Note: Executor#shutdown() and Executor#awaitTermination() requires
        // some extra time. But the result should still be clear.
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreationAndExecutors:"
                + ( stop - start ) );
    }

    private static void computationWithObjCreation() {

        for ( int i = 0; i < count; i++ ) {
            new Runnable() {

                @Override
                public void run() {

                    double x = Math.random() * Math.random();
                }

            }.run();
        }

    }

    private static void simpleCompuation() {

        for ( int i = 0; i < count; i++ ) {
            double x = Math.random() * Math.random();
        }

    }

    private static void computationWithObjCreationAndExecutors(
            ExecutorService es, List< Batch > batches )
            throws InterruptedException {

        for ( Batch batch : batches ) {
            es.submit( batch );
        }

    }

    private static class Batch implements Runnable {

        private final int computations;

        public Batch( final int computations ) {

            this.computations = computations;
        }

        @Override
        public void run() {

            int countdown = computations;
            while ( countdown-- > -1 ) {
                double x = Math.random() * Math.random();
            }
        }
    }
}

【讨论】:

  • 有趣的解决方案。给了我一些关于如何改变我对执行者的使用的想法。
  • 嗨,如果我在 MacOsx 双核上运行此示例,我得到:simpleComputation: 268 computeWithObjCreation: 155 compute2: 0,因为未检索到 computeWithObjCreationAndExecutors 的结果?如果我在我们采取停止时间之前移动了 es.shutdown() 和 es.awaitTermination,那么结果:配置:要执行 2 个批次 simpleComputation:261 计算与ObjCreation:92 计算与ObjCreationAndExecutors:126 其中计算与ObjCreationAndExecutors 的性能始终比计算与ObjCreation 差。为什么会这样?
  • ...批次的执行可能会执行得更少。操作系统可能会在批处理执行具有更高优先级的任务期间阻止一个或多个内核的执行。上面的示例仅应说明并行执行和批量计算更大数据集的概念。即使是简单的计算问题也不会产生足够的基准。一个真正的基准测试需要更多的努力和一个干净的环境,而不会干扰您的系统任务(例如检查即时通讯或邮件更新、更新您的窗口、更新您的时钟,......)
  • Math.random 受同步影响,因此多线程测试作为与同步执行的性能比较并不真正有效。
  • @adrianos 是对的。不要使用 Math.random()。相反,首先新建一个 Random(),然后使用 nextDouble() 生成随机双打。这将避免同步。
【解决方案4】:

我认为这根本不现实,因为您每次进行方法调用时都会创建一个新的执行程序服务。除非您有看起来不切实际的非常奇怪的要求 - 通常您会在应用启动时创建服务,然后向其提交作业。

如果您再次尝试基准测试但将服务初始化为字段,则在计时循环之外进行一次;然后您会看到将 Runnables 提交到服务与自己运行它们的实际开销。

但我认为您还没有完全理解这一点 - 执行器并不是为了提高效率而存在的,它们的存在是为了更简单地协调和将工作移交给线程池。它们总是比自己调用Runnable.run() 效率低(因为在一天结束时,执行器服务仍然需要这样做,在事先做了一些额外的内务处理之后)。当您从需要异步处理的多个线程中使用它们时,它们才真正发挥作用。

还要考虑到您正在查看基本固定成本(无论您的任务需要 1 毫秒还是 1 小时来运行,执行器开销是相同的)与非常小的可变量(您的微不足道的可运行)相比的相对时间差异。如果 executor 服务需要 5ms 额外的时间来运行一个 1ms 的任务,这不是一个非常有利的数字。如果运行一个 5 秒的任务(例如一个重要的 SQL 查询)需要 5 毫秒的额外时间,那完全可以忽略不计,完全值得。

所以在某种程度上这取决于您的情况 - 如果您有一个非常时间关键的部分,运行许多不需要并行或异步执行的小任务,那么您将无法从 Executor 获得任何信息.如果您正在并行处理较重的任务并希望异步响应(例如 web 应用程序),那么 Executors 很棒。

它们是否是您的最佳选择取决于您的情况,但实际上您需要使用具有代表性的真实数据来尝试测试。我认为从你所做的测试中得出任何结论是不合适的,除非你的任务真的那么微不足道(而且你不想重用 executor 实例......)。

【讨论】:

  • 我在方法内初始化执行器,但不在循环内。我使用方法只是为了将测试分开。我知道执行者有他们的开销,我很惊讶它如此之高。不幸的是(或幸运的是),我的大部分计算实际上都是微不足道的(简单的算术),除了它们是在很多消息上完成的。考虑一个处理大量消息的消息传递系统,但每条消息的转换并不过分昂贵。我从中得到的是,我需要让我的程序以与我最初想的不同的粒度并发。
【解决方案5】:

Fixed ThreadPool 的最终目的是重用已经创建的线程。因此,无需在每次提交任务时都重新创建新线程即可看到性能提升。因此,停止时间必须在提交的任务内进行。就在 run 方法的最后一条语句中。

【讨论】:

    【解决方案6】:

    Math.random() 实际上在单个随机数生成器上同步。调用 Math.random() 会导致对数字生成器的显着争用。事实上,你拥有的线程越多,它就会越慢。

    来自 Math.random() javadoc:

    此方法已正确同步,以允许超过 一根线。但是,如果很多线程需要生成伪随机 以很大的速度增加数量,它可以减少每个线程的争用 有自己的伪随机数生成器。

    【讨论】:

      【解决方案7】:

      您需要以某种方式对执行进行分组,以便向每个线程提交更大的计算部分(例如,基于股票代码构建组)。 通过使用 Disruptor,我在类似情况下获得了最佳结果。它的每个作业开销非常低。对作业进行分组仍然很重要,但幼稚的循环通常会造成许多缓存未命中。

      http://java-is-the-new-c.blogspot.de/2014/01/comparision-of-different-concurrency.html

      【讨论】:

        【解决方案8】:

        您提到的“开销”与ExecutorService无关,它是由多个线程在Math.random上同步造成的,造成锁争用。

        所以是的,您遗漏了一些东西(下面的“正确”答案实际上并不正确)。

        这里是一些 Java 8 代码,用于演示 8 个线程运行一个没有锁争用的简单函数:

        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.CountDownLatch;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import java.util.concurrent.TimeUnit;
        import java.util.function.DoubleFunction;
        
        import com.google.common.base.Stopwatch;
        
        public class ExecServicePerformance {
        
            private static final int repetitions = 120;
            private static int totalOperations = 250000;
            private static final int cpus = 8;
            private static final List<Batch> batches = batches(cpus);
        
            private static DoubleFunction<Double> performanceFunc = (double i) -> {return Math.sin(i * 100000 / Math.PI); };
        
            public static void main( String[] args ) throws InterruptedException {
        
                printExecutionTime("Synchronous", ExecServicePerformance::synchronous);
                printExecutionTime("Synchronous batches", ExecServicePerformance::synchronousBatches);
                printExecutionTime("Thread per batch", ExecServicePerformance::asynchronousBatches);
                printExecutionTime("Executor pool", ExecServicePerformance::executorPool);
        
            }
        
            private static void printExecutionTime(String msg, Runnable f) throws InterruptedException {
                long time = 0;
                for (int i = 0; i < repetitions; i++) {
                    Stopwatch stopwatch = Stopwatch.createStarted();
                    f.run(); //remember, this is a single-threaded synchronous execution since there is no explicit new thread
                    time += stopwatch.elapsed(TimeUnit.MILLISECONDS);
                }
                System.out.println(msg + " exec time: " + time);
            }    
        
            private static void synchronous() {
                for ( int i = 0; i < totalOperations; i++ ) {
                    performanceFunc.apply(i);
                }
            }
        
            private static void synchronousBatches() {      
                for ( Batch batch : batches) {
                    batch.synchronously();
                }
            }
        
            private static void asynchronousBatches() {
        
                CountDownLatch cb = new CountDownLatch(cpus);
        
                for ( Batch batch : batches) {
                    Runnable r = () ->  { batch.synchronously(); cb.countDown(); };
                    Thread t = new Thread(r);
                    t.start();
                }
        
                try {
                    cb.await();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }        
            }
        
            private static void executorPool() {
        
                final ExecutorService es = Executors.newFixedThreadPool(cpus);
        
                for ( Batch batch : batches ) {
                    Runnable r = () ->  { batch.synchronously(); };
                    es.submit(r);
                }
        
                es.shutdown();
        
                try {
                    es.awaitTermination( 10, TimeUnit.SECONDS );
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } 
        
            }
        
            private static List<Batch> batches(final int cpus) {
                List<Batch> list = new ArrayList<Batch>();
                for ( int i = 0; i < cpus; i++ ) {
                    list.add( new Batch( totalOperations / cpus ) );
                }
                System.out.println("Batches: " + list.size());
                return list;
            }
        
            private static class Batch {
        
                private final int operationsInBatch;
        
                public Batch( final int ops ) {
                    this.operationsInBatch = ops;
                }
        
                public void synchronously() {
                    for ( int i = 0; i < operationsInBatch; i++ ) {
                        performanceFunc.apply(i);
                    }
                }
            }
        
        
        }
        

        25k 操作的 120 次测试的结果时序(毫秒):

        • 同步执行时间:9956
        • 同步批处理执行时间:9900
        • 每批线程执行时间:2176
        • 执行器池执行时间:1922

        获胜者:执行服务。

        【讨论】:

          【解决方案9】:

          这是我机器上的结果(64 位 Ubuntu 14.0 上的 OpenJDK 8,Thinkpad W530)

          simpleCompuation:6
          computationWithObjCreation:5
          computationWithObjCreationAndExecutors:33
          

          肯定有开销。但请记住这些数字是什么:100k 次迭代 的毫秒数。在您的情况下,每次迭代的开销约为 4 微秒。对我来说,开销大约是四分之一微秒。

          开销是同步、内部数据结构,以及由于复杂的代码路径(肯定比你的 for 循环更复杂)而可能缺乏 JIT 优化。

          尽管有四分之一微秒的开销,但您实际上想要并行化的任务是值得的。


          仅供参考,这将是一个非常糟糕的并行计算。我将线程增加到 8 个(核心数):

          simpleCompuation:5
          computationWithObjCreation:6
          computationWithObjCreationAndExecutors:38
          

          它并没有让它变得更快。这是因为Math.random() 是同步的。

          【讨论】:

            【解决方案10】:

            如果它对其他人有用,这里是真实场景的测试结果 - 在三星 Android 设备上重复使用 ExecutorService 直到所有任务结束。

             Simple computation (MS): 102
             Use threads (MS): 31049
             Use ExecutorService (MS): 257
            

            代码:

               ExecutorService executorService = Executors.newFixedThreadPool(1);
                    int count = 100000;
            
                    //Simple computation
                    Instant instant = Instant.now();
                    for (int i = 0; i < count; i++) {
                        double x = Math.random() * Math.random();
                    }
                    Duration duration = Duration.between(instant, Instant.now());
                    Log.d("ExecutorPerformanceTest", "Simple computation (MS): " + duration.toMillis());
            
            
                    //Use threads
                    instant = Instant.now();
                    for (int i = 0; i < count; i++) {
                        new Thread(() -> {
                            double x = Math.random() * Math.random();
                        }
                        ).start();
                    }
                    duration = Duration.between(instant, Instant.now());
                    Log.d("ExecutorPerformanceTest", "Use threads (MS): " + duration.toMillis());
            
            
                    //Use ExecutorService
                    instant = Instant.now();
                    for (int i = 0; i < count; i++) {
                        executorService.execute(() -> {
                                    double x = Math.random() * Math.random();
                                }
                        );
                    }
                    duration = Duration.between(instant, Instant.now());
                    Log.d("ExecutorPerformanceTest", "Use ExecutorService (MS): " + duration.toMillis());
            

            【讨论】:

              猜你喜欢
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2022-10-23
              • 1970-01-01
              • 2016-10-02
              • 2015-03-09
              • 1970-01-01
              • 2016-08-13
              相关资源
              最近更新 更多