【问题标题】:java concurrency: multi-producer one-consumerjava并发:多生产者一消费者
【发布时间】:2012-04-17 21:31:48
【问题描述】:

我有一种情况,不同的线程填充一个队列(生产者),一个消费者从这个队列中检索元素。我的问题是,当从队列中检索这些元素之一时,会丢失一些元素(丢失信号?)。生产者代码是:

class Producer implements Runnable {

    private Consumer consumer;

    Producer(Consumer consumer) { this.consumer = consumer; }

    @Override
public void run() {
    consumer.send("message");
  }
}

它们是通过以下方式创建和运行的:

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) {
  executor.execute(new Producer(consumer));
}

消费代码是:

class Consumer implements Runnable {

private Queue<String> queue = new ConcurrentLinkedQueue<String>();

void send(String message) {
    synchronized (queue) {
        queue.add(message);
        System.out.println("SIZE: " + queue.size());
        queue.notify();
    }
}

@Override
public void run() {
    int counter = 0;
    synchronized (queue) {
    while(true) {
        try {
            System.out.println("SLEEP");
                queue.wait(10);
        } catch (InterruptedException e) {
                Thread.interrupted();
        }
        System.out.println(counter);
        if (!queue.isEmpty()) {             
            queue.poll();
            counter++;
        }
    }
    }
}

}

运行代码时,有时会添加 20 个元素并检索 20 个元素,但在其他情况下检索的元素少于 20 个。知道如何解决这个问题吗?

【问题讨论】:

  • 您正在使用低级同步结构(waitnotify)和高级同步结构(ConcurrentLinkedQueueExecutorService)的奇怪组合。使用其中一个!
  • 我做到了,但在这两种情况下我都有同样的问题
  • 看不到实际运行 Consumer 的代码。
  • 只是一个普通的新线程(consumer).start()

标签: java concurrency consumer producer


【解决方案1】:

我建议您使用 BlockingQueue 而不是 Queue。 LinkedBlockingDeque 可能是您的理想选择。

您的代码如下所示:

void send(String message) {
    synchronized (queue) {
        queue.put(message);
        System.out.println("SIZE: " + queue.size());
    }
}

然后你只需要

queue.take()

在您的消费者线程上

这个想法是 .take() 将阻塞,直到队列中有一个可用的项目,然后完全返回 one (我认为您的实现会受到影响:轮询时缺少通知)。 .put() 负责为您处理所有通知。无需等待/通知。

【讨论】:

  • 试过 LinkedBlockingDeque 但我仍然遇到同样的问题
  • @Randomize 您能否发布一个使用 BlockingQueue 的有问题代码的示例?消费者代码应该足够了。
  • 我正在重用上面完全相同的代码,我刚刚用 LinkedBlockingDeque 替换了 ConcurrentLinkedQueue。
  • 如上所述,使用BlockingQueue,您应该a) 摆脱等待/通知呼叫,b) 使用.put().take() 而不是.add().poll() .
  • 为什么要围绕queue.put同步?
【解决方案2】:

您的代码中的问题可能是因为您使用的是notify 而不是notifyAll。如果有一个等待锁的线程,前者只会唤醒一个线程。这允许没有线程在等待并且信号丢失的竞争条件。 notifyAll 将要求所有线程唤醒以检查它们是否可以获得锁,从而以较小的性能成本强制正确性。

这在Effective Java 1st ed 中得到了最好的解释(参见第 150 页)。第 2 版删除了这个提示,因为程序员应该使用 java.util.concurrent 来提供更强的正确性保证。

【讨论】:

  • 只有一个消费者,所以 notify / notifyAll 没有区别
【解决方案3】:

同时使用 ConcurrentLinkedQueue 和同步似乎是个坏主意。它首先违背了并发数据结构的目的。

ConcurrentLinkedQueue 数据结构没有问题,用 BlockingQueue 替换它可以解决问题,但这不是根本原因。

问题在于 queue.wait(10)。这是定时等待方法。 10ms 后会再次获取锁。

  1. 通知 (queue.notify() ) 将丢失,因为如果 10 毫秒已过,则没有消费者线程在等待它。

  2. 生产者将无法添加到队列中,因为他们无法获取锁,因为消费者再次申请了锁。

迁移到 BlockingQueue 解决了您的问题,因为您删除了 wait(10) 代码,BlockingQueue 数据结构处理了等待和通知。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多