【问题标题】:Java BlockingQueue with batching?带有批处理的Java BlockingQueue?
【发布时间】:2012-03-17 00:27:22
【问题描述】:

我对与 Java BlockingQueue 相同的数据结构感兴趣,但它必须能够批处理队列中的对象。换句话说,我希望生产者能够将对象放入队列,但让消费者阻塞take(),直到队列达到一定大小(批量大小)。

然后,一旦队列达到批量大小,生产者必须阻塞put(),直到消费者消耗完队列中的所有元素(在这种情况下,生产者将再次开始生产,消费者阻塞直到再次达到批次)。

是否存在类似的数据结构?或者我应该写它(我不介意),我只是不想浪费我的时间,如果有什么东西在那里。


更新

也许可以澄清一下:

情况总是如下。可以有多个生产者向队列中添加项目,但从队列中获取项目的消费者永远不会超过一个。

现在,问题在于这些设置中有多个并行和串行。换句话说,生产者为多个队列生产项目,而消费者本身也可以是生产者。这可以更容易地被认为是生产者、消费者-生产者,最后是消费者的有向图。

生产者应该阻塞直到队列为空(@Peter Lawrey)的原因是因为它们中的每一个都将在一个线程中运行。如果您让它们在空间可用时简单地产生,您最终会遇到一种情况,即您有太多线程试图一次处理太多事情。

也许将其与执行服务结合可以解决问题?

【问题讨论】:

    标签: java queue producer-consumer blockingqueue


    【解决方案1】:

    我建议你使用BlockingQueue.drainTo(Collection, int)。您可以将它与 take() 一起使用,以确保获得最少数量的元素。

    使用这种方法的优点是您的批量大小会随着工作负载而动态增长,并且生产者不必在消费者忙碌时进行阻塞。即它会针对延迟和吞吐量进行自我优化。


    要完全按照要求实现(我认为这是一个坏主意),您可以使用 SynchronousQueue 和繁忙的消费线程。

    即消费线程做了一个

     list.clear();
     while(list.size() < required) list.add(queue.take());
     // process list.
    

    只要消费者忙,生产者就会阻塞。

    【讨论】:

    • 我希望生产者在消费者忙碌时阻塞。
    • 有趣的是,大多数系统都竭尽全力避免这种情况。 ;) 第二个建议正是这样做的。如果您希望生产者阻塞,为什么要使用多个线程?让“生产者”成为处理器/消费者不是更简单,而且您似乎不希望它们同时运行。
    • 请看我的更新。该设计要求生产者也阻塞,以使执行线程的数量保持在较低水平。此外,它还解决了生产者和消费者之间的依赖问题。
    • 在这种情况下,上面的建议可以做到这一点。它适用于多个生产者。我发现最好让线程调度程序为你工作,而不是试图发明你自己的。 ;)
    【解决方案2】:

    这是一个快速(= 简单但未经过全面测试)的实现,我认为它可能适合您的请求 - 如果需要,您应该能够扩展它以支持完整的队列接口。

    为了提高性能,您可以切换到 ReentrantLock 而不是使用“同步”关键字..

    public class BatchBlockingQueue<T> {
    
        private ArrayList<T> queue;
        private Semaphore readerLock;
        private Semaphore writerLock;
        private int batchSize;
    
        public BatchBlockingQueue(int batchSize) {
            this.queue = new ArrayList<>(batchSize);
            this.readerLock = new Semaphore(0);
            this.writerLock = new Semaphore(batchSize);
            this.batchSize = batchSize;
        }
    
        public synchronized void put(T e) throws InterruptedException {
            writerLock.acquire();
            queue.add(e);
            if (queue.size() == batchSize) {
                readerLock.release(batchSize);
            }
        }
    
        public synchronized T poll() throws InterruptedException {
            readerLock.acquire();
            T ret = queue.remove(0);
            if (queue.isEmpty()) {
                writerLock.release(batchSize);
            }
            return ret;
        }
    
    }
    

    希望你觉得它有用。

    【讨论】:

      【解决方案3】:

      我不知道。如果我理解正确,您希望生产者工作(当消费者被阻塞时)直到它填满队列或消费者工作(当生产者阻塞时)直到它清除队列。如果是这种情况,我可能建议您不需要数据结构,而是需要一种机制来阻止一方,而另一方正在互斥体中工作。您可以为此锁定一个对象,并在内部具有满或空的逻辑来释放锁定并将其传递给另一方。所以简而言之,你应该自己写:)

      【讨论】:

        【解决方案4】:

        这听起来就像 RingBuffer 在 LMAX Disruptor 模式中的工作方式。请参阅http://code.google.com/p/disruptor/ 了解更多信息。

        一个很粗略的解释是你的主要数据结构是RingBuffer。生产者按顺序将数据放入环形缓冲区,消费者可以提取生产者放入缓冲区的尽可能多的数据(因此本质上是批处理)。如果缓冲区已满,生产者会阻塞,直到消费者完成并释放缓冲区中的插槽。

        【讨论】:

          【解决方案5】:

          我最近开发了这个实用程序,如果队列元素没有达到批量大小,它会使用刷新超时来批量处理 BlockingQueue 元素。它还支持使用多个实例来详细说明同一组数据的扇出模式:

          // Instantiate the registry
          FQueueRegistry registry = new FQueueRegistry();
          
          // Build FQueue consumer
          registry.buildFQueue(String.class)
                          .batch()
                          .withChunkSize(5)
                          .withFlushTimeout(1)
                          .withFlushTimeUnit(TimeUnit.SECONDS)
                          .done()
                          .consume(() -> (broadcaster, elms) -> System.out.println("elms batched are: "+elms.size()));
          
          // Push data into queue
          for(int i = 0; i < 10; i++){
                  registry.sendBroadcast("Sample"+i);
          }
          

          更多信息在这里!

          https://github.com/fulmicotone/io.fulmicotone.fqueue

          【讨论】:

            猜你喜欢
            • 2021-07-26
            • 1970-01-01
            • 2023-04-08
            • 1970-01-01
            • 1970-01-01
            • 2019-03-03
            • 1970-01-01
            • 2013-09-11
            • 1970-01-01
            相关资源
            最近更新 更多