【问题标题】:ConcurrentLinkedQueue with wait() and notify()带有 wait() 和 notify() 的 ConcurrentLinkedQueue
【发布时间】:2012-01-11 19:56:00
【问题描述】:

我并不精通多线程。我试图通过一个生产者线程重复截屏,它将BufferedImage 对象添加到ConcurrentLinkedQueue,消费者线程将poll 队列BufferedImage 对象以将它们保存在文件中。我可以通过重复轮询(while 循环)来使用它们,但我不知道如何使用notify()wait() 来使用它们。我曾尝试在较小的程序中使用wait()notify,但在这里无法实现。

我有以下代码:

class StartPeriodicTask implements Runnable {
    public synchronized void run() {
        Robot robot = null;
        try {
            robot = new Robot();
        } catch (AWTException e1) {
            e1.printStackTrace();
        }
        Rectangle screenRect = new Rectangle(Toolkit.getDefaultToolkit()
                .getScreenSize());
        BufferedImage image = robot.createScreenCapture(screenRect);
        if(null!=queue.peek()){
            try {
                System.out.println("Empty queue, so waiting....");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            queue.add(image);
            notify();
        }
    }
}

public class ImageConsumer implements Runnable {
        @Override
        public synchronized void run() {
            while (true) {
                BufferedImage bufferedImage = null;
                if(null==queue.peek()){
                    try {
                        //Empty queue, so waiting....
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }else{
                    bufferedImage = queue.poll();
                    notify();
                }
                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                    imageFile.getParentFile().mkdirs();
                }
                    try {
                        ImageIO.write(bufferedImage, extension, imageFile);
                        //Image saved
                    catch (IOException e) {
                        tracer.severe("IOException occurred. Image is not saved to file!");
                    }
                }
            }

之前我有一个重复的轮询来检查 BufferedImage 对象的存在。现在我已将run 方法更改为synchronised 并尝试实现wait()notify()。我做得对吗?请帮忙。谢谢。

【问题讨论】:

    标签: java concurrency wait notify java.util.concurrent


    【解决方案1】:

    您在作业中使用了错误的QueueConcurrentLinkedQueue 是一个非阻塞队列,这意味着没有生产者消费者语义。如果你只是做一个读者和一个作家,看看SynchronousQueue

    简单地说你的代码可以这样重写

    BlockingQueue<?> queue = new SynchrnousQueue<?>();
    class StartPeriodicTask implements Runnable {
        public void run() {
            Robot robot = null;
            try {
                robot = new Robot();
            } catch (AWTException e1) {
                e1.printStackTrace();
            }
            Rectangle screenRect = new Rectangle(Toolkit.getDefaultToolkit()
                    .getScreenSize());
            BufferedImage image = robot.createScreenCapture(screenRect);
            queue.offer(image); //1
    }
    public class ImageConsumer implements Runnable {
            @Override
            public void run() {
                while (true) {
                    BufferedImage bufferedImage = queue.poll(); //2
    
                    File imageFile = getFile();
                    if (!imageFile.getParentFile().exists()) {
                        imageFile.getParentFile().mkdirs();
                    }
                        try {
                            ImageIO.write(bufferedImage, extension, imageFile);
                            //Image saved
                        catch (IOException e) {
                            tracer.severe("IOException occurred. Image is not saved to file!");
                        }
                }
    

    原来如此。

    让我解释一下。在第 //1 行,生产线程将把图像“放置”到队列中。我引用 place 因为 SynchrnousQueue 没有深度。实际发生的是线程告诉队列“如果有任何线程从这个队列中请求一个元素,那么给它那个线程让我继续。如果没有,我会等到另一个线程准备好”

    第 //2 行类似于第 1 行,其中消费线程只是等待,直到有线程提供。这适用于单读者单作者

    【讨论】:

    • 单读和单写是什么意思?如果我们有多个线程提供和多个线程轮询怎么办?应该没问题吧?
    • @Ahamed 是的,虽然我的意思是这是一对一的关系。对于放入队列的每个线程,您需要从队列中轮询相应的线程。线程不能放在 SynchrnousQueue 上并继续,必须有一个线程请求一个元素 - 那时放置线程可以继续。
    • 但这也意味着你可以有 100 个写线程和 1 个读线程,这很好,它会工作。您最终将在队列中有许多空闲的写入线程等待该 1 个读取线程轮询。
    • @JohnVint "100 个写入线程和 1 个读取线程" 所以性能方面就像一次运行 1 个线程...因为您的解决方案中没有异步处理...它们都阻塞了彼此...有人可能会问,在这种情况下,为什么不使用 Q,为什么不将 ImageConsumer 与 StartPeriodicTask 结合起来呢?我认为 Ahamed 想要异步处理。
    • 当我说它会起作用时,我的意思是从逻辑/功能的角度来看。如果您有 100 个写入线程和 100 个读取线程,它也将异步工作。这里的队列很有用,因为它允许每个线程在另一个线程处理完该项目之前不继续。
    【解决方案2】:

    第一个问题是您在生产者中的不必要等待:

        if(null!=queue.peek()){ // You are the producer, you don't care if the queue is empty
            try {
                System.out.println("Empty queue, so waiting....");
                wait(); // This puts you to bed, your waiting and so is your consumer
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            queue.add(image);
            notify();
        }
    

    这就是你所需要的:

            queue.add(image);
            notify();
    

    下一个问题是消费者中不必要的notify。它在那个时候产生了对其处理的控制,我相信你的目的是让你的生产者继续前进,但当然你的代码永远不会达到那个程度。所以这个:

                }else{
                    bufferedImage = queue.poll();
                    notify();
                }
                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                    imageFile.getParentFile().mkdirs();
                }
                    try {
                        ImageIO.write(bufferedImage, extension, imageFile);
                        //Image saved
                    catch (IOException e) {
                        tracer.severe("IOException occurred. Image is not saved to file!");
                    }
                }
    

    应该看起来更像这样:

                }else{
                    bufferedImage = queue.poll();
    
                    File imageFile = getFile();
                    if (!imageFile.getParentFile().exists()) {
                       imageFile.getParentFile().mkdirs();
                    }
    
                    try {
                        ImageIO.write(bufferedImage, extension, imageFile);
                        //Image saved
                    catch (IOException e) {
                        tracer.severe("IOException occurred. Image is not saved to file!");
                    }
                }
    

    【讨论】:

      【解决方案3】:

      java.util.concurrent 库进入 JDK1.5 后,编写自己的等待/通知逻辑的需求就消失了。在 2012 年,如果您正在做自己的等待/通知,那么您工作太辛苦了,应该强烈考虑尝试和真正的 java.util.concurrent 等效项。

      话虽如此,我相信投票是内置java.util.concurrent.ConcurrentLinkedQueue 背后的想法。换句话说,只要是!isEmpty(),消费者就坐在他们自己的线程和来自 ConcurrentLinkedQue 的 .poll() 项目中。我见过的大多数实现都会在!isEmpty() 的测试之间进行一秒钟的睡眠,但我认为这实际上没有必要。另外,请注意 Vint 家伙对我的回答的评论,.poll() 可能会返回null。考虑java.util.AbstractQueue 的替代实现,它的阻塞行为可能更接近于您所寻找的。​​p>

      这个人有一个简单的例子:http://www.informit.com/articles/article.aspx?p=1339471&seqNum=4

      最后,获取 Goetz 的书“Java Concurrency In Practice”并阅读它。我几乎可以肯定它有一个配方可以用来替换您自己的本土等待/通知。

      【讨论】:

      • @Bob Kuhar '换句话说,消费者坐在他们自己的线程中并从 ConcurrentLinkedQue 中删除项目,只要它是 !isEmpty()' 这不是真的。 ArrayBlockingQueue 和 LinkedBlockingQueue 都是如此,如果 isEmpty() 则 ConcurrentLinkedQueue.poll() 将返回 null
      • @JohnVint 感谢您指出这一点。我改变了措辞,因为这不是我所说的“轮询”方法,而是通过反复测试 isEmpty() 来轮询队列的概念。
      猜你喜欢
      • 2021-05-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-06-08
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多