【问题标题】:wait()/notify() not working properly等待()/通知()无法正常工作
【发布时间】:2016-11-27 21:01:30
【问题描述】:

我有一个 ConsumerProducer 对象,我想从两个不同的线程获取锁。类如下:

public class ConsumerProducer {

    public String stringPool = null;

    public void put(String s){
        stringPool = s;
    }

    public String get(){
        String ret = stringPool;
        stringPool = null;
        return ret;
    }

}

线程实现类如下:

public class WaitNotifyTest implements Runnable {

    private String threadType;
    public ConsumerProducer cp;
    public static volatile int i = 1;

    public WaitNotifyTest(String threadType, ConsumerProducer cp) {
        this.threadType = threadType;
        this.cp = cp;
    }

    public static void main(String[] args) throws InterruptedException {

        ConsumerProducer cp = new ConsumerProducer();
        WaitNotifyTest test1 = new WaitNotifyTest("Consumer", cp);
        WaitNotifyTest test2 = new WaitNotifyTest("Producer", cp);

        Thread t1 = new Thread(test1);
        Thread t2 = new Thread(test2);

        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }

    @Override
    public void run() {
        while (true) {

            if (threadType.equalsIgnoreCase("Consumer")) {
                synchronized (cp) {
                    try {
                        if (null != cp.get()) {
                            cp.wait();
                        }
                        consume();
                        System.out.println("notify from Consumer");
                        cp.notify();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            } else {
                synchronized (cp) {
                    try {
                        if (null == cp.get()) {
                            cp.wait();
                        }
                        produce();
                        System.out.println("notify from Producer");
                        cp.notify();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            if (i == 5) {
                break;
            }
            i++;
        }
    }

    public void consume() {
        System.out.println("Putting: Counter" + i);
        cp.put("Counter" + i);
    }

    public void produce() {
        System.out.println("getting: " + cp.get());
    }

}

但是当我运行代码时,它遇到了某种死锁,并且像打印一样卡住了

Putting: Counter3
notify from Consumer

发生了严重错误,但我无法识别。请帮忙。

【问题讨论】:

  • 我通常会尽量避免等待/通知,因为它们比 CountDownLatch 和 CyclicBarrier 更难推理。

标签: java multithreading deadlock wait notify


【解决方案1】:

您的消费者在做生产者的工作,而生产者也在做消费者的工作。 交换他们的责任并修改条件等待。请参考下面的代码。

  1. 消费者会等待,当没有东西可以获取时,他会释放cp的锁。这样生产者就有机会进入同步区块。
  2. 生产者只有在什么都没有的时候才生产,否则他会等待。之后,他会释放cp的锁。这样消费者就有机会进入同步区块。
  3. 消费者就是把事情拿走的人。
  4. 生产者是把事情摆在桌面上的人。
  5. 根据您的评论。你想把 Counter 从 1 到 5,所以你应该只在 Producer 线程中添加 i++。你如何控制它在两个线程中的增加?
  6. 您不判断是消费者还是生产者从cp 对象调用get(),而是将null 分配给stringPool。这显然是错误的,并且会使消费者在公共场所变得无效。我添加了一个新方法clearString(),它只会在消费者消费产品时将公共空间设置为空。

    public class WaitNotifyTest implements Runnable {
    
    private String threadType;
    public ConsumerProducer cp;
    public static volatile int i = 0;
    
    public WaitNotifyTest(String threadType, ConsumerProducer cp) {
        this.threadType = threadType;
        this.cp = cp;
    }
    
    public static void main(String[] args) throws InterruptedException {
    
        ConsumerProducer cp = new ConsumerProducer();
        WaitNotifyTest test1 = new WaitNotifyTest("Consumer", cp);
        WaitNotifyTest test2 = new WaitNotifyTest("Producer", cp);
    
        Thread t1 = new Thread(test1);
        Thread t2 = new Thread(test2);
    
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
    
    @Override
    public void run() {
        while (true) {
            if (threadType.equalsIgnoreCase("Consumer")) {
                synchronized (cp) {
                    try {
                        /*
                         * Consumer will wait when there is nothing to get and he will release the lock of cp.
                         * So that producer has change to go into the synchronized block.
                         */
    
                        if (null == cp.get()) {
                            cp.wait();
                        }
                        consume();
                        System.out.println("notify from Consumer");
                        cp.notify();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
            } else {
                synchronized (cp) {
                    try {
                        /*
                         * Producer only produce when there is nothing or he will wait. At the same time, he will release the lock of cp.
                         * So that consumer has chance to go into the synchronized block.
                         */
                        if (null != cp.get()) {
                            cp.wait();
                        }
                        i++;
                        produce();
                        System.out.println("notify from Producer");
                        cp.notify();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            if (i == 5) {
                break;
            }
    
        }
    }
    
    public void consume() {
        System.out.println("getting: " + cp.get());
        cp.clearString();
    }
    
    public void produce() {
        System.out.println("Putting: Counter" + i);
        cp.put("Counter" + i);
    }}
    

另见 ConsumerProducer 类。

public class ConsumerProducer {
        public String stringPool = null;

        public void put(String s){
            stringPool = s;
        }

        public String get(){
            return stringPool;
        }

        public void clearString(){
            stringPool = null;
        }
}

【讨论】:

  • 谢谢@Gearon。死锁消失了。但是输出并不像预期的那样。输出:Putting: Counter1 notify from Producer getting: null notify from Consumer Putting: Counter3 notify from Producer getting: Counter3 notify from Consumer Putting: Counter5 notify from Producer getting: Counter5 notify from Consumer 不一致。
  • 预期输出:Putting: Counter1 notify from Producer getting: Counter1 notify from Consumer Putting: Counter2 notify from Producer getting: Counter2 notify from Consumer Putting: Counter3 notify from Producer getting: Counter3 notify from Consumer Putting: Counter4 notify from Producer getting: Counter4 notify from Consumer Putting: Counter5 notify from Producer getting: Counter5 notify from Consumer
  • 感谢@Gearon 我得到了第 5 点。但是对第 6 点感到困惑。我将 stringPool 声明为 public volatile String stringPool = null; 而不是单独的方法。我仍然不确定它是如何变为空的。您能否详细说明“这显然是错误的,并且会使消费者在公共场所变得空虚”是什么意思?
  • @AnirbanB。 if (null != cp.get()) { cp.wait();当生产者调用 cp.get() 检查公共空间是否为空时,他会清空公共空间,但这不是他的责任,这是消费者的责任。也就是说,生产者发现公共空间不为空时会等待,但他会先清空公共空间,当消费者获得锁时,他无法从公共空间中得到任何东西。
【解决方案2】:

更新的代码在这里: ConsumerProducer.java:
公共类 ConsumerProducer {

    public volatile String stringPool = null;

    public void put(String s){
        this.stringPool = s;
    }

    public String get(){
        String ret = this.stringPool;
        //this.stringPool = null;
        return ret;
    }
    //added
    public void clearString(){
        this.stringPool = null;
    }

}

WaitNotifyTest.java 公共类 WaitNotifyTest 实现 Runnable {

    private String threadType;
    public ConsumerProducer cp;
    public static volatile int i = 0;

    public WaitNotifyTest(String threadType, ConsumerProducer cp) {
        this.threadType = threadType;
        this.cp = cp;
    }

    public static void main(String[] args) throws InterruptedException {

        ConsumerProducer cp = new ConsumerProducer();
        WaitNotifyTest test1 = new WaitNotifyTest("Consumer", cp);
        WaitNotifyTest test2 = new WaitNotifyTest("Producer", cp);

        Thread t1 = new Thread(test1);
        Thread t2 = new Thread(test2);

        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }

    @Override
    public void run() {
        while (true) {

            if (threadType.equalsIgnoreCase("Consumer")) {
                synchronized (cp) {
                    try {
                        if (null == cp.get()) {
                            cp.wait();
                        }
                        consume();
                        System.out.println("notify from Consumer");
                        cp.notify();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            } else {
                synchronized (cp) {
                    try {
                        if (null != cp.get()) {
                            cp.wait();
                        }
                        i++;
                        produce();
                        System.out.println("notify from Producer");
                        cp.notify();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            if (i == 5) {
                break;
            }

        }
    }

    public void produce() {
        System.out.println("Putting: Counter" + i);
        cp.put("Counter" + i);
    }

    public void consume() {
        System.out.println("getting: " + cp.get());
        cp.clearString();
    }

}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-02-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-06-19
    相关资源
    最近更新 更多