【问题标题】:producer - consumer multithreading in Java生产者 - Java 中的消费者多线程
【发布时间】:2013-08-13 09:30:06
【问题描述】:

我想在 Java 中使用多线程等待和通知方法编写程序。
该程序有一个堆栈(最大长度 = 5)。生产者永远生成数字并入栈,消费者从栈中取出。

当栈满时,生产者必须等待,当栈为空时,消费者必须等待。
问题是它只运行一次,我的意思是一旦它产生 5 个数字它就会停止,但我将运行方法放在 while(true) 块中以不间断地运行,但它没有。
这是我到目前为止所尝试的。
生产者类:

package trail;
import java.util.Random;
import java.util.Stack;

public class Thread1 implements Runnable {
    int result;
    Random rand = new Random();
    Stack<Integer> A = new Stack<>();

    public Thread1(Stack<Integer> A) {
        this.A = A;
    }

    public synchronized void produce()
    {
        while (A.size() >= 5) {
            System.out.println("List is Full");
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        result = rand.nextInt(10);

        System.out.println(result + " produced ");
        A.push(result);
        System.out.println(A);

        this.notify();
    }

    @Override
    public void run() {
        System.out.println("Producer get started");

        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (true) {
            produce();
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者:

package trail;

import java.util.Stack;

public class Thread2 implements Runnable {
    Stack<Integer> A = new Stack<>();

    public Thread2(Stack<Integer> A) {
        this.A = A;
    }

    public synchronized void consume() {
        while (A.isEmpty()) {
            System.err.println("List is empty" + A + A.size());
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.err.println(A.pop() + " Consumed " + A);
        this.notify();
    }

    @Override
    public void run() {
        System.out.println("New consumer get started");
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (true) {
            consume();
        }
    }
}

这里是主要方法:

public static void main(String[] args) {

        Stack<Integer> stack = new Stack<>();

        Thread1 thread1 = new Thread1(stack);// p
        Thread2 thread2 = new Thread2(stack);// c
        Thread A = new Thread(thread1);
        Thread B = new Thread(thread2);
        Thread C = new Thread(thread2);
        A.start();

        B.start();
        C.start();     
    }

【问题讨论】:

  • 你在生产者中同步什么对象?你在消费者中同步什么对象?
  • 请补全代码,去掉空行和无用cmets的行
  • Thread1 类中的prodeuce 方法和Thread2 类中的consume 方法同步

标签: java multithreading wait producer-consumer notify


【解决方案1】:

我认为,如果您尝试将当前混合在一起的三件事分开,则总体上会更好地理解和处理同步:

  1. 将执行实际工作的任务。 Thread1Thread2 的类名称具有误导性。它们不是 Thread 对象,但它们实际上是实现 Runnable 接口的作业或任务,您提供给Thread 对象。

  2. 您在 main 中创建的线程对象本身

  3. 共享对象,它封装了队列、堆栈等上的同步操作/逻辑。此对象将在任务之间共享。在这个共享对象中,您将负责添加/删除操作(使用同步块或同步方法)。目前(正如已经指出的那样),同步是在任务本身上完成的(即每个任务等待并通知自己的锁,但什么也没有发生)。当您分离关注点时,即让一个班级正确地做一件事,最终会清楚问题出在哪里。

【讨论】:

    【解决方案2】:

    你的消费者和你的生产者在不同的对象上同步并且不会互相阻塞。如果这行得通,我敢说这是偶然的。

    阅读java.util.concurrent.BlockingQueuejava.util.concurrent.ArrayBlockingQueue。这些为您提供了更现代、更简单的方式来实现此模式。

    http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html

    【讨论】:

    • 在哪里可以找到有用的文章?
    【解决方案3】:

    你应该在堆栈上同步,而不是把它放在方法级别试试这个代码。

    也不要在你的线程类中初始化堆栈,不管你是从主类的构造函数中传递它们,所以不需要。

    始终尽量避免使用 synchronized 关键字标记任何方法,而不是将代码的关键部分放在同步块中,因为同步区域的大小越大,对性能的影响就越大。

    因此,始终只将那些代码放入需要线程安全的同步块中。

    生产者代码:

    public void produce() {
        synchronized (A) {
            while (A.size() >= 5) {
                System.out.println("List is Full");
                try {
                    A.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            result = rand.nextInt(10);
    
            System.out.println(result + " produced ");
            A.push(result);
            System.out.println("stack ---"+A);
    
            A.notifyAll();
        }
    }
    

    消费者代码:

    public void consume() {
        synchronized (A) {
            while (A.isEmpty()) {
                System.err.println("List is empty" + A + A.size());
                try {
                    System.err.println("wait");
                    A.wait();
    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.err.println(A.pop() + " Consumed " + A);
            A.notifyAll();
        }
    }
    

    【讨论】:

    • A.notifyAll() 究竟做了什么?
    • 它不会通知所有正在等待特定事件的线程,在我们的例子中,它将是当生产者线程在堆栈上等待为空而消费者线程在堆栈上等待包含一些数据时。 - 谢谢
    • 为什么我们使用 while 循环而不是 if ?有什么具体原因吗?
    【解决方案4】:

    试试这个:

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class CircularArrayQueue<T> {
    
        private volatile Lock rwLock = new ReentrantLock();
        private volatile Condition emptyCond = rwLock.newCondition();
        private volatile Condition fullCond = rwLock.newCondition();
    
        private final int size;
    
        private final Object[] buffer;
        private volatile int front;
        private volatile int rare;
    
        /**
         * @param size
         */
        public CircularArrayQueue(int size) {
            this.size = size;
            this.buffer = new Object[size];
            this.front = -1;
            this.rare = -1;
        }
    
        public boolean isEmpty(){
            return front == -1;
        }
    
        public boolean isFull(){
            return (front == 0 && rare == size-1) || (front == rare + 1);
        }
    
        public void enqueue(T item){
            try {
                // get a write lock
                rwLock.lock();
                // if the Q is full, wait the write lock
                if(isFull())
                    fullCond.await();
    
                if(rare == -1){
                    rare = 0;
                    front = 0;
                } else if(rare == size - 1){
                    rare = 0;
                } else {
                    rare ++;
                }
    
                buffer[rare] = item;
                //System.out.println("Added\t: " + item);
    
                // notify the reader
                emptyCond.signal();
            } catch(InterruptedException e){
                e.printStackTrace();
            } finally {
                // unlock the write lock
                rwLock.unlock();
            }
    
        }
    
        public T dequeue(){
            T item = null;
            try{
                // get the read lock
                rwLock.lock();
                // if the Q is empty, wait the read lock
                if(isEmpty())
                    emptyCond.await();
    
                item = (T)buffer[front];
                //System.out.println("Deleted\t: " + item);
                if(front == rare){
                    front = rare = -1;
                } else if(front == size - 1){
                    front = 0;
                } else {
                    front ++;
                }
    
                // notify the writer
                fullCond.signal();
    
            } catch (InterruptedException e){
                e.printStackTrace();
            } finally{
                // unlock read lock
                rwLock.unlock();
            }
            return item;
        }
    }
    

    【讨论】:

      【解决方案5】:

      您可以使用 Java 很棒的 java.util.concurrent 包及其类。

      您可以使用以下方法轻松实现生产者消费者问题 BlockingQueueBlockingQueue 已经支持等待的操作 检索元素时队列变为非空,并等待 存储元素时队列中的空间可用。

      没有BlockingQueue,每次我们把数据放到生产者的队列中 一方面,我们需要检查队列是否已满,如果已满,请等待一些 时间,再次检查并继续。同样在消费者方面,我们 必须检查队列是否为空,如果为空,请等待一些 时间,再次检查并继续。但是对于BlockingQueue,我们不会 必须编写任何额外的逻辑,而不仅仅是从 Producer 添加数据和 来自消费者的轮询数据。

      阅读更多来自:

      http://javawithswaranga.blogspot.in/2012/05/solving-producer-consumer-problem-in.html

      http://www.javajee.com/producer-consumer-problem-in-java-using-blockingqueue

      【讨论】:

        【解决方案6】:

        使用 BlockingQueue,LinkedBlockingQueue 这真的很简单。 http://developer.android.com/reference/java/util/concurrent/BlockingQueue.html

        【讨论】:

          【解决方案7】:
          package javaapplication;
          
          import java.util.Stack;
          import java.util.logging.Level;
          import java.util.logging.Logger;
          
          public class ProducerConsumer {
          
              public static Object lock = new Object();
              public static Stack stack = new Stack();
          
              public static void main(String[] args) {
                  Thread producer = new Thread(new Runnable() {
                      int i = 0;
          
                      @Override
                      public void run() {
                          do {
                              synchronized (lock) {
          
                                  while (stack.size() >= 5) {
                                      try {
                                          lock.wait();
                                      } catch (InterruptedException e) {
                                      }
                                  }
                                  stack.push(++i);
                                  if (stack.size() >= 5) {
                                      System.out.println("Released lock by producer");
                                      lock.notify();
                                  }
                              }
                          } while (true);
          
                      }
          
                  });
          
                  Thread consumer = new Thread(new Runnable() {
                      @Override
                      public void run() {
                          do {
                              synchronized (lock) {
                                  while (stack.empty()) {
                                      try {
                                          lock.wait();
                                      } catch (InterruptedException ex) {
                                          Logger.getLogger(ProdCons1.class.getName()).log(Level.SEVERE, null, ex);
                                      }
                                  }
          
                                  while(!stack.isEmpty()){
                                      System.out.println("stack : " + stack.pop());
                                  }
          
                                  lock.notifyAll();
                              }
                          } while (true);
                      }
                  });
          
                  producer.start();
          
                  consumer.start();
          
              }
          
          }
          

          【讨论】:

            【解决方案8】:

            看看这个代码示例:

            import java.util.concurrent.*;
            import java.util.Random;
            
            public class ProducerConsumerMulti {
                public static void main(String args[]){
                    BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
            
                    Thread prodThread  = new Thread(new Producer(sharedQueue,1));
                    Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
                    Thread consThread2 = new Thread(new Consumer(sharedQueue,2));
            
                    prodThread.start();
                    consThread1.start();
                    consThread2.start();
                } 
            }
            class Producer implements Runnable {
                private final BlockingQueue<Integer> sharedQueue;
                private int threadNo;
                private Random rng;
                public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
                    this.threadNo = threadNo;
                    this.sharedQueue = sharedQueue;
                    this.rng = new Random();
                }
                @Override
                public void run() {
                    while(true){
                        try {
                            int number = rng.nextInt(100);
                            System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                            sharedQueue.put(number);
                            Thread.sleep(100);
                        } catch (Exception err) {
                            err.printStackTrace();
                        }
                    }
                }
            }
            
            class Consumer implements Runnable{
                private final BlockingQueue<Integer> sharedQueue;
                private int threadNo;
                public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
                    this.sharedQueue = sharedQueue;
                    this.threadNo = threadNo;
                }
            
                @Override
                public void run() {
                    while(true){
                        try {
                            int num = sharedQueue.take();
                            System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
                            Thread.sleep(100);
                        } catch (Exception err) {
                           err.printStackTrace();
                        }
                    }
                }   
            }
            

            注意事项:

            1. 根据您的问题陈述开始了一个 Producer 和两个 Consumers
            2. Producer 会在无限循环中产生 0 到 100 之间的随机数
            3. Consumer 将无限循环消耗这些数字
            4. ProducerConsumer 都共享无锁和线程安全LinkedBlockingQueue,这是线程安全的。如果您使用这些高级并发构造,则可以删除 wait() 和 notify() 方法。

            【讨论】:

              【解决方案9】:

              您似乎跳过了有关wait()notify()synchronized 的内容。 请参阅this example,它应该对您有所帮助。

              【讨论】:

                猜你喜欢
                • 1970-01-01
                • 2013-11-17
                • 2014-10-13
                • 1970-01-01
                • 2012-04-30
                • 1970-01-01
                • 1970-01-01
                • 2017-02-01
                • 1970-01-01
                相关资源
                最近更新 更多