【问题标题】:BlockingQueue - blocked drainTo() methodsBlockingQueue - 阻塞的 drainTo() 方法
【发布时间】:2009-05-06 21:22:24
【问题描述】:

BlockingQueue 有一个叫做 drainTo() 的方法,但它没有被阻塞。我需要一个我想要阻塞的队列,但也能够在一个方法中检索排队的对象。

Object first = blockingQueue.take();

if ( blockingQueue.size() > 0 )
    blockingQueue.drainTo( list );

我猜上面的代码可以工作,但我正在寻找一个优雅的解决方案。

【问题讨论】:

    标签: java concurrency


    【解决方案1】:

    你指的是JavaDoc中的评论:

    此外,如果指定集合,则此操作的行为未定义 在操作进行时被修改。

    我相信这指的是您的示例中的集合list

    blockingQueue.drainTo(list);
    

    意味着您不能在从blockingQueue 排入list 的同时修改list。但是,阻塞队列在内部同步,因此当调用drainTo 时,puts 和(见下面的注释)gets 将阻塞。如果它不这样做,那么它就不是真正的线程安全的。您可以查看源代码并验证 drainTo 对于阻塞队列本身是线程安全的。

    另外,你的意思是当你调用drainTo 时你希望它阻塞直到至少一个对象被添加到队列中?在这种情况下,您别无选择,只能:

    list.add(blockingQueue.take());
    blockingQueue.drainTo(list);
    

    阻塞直到添加一个或多个项目,然后将整个队列排入集合list

    注意:从 Java 7 开始,gets 和 puts 使用了一个单独的锁。现在允许在 drainTo 期间进行 put 操作(以及许多其他 take 操作)。

    【讨论】:

    • 您要复制到的集合不太可能是线程安全的。有什么意义?
    • 是否保证drainTo(list) 不会调用list.clear()?我在任何地方的 javadoc 中都没有看到。
    • @Recurse:我想,JavaDoc 不保证 drainTo(list) 不会调用 clear(),因为它没有说它不会这样做。但是,如果它对传递给它的集合做了如此重要的事情而没有记录此操作,那将是非常令人惊讶的。许多人会认为这是一个严重的错误。
    • .drainTo() 的 javadoc 合同有点模棱两可(就像大多数自然语言合同一样),但“所有可用元素”都暗示非阻塞。我已经为 Array 和 List 阻塞队列的 Android SDK 实现确认了这一点。
    • @bacar:虽然 corsiKa 说这是 Java 7 的更改,但我看不出在 JavaDoc 中的哪个地方可以保证获取和放置使用单独的锁。可能是JVM实现类都碰巧这样做了,但我看不出这在哪里得到保证。无论如何,drainTo 本身并不是阻塞操作。它返回现在发生在队列中的内容。阻塞队列确实提供了您所说的阻塞操作,但drainTo 不是其中之一。因此,在 drainTo 期间显式阻止 put 是没有意义的,除非您需要这样做以保证一致性。
    【解决方案2】:

    如果你碰巧使用 Google Guava,有一个很不错的 Queues.drain() 方法。

    BlockingQueue.drainTo(Collection, int) 排空队列,但如果 请求的numElements 元素不可用,它将等待 直到指定的超时时间。

    【讨论】:

      【解决方案3】:

      我发现这种模式很有用。

      List<byte[]> blobs = new ArrayList<byte[]>();
      if (queue.drainTo(blobs, batch) == 0) {
         blobs.add(queue.take());
      }
      

      【讨论】:

        【解决方案4】:

        有了可用的 API,我认为您不会变得更加优雅。除了你可以删除大小测试。

        如果您想以原子方式检索连续的元素序列,即使另一个删除操作同时发生,我也不相信 drainTo 能保证这一点。

        【讨论】:

        • drainTo 绝对会阻止任何 put、get、take 等...检查源代码。此 API 对阻塞队列的其他调用是线程安全的。但是,如果在 drainTo 期间您要排放到的 Collection 发生变化,您可能会遇到问题。
        • 好吧,真实,有趣,但除此之外。检查JDK中的实现类。 (这显然是我的意思。)
        • 它们不是唯一可能的实现类。例如,我确信将 ConcurrentLinkedQueue 转换为“ConcurrentLinkedBlockingQueue”相对简单(!)(尽管显然阻塞操作不会是无等待的)。
        【解决方案5】:

        源代码:

         596:     public int drainTo(Collection<? super E> c) {
                      //arg. check
         603:         lock.lock();
         604:         try {
         608:             for (n = 0 ; n != count ; n++) {
         609:                 c.add(items[n]);
         613:             }
         614:             if (n > 0) {
         618:                 notFull.signalAll();
         619:             }
         620:             return n;
         621:         } finally {
         622:             lock.unlock();
         623:         }
         624:     }
        

        ArrayBlockingQueue 急于返回 0。顺便说一句,它可以在获取锁之前做到这一点。

        【讨论】:

        • 没有。获取锁不仅提供互斥,还确保看到的计数是“新鲜的”,可以看到来自其他线程的主内存更新。您可以将“计数”设为易失性,但易失性写入相对较慢,因此写入意味着同时持有锁 + 易失性写入,可能会增加总开销。恕我直言,任何编写并发代码的开发人员都应该尽力理解 Java 内存模型:docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.4
        猜你喜欢
        • 2017-04-16
        • 2014-02-08
        • 1970-01-01
        • 1970-01-01
        • 2012-01-05
        • 2010-10-23
        • 1970-01-01
        • 2012-07-13
        • 1970-01-01
        相关资源
        最近更新 更多