【问题标题】:Multiple Producer Multiple Consumer Multithreading Java多生产者多消费者多线程Java
【发布时间】:2014-10-13 04:11:32
【问题描述】:

我正在尝试多个生产者 - 生产者-消费者问题的多个消费者用例。 我正在使用 BlockingQueue 在多个生产者/消费者之间共享公共队列。

下面是我的代码。
制片人

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {

    private BlockingQueue inputQueue;
    private static volatile int i = 0;
    private volatile boolean isRunning = true;

    public Producer(BlockingQueue q){
        this.inputQueue=q;
    }

    public synchronized void run() {

        //produce messages
        for(i=0; i<10; i++) 
        {
            try {
                inputQueue.put(new Integer(i));

                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Produced "+i);
        }
        finish();
    }

    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
    }

}

消费者

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {

    private BlockingQueue inputQueue;
    private volatile boolean isRunning = true;

    private final Integer POISON_PILL = new Integer(-1);

    Consumer(BlockingQueue queue) {
        this.inputQueue = queue;
    }

    public void run() {
        //worker loop keeps taking en element from the queue as long as the producer is still running or as 
        //long as the queue is not empty:
        while(!inputQueue.isEmpty()) {

            try {
                Integer queueElement = (Integer) inputQueue.take();
                System.out.println("Consumed : " + queueElement.toString());

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("Queue ");
    }

    //this is used to signal from the main thread that he producer has finished adding stuff to the queue
    public void finish() {
        //you can also clear here if you wanted
        isRunning = false;
        inputQueue.add(POISON_PILL);
    }
}

测试类

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;


public class ProducerConsumerService {

    public static void main(String[] args) {

        //Creating BlockingQueue of size 10
        BlockingQueue queue = new ArrayBlockingQueue(10);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        //starting producer to produce messages in queue
        new Thread(producer).start();

        //starting producer to produce messages in queue
        new Thread(producer).start();

        //starting consumer to consume messages from queue
        new Thread(consumer).start();

        //starting consumer to consume messages from queue
        new Thread(consumer).start();

        System.out.println("Producer and Consumer has been started");
    }

}

当我运行以下代码时,我没有看到正确的输出。

我在这里做错了吗?

【问题讨论】:

    标签: java multithreading producer-consumer blockingqueue


    【解决方案1】:

    具有多个生产者和多个消费者的示例代码。

    import java.util.concurrent.*;
    
    public class ProducerConsumerDemo {
    
        public static void main(String args[]){
    
         BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();
    
         Thread prodThread1 = new Thread(new Producer(sharedQueue,1));
         Thread prodThread2 = new Thread(new Producer(sharedQueue,2));
         Thread consThread1 = new Thread(new Consumer(sharedQueue,1));
         Thread consThread2 = new Thread(new Consumer(sharedQueue,2));
    
         prodThread1.start();
         prodThread2.start();
         consThread1.start();
         consThread2.start();
        }
    
    }
    
    class Producer implements Runnable {
    
        private final BlockingQueue<Integer> sharedQueue;
        private int threadNo;
    
        public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
            this.threadNo = threadNo;
            this.sharedQueue = sharedQueue;
        }
    
        @Override
        public void run() {
            for(int i=1; i<= 5; i++){
                try {
                    int number = i+(10*threadNo);
                    System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                    sharedQueue.put(number);
                } catch (Exception err) {
                    err.printStackTrace();
                }
            }
        }
    
    }
    
    class Consumer implements Runnable{
    
        private final BlockingQueue<Integer> sharedQueue;
        private int threadNo;
        public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
            this.sharedQueue = sharedQueue;
            this.threadNo = threadNo;
        }
    
        @Override
        public void run() {
            while(true){
                try {
                    int num = sharedQueue.take();
                    System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
                } catch (Exception err) {
                   err.printStackTrace();
                }
            }
        }   
    }
    

    输出:

    Produced:11:by thread:1
    Produced:21:by thread:2
    Produced:22:by thread:2
    Produced:23:by thread:2
    Produced:24:by thread:2
    Produced:25:by thread:2
    Consumed: 11:by thread:1
    Consumed: 22:by thread:1
    Consumed: 23:by thread:1
    Consumed: 24:by thread:1
    Consumed: 25:by thread:1
    Produced:12:by thread:1
    Consumed: 21:by thread:2
    Consumed: 12:by thread:1
    Produced:13:by thread:1
    Produced:14:by thread:1
    Produced:15:by thread:1
    Consumed: 13:by thread:2
    Consumed: 14:by thread:1
    Consumed: 15:by thread:2
    

    This article 提供了 BlockingQueue 的简单示例生产者和消费者问题

    对代码的更改:

    1. 不同的生产者会产生不同的输出而不是相同的输出。 生产者线程 1 生成 11-15 和生产者线程 2 的数字 生成 21-25 的数字
    2. 任何消费者线程都可以使用来自任何生产者的数据。与生产者不同,消费者没有使用数据的限制。
    3. 我在生产者和消费者中都添加了线程号。

    您可以通过ExecutorService 找到替代解决方案:

    Producer/Consumer threads using a Queue

    【讨论】:

      【解决方案2】:

      直接实现它并不太难。下面的示例代码就是这样做的。它只是对不应该共享的所有内容使用局部变量。

      除了队列之外,只有一个维护活跃生产者数量的线程安全计数器是共享的。使用计数器而不是特殊的“POISON_PILL”值,因为这样的标记值不适用于单个队列和多个消费者,因为所有消费者必须识别生产者的完成,但只有在所有制作人都已经完成了。

      计数器是一个简单的结束条件。唯一需要关心的是,在检测到计数器为零后,必须重新检查队列以避免竞争条件。

      附带说明,使用 Java 5 提供的并发特性而不使用泛型来编写干净的类型安全代码是没有意义的。

      final AtomicInteger activeProducers=new AtomicInteger();
      final BlockingQueue<Integer> queue=new ArrayBlockingQueue<>(10);
      Runnable producer=new Runnable() {
        public void run() {
          try {
            for(int i=0; i<10; i++) {
                Thread.sleep(TimeUnit.SECONDS.toMillis(1));
                queue.put(i);
                System.out.println("Produced "+i);
            }
          } catch(InterruptedException ex) {
            System.err.println("producer terminates early: "+ex);
          }
          finally { activeProducers.decrementAndGet(); }
        }
      };
      Runnable consumer=new Runnable() {
        public void run() {
          try {
            for(;;) {
              Integer queueElement = queue.poll(1, TimeUnit.SECONDS);
              if(queueElement!=null)
                System.out.println("Consumed : " + queueElement);
              else if(activeProducers.get()==0 && queue.peek()==null) return;
            }
          } catch(InterruptedException ex) {
            System.err.println("consumer terminates early: "+ex);
          }
        }
      };
      final int NUM_PRODUCERS = 2, NUM_CONSUMERS = 2;
      for(int i=0; i<NUM_PRODUCERS; i++) {
        activeProducers.incrementAndGet();
        new Thread(producer).start();
      }
      for(int i=0; i<NUM_CONSUMERS; i++) {
        new Thread(consumer).start();
      }
      

      【讨论】:

        【解决方案3】:

        您的很多代码没有意义。我建议你坐下来弄清楚代码为什么存在以及它在做什么。

        如果您删除了isFinshed 标志,则不会有任何变化。

        如果您在生产者中删除了synchronized 的使用,您将拥有并发生产者。将仅在同步块中访问的字段设置为 volatile 没有任何好处。

        如果生产者要并发,那么共享循环计数器是没有意义的。 通常,生产者发送毒丸,而消费者不消费毒丸。例如如果你有两个消费者,一个可能会添加药丸,另一个可能会消耗它。您的消费者忽略了毒丸,因为它忽略了isFinished 标志。

        您不想仅仅因为队列暂时为空而停止消费者。否则它将看不到生产者产生的所有消息,可能一个也看不到。

        【讨论】:

        • 感谢您的回复。我一直在寻找一些多生产者-多消费者的示例代码 sn-ps,但是我在网上找到的那些不能正常运行。你知道有什么好的网站可以很好地解释这个问题吗?
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2017-02-01
        • 1970-01-01
        • 2013-11-17
        • 1970-01-01
        • 1970-01-01
        • 2013-04-12
        • 1970-01-01
        相关资源
        最近更新 更多