【问题标题】:Java - Multiple queue producer consumerJava - 多队列生产者消费者
【发布时间】:2014-11-14 18:09:55
【问题描述】:

我有以下代码:

    while(!currentBoard.boardIsValid()){
        for (QueueLocation location : QueueLocation.values()){
            while(!inbox.isEmpty(location)){
                Cell c = inbox.dequeue(location);
                notifyNeighbours(c.x, c.y, c.getCurrentState(),previousBoard);
            }
        }
    }

我有一个有几个队列的消费者(他们的所有方法都是同步的)。每个生产者一个队列。消费者循环遍历所有队列并检查他们是否有任务供他消费。 如果他正在检查的队列中有一个任务,他就会使用它。否则,他会去检查下一个队列,直到他完成对所有队列的迭代。

到目前为止,如果他遍历所有队列并且它们都是空的,他会继续循环而不是等待其中一个包含某些内容(如外部 while 所见)。

如何让消费者等到其中一个队列中有东西?

我遇到以下情况的问题:假设只有 2 个队列。消费者检查了第一个,它是空的。就在他检查第二个(也是空的)时,生产者在第一个队列中放了一些东西。就消费者而言,队列都是空的,所以他应该等待(即使其中一个不再是空的,他应该继续循环)。

编辑: 最后一件事。这对我来说是一个练习。我正在尝试自己实现同步。因此,如果任何 java 库都有实现此功能的解决方案,我对此不感兴趣。我试图了解如何实现这一点。

【问题讨论】:

  • 除非您可以修改生产者以执行额外的信号/通知,否则有两种解决方案,1) 使用超时并在 CPU 消耗和响应性之间进行权衡,或者 2) 使用额外的线程在队列中等待并通知消费者。第二种方法的额外线程会消耗更多内存,但不会占用大量 CPU 时间,因为它们大部分时间都在等待。

标签: java multithreading producer-consumer


【解决方案1】:

@Abe 很接近。我会使用信号并等待 - 使用内置的 Object 类,因为它们是最轻的。

Object sync = new Object();  // Can use an existing object if there's an appropriate one

// On submit to queue
synchronized ( sync ) {
    queue.add(...);  // Must be inside to avoid a race condition
    sync.notifyAll();
}

// On check for work in queue
synchronized ( sync ) {
    item = null;
    while ( item == null ) {
        // Need to check all of the queues - if there will be a large number, this will be slow,
        // and slow critical sections (synchronized blocks) are very bad for performance
        item = getNextQueueItem();
        if ( item == null ) {
            sync.wait();
        }
    }
}

注意sync.wait 释放同步锁直到通知 - 同步锁是成功调用等待方法所必需的(这是对程序员的提醒,它确实需要某种类型的临界区才能工作可靠)。

顺便说一句,如果可行的话,我会推荐一个专用于消费者(或一组消费者)的队列,而不是一个专用于生产者的队列。它将简化解决方案。

【讨论】:

    【解决方案2】:

    如果您想跨多个队列进行阻塞,那么一种选择是使用 java 的Lock and Condition objects and then use the signal method

    所以每当生产者有数据时,它应该调用signallAll

    Lock fileLock = new ReentrantLock();
    Condition condition = fileLock.newCondition();
    ...
    // producer has to signal
    condition.signalAll();
    ...
    // consumer has to await.
    condition.await();
    

    只有在提供信号时,消费者才会去检查队列。

    【讨论】:

      【解决方案3】:

      我按照@Abe 的建议解决了类似的情况,但最终决定将SemaphoreAtomicBoolean 结合使用,并将其称为BinarySemaphore。它确实需要对生产者进行修改,以便他们在有事情要做时发出信号。
      下面是 BinarySemaphore 的代码和消费者工作循环应该是什么样子的一般概念:

      import java.util.concurrent.Semaphore;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.atomic.AtomicBoolean;
      
      public class MultipleProdOneConsumer {
      
      BinarySemaphore workAvailable = new BinarySemaphore();
      
      class Consumer {
      
          volatile boolean stop;
      
          void loop() {
      
              while (!stop) {
                  doWork();
                  if (!workAvailable.tryAcquire()) {
                      // waiting for work
                      try {
                          workAvailable.acquire();
                      } catch (InterruptedException e) {
                          if (!stop) {
                              // log error
                          }
                      }
                  }
              }
          }
      
          void doWork() {}
      
          void stopWork() {
              stop = true;
              workAvailable.release();
          }
      }
      
      class Producer {
      
          /* Must be called after work is added to the queue/made available. */
          void signalSomethingToDo() {
              workAvailable.release();
          }
      }
      
      class BinarySemaphore {
      
          private final AtomicBoolean havePermit = new AtomicBoolean();
          private final Semaphore sync;
      
          public BinarySemaphore() {
              this(false);
          }
      
          public BinarySemaphore(boolean fair) {
              sync = new Semaphore(0, fair);
          }
      
          public boolean release() {
      
              boolean released = havePermit.compareAndSet(false, true);
              if (released) {
                  sync.release();
              }
              return released;
          }
      
          public boolean tryAcquire() {
      
              boolean acquired = sync.tryAcquire();
              if (acquired) {
                  havePermit.set(false);
              }
              return acquired;
          }
      
          public boolean tryAcquire(long timeout, TimeUnit tunit) throws InterruptedException {
      
              boolean acquired = sync.tryAcquire(timeout, tunit);
              if (acquired) {
                  havePermit.set(false);
              }
              return acquired;
          }
      
          public void acquire() throws InterruptedException {
      
              sync.acquire();
              havePermit.set(false);
          }
      
          public void acquireUninterruptibly() {
      
              sync.acquireUninterruptibly();
              havePermit.set(false);
          }
      
      }
      
      }
      

      【讨论】: