【问题标题】:block and wakeup consumer threads/阻塞和唤醒消费者线程/
【发布时间】:2011-12-18 16:29:41
【问题描述】:

我有 2 个线程。

他们每个人都从共享缓冲区中读取一些数据。

currentDataBuffer.get(thisID); //currentDataBuffer is my shared buffer object

我想在每次调用 get 后阻塞每个线程,并在所有线程读取缓冲区时释放它(一次) 所以我用这个currentDataBuffer 对象作为锁:

currentDataBuffer.get(thisID);
synchronized (currentDataBuffer ) {
   currentDataBuffer.wait();
}

问题是当所有线程完成从缓冲区读取(每个线程)时,我如何释放这些线程?

currentDataBuffer 内部,我有一个映射,其中存储了从缓冲区读取数据的线程的 ID。

如何使用this.notifyAll();(来自currentDataBuffer)唤醒所有锁定的线程?

【问题讨论】:

    标签: java multithreading


    【解决方案1】:

    我建议你使用 Java BlockingQueue 数据结构。调用BlockingQueue.take() 会阻塞直到元素可用。 所以而不是:

    currentDataBuffer.get(thisID);
    synchronized (currentDataBuffer ) {
       currentDataBuffer.wait();
    }
    

    你将拥有:

    currentDataBuffer.take();
    

    take()调用中阻塞的线程可以通过调用BlockingQueue.offer(object)方法向队列中添加一个元素来释放

    【讨论】:

    • 我需要一个用于 n 线程的缓冲区,只有在所有线程读取相同的数据之后,我才需要移动到缓冲区中的下一项。
    【解决方案2】:

    我认为 CountdownLatch (http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html) 会有所帮助。

    即使用初始值为 2 的 CountDownLatch (或您要处理的任何线程数)增加您的 currentDataBuffer。处理完线程调用latch.countDown()然后latch.await()。

    为了安全起见,您可能需要小心 countDowns 不会丢失(例如,如果抛出异常)。

    【讨论】:

    • 我不能使用 CountDownLatch 因为我不能改变(代码)线程的创建
    • 您是否可以控制 currentDataBuffer 的类?如果是这样,您可以在其 get 方法中添加 countDown()/await()。
    • 是的,我可以控制 currentDataBuffer,但我为什么要在那里等待呢?消费者线程是那些从缓冲区读取数据的线程!
    • 如果我在第一个线程尝试读取数据后将等待放入 get(),则没有其他线程会唤醒它
    【解决方案3】:

    继续您的代码的答案可能是使用currentDataBuffer.notifyAll() 而不是this.notifyAll()(不清楚this 从您的问题中指的是什么)。但是,您如何确保在所有线程都读取缓冲区并进入等待状态之后调用notifyAll

    如果您知道应该读取缓冲区的线程数,更好的解决方案是使用两个CountDownLatch。该类的 javadoc 中有一个很好的示例。

    ------------ 更新:

    您无需更改阅读器线程的代码。您应该在 currentDataBuffer 对象中握住闩锁。在currentDataBuffer 中创建一个等待n 项目锁存器的线程。在get 方法中,在读取完成后对该锁存器调用CountDownLatch.countDown()。哦,我会写的:

    class CurrentDataBuffer implements Runnable {
    
        private CountDownLatch singleThreadFinishedSignal;
        private CountDownLatch allThreadsFinishedSignal;
    
        public CurrentDataBuffer(int N) {
            singleThreadFinishedSignal = new CountDownLatch(N); // waiter thread waits on this
            allThreadsFinishedSignal = new CountDownLatch(1); // reader threads wait on this after finished reading
        }
    
        public void run() {
            try {
                singleThreadFinishedSignal.await(); // wait for all reader threads to finish
                allThreadsFinishedSignal.countDown(); // let all reader threads proceed
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void get() {
            try {
                // read buffer item here ///////////////////
                singleThreadFinishedSignal.countDown(); // mark that a thread has read the item
                allThreadsFinishedSignal.await(); // wait for other reader threads to finish
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    【讨论】:

    • 是的,我知道线程数。我在 currentDataBuffer 对象本身中使用 this.notifyAll(),这就是为什么它是“this”。
    • 我不能使用 CountDownLatch 因为我不能改变(代码)线程的创建
    • 我不明白,它怎么知道什么线程使用它?!我不必在消费者线程部分指定任何内容??
    • 我从消费者线程中调用了 get(在 currentDataBuffer.get(thisID) 行上方)。两个线程都停在那里。
    • 如果您事先知道线程的数量,那么您可以只公开一个 getter(如示例中所示),强制调用它的消费者等待直到执行其他 getN 调用。
    猜你喜欢
    • 1970-01-01
    • 2011-01-29
    • 1970-01-01
    • 2017-07-08
    • 2015-09-25
    • 1970-01-01
    • 1970-01-01
    • 2014-03-19
    • 2011-01-09
    相关资源
    最近更新 更多