【问题标题】:Thread pool where workers are both producers and consumers工人既是生产者又是消费者的线程池
【发布时间】:2012-10-22 06:24:39
【问题描述】:

我有一个可以异步处理的无限作业队列。每个作业的处理可能会或可能不会触发为此队列创建新作业。

我想要一个由多个工作线程组成的池来从这个队列中获取项目并并行处理它们,直到队列都为空并且所有工作线程都处于空闲状态等待队列中的新作业(因为忙碌的工作人员最终可能会将新作业添加到队列中)。

是否有使用java.util.concurrent 实现的方法,我可以使用它来解决这个特殊问题,其中工人也是生产者?尚不清楚 API 是否以直接的方式支持这种情况。

特别是,我希望能够检测到终止条件,即当没有更多作业可用(空作业队列)并且不会再产生作业(所有空闲工作线程)时。

编辑

下面的 Nam San 的回答似乎是最优雅的方法,它基本上归结为跟踪提交的作业数量与已完成的作业数量,并使用这些数字相等的情况作为终止条件。

我已经使用java.util.concurrent 实现了一个完整的示例,它扩展了ThreadPoolExecutor 来实现这一点,并且专门化了作业队列以接受以特定方式排序的Comparable 实例。

  • TestExecutor.java:一个自定义执行器,它扩展了ThreadPoolExecutor,但有额外的方法来执行可能会创建新作业的作业,以及一个等待所有提交的作业完成的新 await 方法。
  • WorkUnit.java:一个可比较的可运行作业示例,它可以创建新作业以提交给TestExecutor
  • Test.java:包含一个使用 WorkUnit 实例和 TestExecutor 运行示例的 main 方法。

