【问题标题】:Threads and concurrency线程和并发
【发布时间】:2012-11-12 07:56:34
【问题描述】:

有谁知道是否有可靠的方法来设置生产者消费者类型队列,其中包含以下内容:

1) Producer 最初将三个对象放入队列

2) 消费者消费一个项目,在队列中留下两个对象

3) 控制权传回 Producer,Producer 再放一个项目 - (现在又是 3 个对象在队列中)

4) 消费者又消费了一件商品,因此循环继续

请注意,我需要手动设置解决方案,而无需使用任何界面来完成我正在处理的任务。任何建议将不胜感激。

线程可以进行这种微调吗?

【问题讨论】:

  • 在某种程度上,你在谈论信号量。
  • Java ..抱歉,我应该提到这一点。
  • 如果我理解正确:您希望Consumer 始终等到队列已满?
  • 参见 ArrayBlockingQueue。查看源代码,您可以对其进行修改以满足您的确切需求。
  • @bosra 好吧,这是线程,没有好的做法。你所描述的听起来是可行的。只需确保在检查队列是否已满和产生消费者之间没有竞争条件。在您知道只有生成的消费者将访问队列的人为情况下,这应该是安全的。 (这种情况听起来不太实际,因为要优雅地关闭多线程系统,您通常希望清空此类队列。)

标签: java multithreading thread-safety


【解决方案1】:

Java 7 有一个 LinkedTransferQueue,这听起来像您正在寻找的,或者查看父类型 BlockingQueue,我相信其中一个会符合要求。

【讨论】:

    【解决方案2】:

       您可以使用 ArrayList 并将容量设置为 3;然后,只要 Producer 执行,就会检查数组大小;如果大小小于 3,只需将值插入 ArrayList 直到大小为 3;如果大小为 3;只需调用 notifyAll() 方法即可完成。
    同样对于Consumer,如果大小为3,则消费其中一个值,并将其从ArrayList中移除;如果大小小于 3,只需调用 notifyAll() 方法即可完成。
    简而言之,这基本上就是它的工作原理;实施将取决于您计划用它实现的目标。

    希望这会有所帮助。

    【讨论】:

    • 值得补充的是,ArrayLists 不是线程安全的,所有这些操作都需要某种形式的同步。
    • 我明白;我的假设是,他已经涵盖了线程的细节。 ArrayBlockingQueue 是他可以使用的类的一个示例,如果他不想使用 synchronized 关键字。
    【解决方案3】:

    我可能会做的是有一个相对普通的队列并且还有一个信号量。将信号量初始化为 -2。让生产者在将请求排入队列时增加信号量。让消费者在出队请求之前减少信号量。在计数变为 1 之前,消费者将无法进行递减,因此队列中总会有 2 个未服务的请求。

    【讨论】:

      【解决方案4】:
      I hope this solution gives you some help:
      

      解释如下:

      两个独立的线程(Producer thread and Consumer thread) 在一个公共队列上相互协调工作(这里我提到了数组)。生产者从一个数据数组中放入三个元素,消费者获取一个并从同一个数组中删除。当数据数组所有元素都放入队列时,消费者只一个一个地获取。 put()take() 是在单独的类 Drop 中定义的同步方法。

      import java.util.ArrayList;
      import java.util.List;
      import java.util.concurrent.locks.Condition;
      import java.util.concurrent.locks.Lock;
      import java.util.concurrent.locks.ReentrantLock;
      
      public class ProducerConsumerExample {
          public static final Lock fileLock = new ReentrantLock();
          public static final Condition condition = fileLock.newCondition();
          public static String importantInfo[] = {
                  "Mares eat oats",
                  "Does eat oats",
                  "Little lambs eat ivy",
                  "A kid will eat ivy too",
                  "abc",
                  "def",
                  "ghi",
                  "jkl",
                  "mno",
                  "pqr"
          };
          public static List<String> list = new ArrayList<String>();
          public static boolean done = false;
          public static void main(String[] args) {
              Drop drop = new Drop();
              Thread tProducer = new Thread(new Producer(drop));
              Thread tConsumer = new Thread(new Consumer(drop));
              try{
              tProducer.start();
              tConsumer.start();
              }
              catch(Exception ie){}
          }
      }
      
      public class Consumer implements Runnable {
          private Drop drop;
      
          public Consumer(Drop drop) {
              this.drop = drop;
          }
      
          public void run() {
              try{
                  ProducerConsumerExample.fileLock.lock();
                  for (String message = drop.take();
                          ! message.equals("DONE");
                          message = drop.take()) {
                      System.out.format("MESSAGE RECEIVED: %s%n", message);
                      ProducerConsumerExample.list.remove(0);
                      if(ProducerConsumerExample.done)
                          continue;
                      else{
                          ProducerConsumerExample.condition.signal();
                          System.out.println("Consumer is waiting");
                          ProducerConsumerExample.condition.await();
                      }
                   } catch (InterruptedException e) {}
                  }
              }
              catch(Exception e){
      
              }
              finally{
                  ProducerConsumerExample.fileLock.unlock();
              }
          }
      }
      import java.util.Random;
      import java.util.concurrent.locks.Condition;
      import java.util.concurrent.locks.Lock;
      import java.util.concurrent.locks.ReentrantLock;
      
      public class Producer implements Runnable {
          private Drop drop;
          public Producer(Drop drop) {
              this.drop = drop;
          }
      
          public void run() {
              try{
                  ProducerConsumerExample.fileLock.lock();
                  Random random = new Random();int check = 3;
      
                  for (int i = 0;
                          i <ProducerConsumerExample.importantInfo.length;
                          i++) {
                      if(i<check){
                      System.out.println("Putting message");
                      System.out.println(ProducerConsumerExample.importantInfo[i]);
                      drop.put(ProducerConsumerExample.importantInfo[i]);
                      }
                      else{
                          check = check+3;
                          i--;
                          ProducerConsumerExample.condition.signal();
                          System.out.println("Producer is waiting");
                          ProducerConsumerExample.condition.await();
                      }
                  }
                  drop.put("DONE");
                  ProducerConsumerExample.done =true;
                  ProducerConsumerExample.condition.signal();
                  System.out.println("Producer is waiting");
                  ProducerConsumerExample.condition.await();
              }
              catch(Exception e){
                  e.printStackTrace();
              }
              finally{
                  ProducerConsumerExample.fileLock.unlock();
              }
          }
      }
      import java.util.ArrayList;
      import java.util.List;
      
      public class Drop {
          // Message sent from producer
          // to consumer.
          private String message;
          public synchronized String take() {
              System.out.println(ProducerConsumerExample.list.size());
              return ProducerConsumerExample.list.get(0);
          }
      
          public synchronized void put(String message) {
              // Store message.
              ProducerConsumerExample.list.add(message);
          }
      }
      

      【讨论】:

      • 您可以添加有关此代码如何工作的描述,以帮助其他人更好地理解。
      • 您能否提供一些使用 java Executors 和 Blocking Queue 实现相同目标的示例
      猜你喜欢
      • 2011-01-01
      • 2015-02-12
      • 2014-03-02
      • 1970-01-01
      • 1970-01-01
      • 2020-08-26
      • 2014-06-30
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多