【问题讨论】:

    标签: java multithreading concurrency producer-consumer java.util.concurrent


    【解决方案1】:

    下面的代码演示了如何使用 Executor 周围的包装类来计算提交的作业数并将其与已完成的作业数进行比较以实现您想要的结果。请注意,您的任务必须调用包装类的execute 方法,并且永远不要直接调用底层的Executor。如果需要,扩展下面的包装器以包装ExecutorService 的“提交”方法应该很简单。

    public class ExampleExecutor {
    
        private final Executor executor;
        private long submitCount = 0;
        private long doneCount = 0;
    
        public ExampleExecutor(Executor executor) {
            this.executor = executor;
        }
    
        public synchronized void execute(Collection<Runnable> commands) {
            for (Runnable command : commands) {
                execute(command);
            }
        }
    
        public synchronized void execute(final Runnable command) {
            submitCount ++;
    
            executor.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        synchronized (ExampleExecutor.this) {
                            doneCount++;
                            if (doneCount == submitCount) {
                                ExampleExecutor.this.notifyAll();
                            }
                        }
                    }
                }
            });
        }
    
        public synchronized void awaitCompletion() throws InterruptedException {
            while (doneCount != submitCount) {
                this.wait();
            }
        }
    }
    

    编辑:在下面添加了测试用例来演示如何使用上述代码

    public class Test {
    
        static class Task implements Runnable {
            private final String id;
            private final long repetitions;
            private final long respawnSize;
            private final ExampleExecutor executor;
    
            public Task(String id, long repetitions, long respawnSize, ExampleExecutor executor) {
                this.id = id;
                this.repetitions = repetitions;
                this.respawnSize = respawnSize;
                this.executor = executor;
            }
    
            public void run() {
                for (int i = 0; i < respawnSize; i ++) {
                    // Spawning new sub tasks
                    executor.execute(new Task(id + "-" + i, repetitions/2, 0, null));
                }
    
                double sum = 0;
                for (int i = 0; i < repetitions; i++) {
                    sum += Math.sin(i);
                }
    
                System.err.println(id + " completed at " + System.currentTimeMillis() + " with sum=" + sum);
            }
        }
    
        public static void main(String argv[]) throws InterruptedException {
            ExampleExecutor executor = new ExampleExecutor(Executors.newFixedThreadPool(2));
            executor.execute(new Task("0", 2000000, 100, executor));
    
            System.err.println("main thread awaits completion");
            executor.awaitCompletion();
            System.err.println("main thread recieved completion event");
        }
    }
    

    【讨论】:

    • @sharky 我相信你已经错过了 - 理解代码,它不会轮询。我应该澄清你将如何使用它 - 你有一个主线程通过 execute() 提交一个或多个任务,然后线程调用 awaitCompletion() 它将阻塞,直到所有任务完成并且队列为空。您的任务可以通过调用 execute() 提交额外的工作,但任务绝不能调用 awaitCompletion()。然后,您的主线程将收到通知,并在所有作业完成后从 awaitCompletion() 调用返回。我已经编辑了答案以添加一些演示代码。
    【解决方案2】:

    我认为消费者也是生产者并不重要,因为在生产者-消费者模式中,它们是完全不同的关注点。

    您的消费者已经拥有对队列的引用 - 只需让他们像生产者一样添加到队列中。

    您可以使用AtomicInteger 或类似名称来记录当前有多少工作人员处于活动状态,或者如果您想等到他们全部静止,请使用CountDownLatch

    【讨论】:

      【解决方案3】:

      请参阅我在Directory Scanner 上的帖子它满足大部分要求。但它没有用 Futures 和 Callable 实现。得想一想。每个任务都没有被赋予重要性。没有结果并且产生异常。它只是一种扫描文件的并行递归方式。

      【讨论】:

        【解决方案4】:

        我已经看到了针对此类问题的几种不同解决方案。

        一个是仍然使用poll作为主线程中的阻塞调用,就像在你的代码中一样,但是在主线程可能永远等待的情况下将一个“虚拟”对象从一个工作线程中排入队列以唤醒主线程.例如,任何在没有向队列中添加更多项目的情况下完成的工作人员应该提交一个虚拟作业,主线程识别并忽略它(它仅用于唤醒主线程)。通过跟踪活动作业的数量,您可以创建更少的虚拟对象,从而减少“虚假唤醒”,从而减少“虚假唤醒”——只有最后一个作业需要添加虚拟对象。

        另一种方法是等待不同的对象。例如,任何旧的Object 都可以。在这个对象上有主线程wait()。然后作业在完成时使用Object.notify() 唤醒这个线程。同样,通过计数,您可以减少所需通知的数量。

        最优雅的解决方案可能是使用Semaphore。基本上,信号量的值将是“飞行作业+队列项目”数量的负数。当一个作业从队列中取出一个项目时,这个值不会改变(因为飞行中的作业增加一个,而队列项目减少一个),但是每个作业都应该为他们添加的每个作业调用 reducePermits(),并且在他们完成之前调用一次 release()。

        然后主线程可以在工作期间阻塞acquire()。当它醒来时,一切都完成了(因为在飞行中+排队的工作为零)。您将启动另一个线程来实际执行poll 调用并添加作业(目前由主线程完成),并且当主线程上的acquire 返回时,可以关闭该工作人员。但是,让现有工作人员poll() 自己而不是完成工作可能更简单。那么你根本不需要这个传递函数。

        事实上,使用Semaphore 解决方案,为什么不完全放弃队列,而使用内置在执行程序中的队列呢?也就是说,工人是否通过executor.submit(newJob(nextJob)) 提交了新工作?无论如何,执行器线程在内部都在从阻塞队列中提取工作,因此在具有显式外部队列方面存在一些重复。

        【讨论】:

          【解决方案5】:

          几年前,我不得不做一些类似但有界堆栈的事情。我将分享一个可能的解决方案:

          idle_thread = MAX_THREAD;
          do
          {
              if(queue != empty) // If thread have work to do
              {
                 idle_threads--;  // Count this threads was a worker   
                 flag = true;
                 while(queue != empty)  // Until queue have work
                 {
                    synchronized(this)
                    {
                      // task =  take_out_of_queue;
                    }
                  }
             }
             if(flag) // This flag must to be local to each thread, it is use to insure 
             {        // that threads will count this only one time for each time 
                    // the queue got empty
                   synchronized(this)
                   {
                      if(flag == false)
                      idle_threads++;  // Count thread as a idle one
                      flag = false;
                   }
               }
               if(idle_threads == MAX_THREADS) out = true; // When all threads are idle stop the work loop
          } while(!out)
          

          【讨论】:

            猜你喜欢
            • 2018-11-05
            • 2018-09-24
            • 2017-02-01
            • 2012-04-30
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多