【问题标题】:How to solve the producer-consumer using semaphores?如何使用信号量解决生产者-消费者问题?
【发布时间】:2011-11-27 19:47:37
【问题描述】:

我需要编写一个类似于生产者-消费者的问题,它必须使用信号量。我尝试了几种解决方案,但都没有奏效。首先,我在 Wikipedia 上尝试了一个解决方案,但没有奏效。我当前的代码是这样的:

消费者的方法运行:

    public void run() {
    int i=0;
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    String s = new String();
    while (1!=2){
        Date datainicio = new Date();
        String inicio=dateFormat.format(datainicio);
        try {
            Thread.sleep(1000);///10000
        } catch (InterruptedException e) {
            System.out.println("Excecao InterruptedException lancada.");
        }
        //this.encheBuffer.down();
        this.mutex.down();
        // RC
        i=0;
        while (i<buffer.length) {
            if (buffer[i] == null) {
                i++;
            } else {
                break;
            }
        }
        if (i<buffer.length) {
            QuantidadeBuffer.quantidade--;
            Date datafim = new Date();
            String fim=dateFormat.format(datafim);
            int identificador;
            identificador=buffer[i].getIdentificador()[0];
            s="Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i;
            //System.out.println("Consumidor Thread: "+Thread.currentThread()+" Pedido: "+identificador+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
            buffer[i]= null;
        }
        // RC
        this.mutex.up();
        //this.esvaziaBuffer.up();
        System.out.println(s);
  //            lock.up();
    }
}

生产者的方法运行:

    public void run() {
    DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
    int i=0;
    while (1!=2){
        Date datainicio = new Date();
        String inicio=dateFormat.format(datainicio);
        // Produz Item
        try {
            Thread.sleep(500);//50000
        } catch (InterruptedException e) {
            System.out.println("Excecao InterruptedException lancada.");
        }
        //this.esvaziaBuffer.down();
        this.mutex.down();
        // RC
        i=0;
        while (i<buffer.length) {
            if (buffer[i]!=null) {
                i++;
            } else {
                break;
            }
        }
        if (i<buffer.length) {
            int identificador[]=new int[Pedido.getTamanho_identificador()];
            identificador[0]=i;
            buffer[i]=new Pedido();
            Produtor.buffer[i].setIdentificador(identificador);
            Produtor.buffer[i].setTexto("pacote de dados");
            QuantidadeBuffer.quantidade++;
            Date datafim = new Date();
            String fim=dateFormat.format(datafim);
            System.out.println("Produtor Thread: "+Thread.currentThread()+" Pedido: "+identificador[0]+" Inicio: "+inicio+" Fim: "+fim+" posicao "+i);
            i++;
        }
        // RC
        this.mutex.up();
        //this.encheBuffer.up();
    }
    //this.encheBuffer.up();
}

在上面的代码中,消费者线程读取了一个位置,然后另一个线程读取了相同的位置,而没有生产者填充该位置,如下所示:

Consumidor Thread: Thread[Thread-17,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1
Consumidor Thread: Thread[Thread-19,5,main] Pedido: 1 Inicio: 2011/11/27 17:23:33 Fim: 2011/11/27 17:23:34 posicao 1

【问题讨论】:

  • 您是否尝试在 Java 中使用 waitnotify 方法实现此功能?它会让你的生活更轻松
  • 是的,我在信号量类中做到了。
  • @Nupul - 我发现信号量队列更容易理解 - 锁内没有奇怪的“等待”,(是的,我知道它会释放锁,但看起来很奇怪),没有反虚假唤醒循环.生产者推送然后发出信号,消费者等待然后弹出。根本不需要查看队列数。

标签: java multithreading concurrency concurrent-programming


【解决方案1】:

您使用的似乎是互斥体而不是信号量?

在使用互斥锁时,您只有二进制同步 - 锁定和解锁一个资源。信号量具有您可以发出信号或获取的价值。

您正在尝试锁定/解锁整个缓冲区,但这是错误的方法,因为正如您所见,生产者或消费者锁定,并且当阅读器锁定它时,生产者无法填充缓冲区(因为它必须先锁定)。

您应该改为创建一个信号量,然后当生产者写入一个数据包或数据块时,它可以向信号量发出信号。然后,消费者可以尝试获取信号量,因此他们将等待生产者发出已写入数据包的信号。在发出写入数据包的信号后,其中一个消费者将被唤醒,它会知道它可以读取一个数据包。它可以读取一个数据包,然后返回尝试在信号量上获取。如果在那段时间生产者写入了另一个数据包,它会再次发出信号,然后任何一个消费者将继续读取另一个数据包。等等……

例如:

(制作人) - 写一包 - Semaphore.release(1)

(消费者 xN) - Semaphore.acquire(1) - 读取一个数据包

如果您有多个消费者,那么消费者(不是生产者)应该在读取数据包时锁定缓冲区(但在获取信号量时)以防止出现竞争条件.在下面的示例中,生产者还锁定了列表,因为所有内容都在同一个 JVM 上。

import java.util.LinkedList;
import java.util.concurrent.Semaphore;

public class Semaphores {

    static Object LOCK = new Object();

    static LinkedList list = new LinkedList();
    static Semaphore sem = new Semaphore(0);
    static Semaphore mutex = new Semaphore(1);

    static class Consumer extends Thread {
        String name;
        public Consumer(String name) {
            this.name = name;
        }
        public void run() {
            try {

                while (true) {
                    sem.acquire(1);
                    mutex.acquire();
                    System.out.println("Consumer \""+name+"\" read: "+list.removeFirst());
                    mutex.release();
                }
            } catch (Exception x) {
                x.printStackTrace();
            }
        }
    }

    static class Producer extends Thread {
        public void run() {
            try {

                int N = 0;

                while (true) {
                    mutex.acquire();
                    list.add(new Integer(N++));
                    mutex.release();
                    sem.release(1);
                    Thread.sleep(500);
                }
            } catch (Exception x) {
                x.printStackTrace();
            }
        }
    }

    public static void main(String [] args) {
        new Producer().start();
        new Consumer("Alice").start();
        new Consumer("Bob").start();
    }
}

【讨论】:

  • 当然,在我的答案中添加了示例代码。输出将是消费者将在收到数据包时打印出来(每 500 毫秒由生产者确定 - 请注意,消费者没有睡眠)。
  • 上面的例子使用了 LinkedList 来说明这个例子,但是你当然可以使用一个有限数组,然后使用你在代码中使用的 nulling 方法来代替它。
  • 使用synchronized 似乎有点作弊,虽然我能理解这个论点。但作为对@Victor 的提醒,您可以很容易地用额外的信号量替换同步语句。然后您可以考虑如何针对有限的数据结构执行此操作(即您只有 10 个存储空间用于生产者,并且无法添加第 11 个)。
  • 我已将其编辑为使用信号量而不是同步。请注意,用作互斥体的信号量必须初始化为 1,以确保最多 1 个线程可以持有锁(如互斥体)。
【解决方案2】:

多线程应用程序最常见的使用模式之一是创建异步通信网络。几个现实世界的应用程序需要这个。有两种方法可以实现这一点:-

  1. 生产者和消费者紧密耦合。这不是异步的,每个生产者都在等待消费者,反之亦然。应用程序的吞吐量也成为 2 个实体中的最小值。这通常不是一个好的设计。
  2. 更好(也更复杂)的方法是在生产者和消费者之间引入一个共享缓冲区。这样,更快的生产者或更快的消费者不会因为较慢的对应物而受到限制。它还允许多个生产者和多个消费者通过共享缓冲区进行连接。

有几种方法可以创建生产者-消费者模式。

  1. 使用前面的“锁定基础”模块中介绍的 wait/notify/nofityAll
  2. 使用 Java 提供的 API - java.util.concurrent.BlockingQueue。我们将在后续模块中对此进行详细介绍。
  3. 使用信号量:这是创建生产者-消费者模式的一种非常方便的方法。

    public class ProducerConsumerSemaphore {
    
    private static final int BUFFER_SIZE = 10;
    private static final int MAX_VALUE = 10000;
    private final Stack<Integer> buffer = new Stack<Integer>();
    private final Semaphore writePermits = new Semaphore(BUFFER_SIZE);
    private final Semaphore readPermits = new Semaphore(0);
    private final Random random = new Random();
    
    class Producer implements Runnable {
        @Override
        public void run() {
            while (true) {
                writePermits.acquireUninterruptibly();
                buffer.push(random.nextInt(MAX_VALUE));
                readPermits.release();
            }
        }
    }
    
    class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                readPermits.acquireUninterruptibly();
                System.out.println(buffer.pop());
                writePermits.release();
            }
        }
    }
    
    public static void main(String[] args) {
    
        ProducerConsumerSemaphore obj = new ProducerConsumerSemaphore();
        Producer p1 = obj.new Producer();
        Producer p2 = obj.new Producer();
        Producer p3 = obj.new Producer();
        Consumer c1 = obj.new Consumer();
        Consumer c2 = obj.new Consumer();
        Consumer c3 = obj.new Consumer();
        Thread t1 = new Thread(p1);
        Thread t2 = new Thread(p2);
        Thread t3 = new Thread(p3);
        Thread t4 = new Thread(c1);
        Thread t5 = new Thread(c2);
        Thread t6 = new Thread(c3);
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
        t6.start();
    }
    

我们使用 2 个信号量 - 1 个用于消费者,1 个用于生产者。

生产者允许的许可数量设置为最大缓冲区大小。

每个生产者消耗 1 个写许可,并在产生 1 条消息时释放 1 个读许可。

每个消费者消费 1 个读许可并释放 1 个写许可消费每条消息。

想象一下,将许可证存入实际消息中。写入许可流从生产者流向消费者(然后返回生产者)。读取许可从消费者流向生产者(并返回消费者)。在任何给定时间点缓冲区中的总消息将完全等于发出的读取许可数。如果产生消息的速率大于消费消息的速率,那么在某个时刻,可用的写许可数量将被耗尽,所有生产者线程将被阻塞,直到消费者从缓冲区读取并释放写许可。同样的逻辑反过来也存在。

以上是系统中消息流和许可的更直观的表述。 通过使用信号量,我们只是抽象出使用 wait/notify/notifyAll 编写一段代码所需的血腥细节和注意事项。 上面的代码可以和wait等比。方法:

当一个线程因缺少许可而被阻塞时,它相当于对该信号量的 wait() 调用。

当线程释放许可时,它相当于对该特定信号量的 notifyAll() 调用。

【讨论】:

    【解决方案3】:
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Semaphore;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    /*
     * To change this license header, choose License Headers in Project Properties.
     * To change this template file, choose Tools | Templates
     * and open the template in the editor.
     */
    /**
     *
     * @author sakshi
     */
    public class SemaphoreDemo {
    
        static Semaphore producer = new Semaphore(1);
        static Semaphore consumer = new Semaphore(0);
        static List<Integer> list = new ArrayList<Integer>();
    
        static class Producer extends Thread {
    
            List<Integer> list;
    
            public Producer(List<Integer> list) {
                this.list = list;
            }
    
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        producer.acquire();
    
                    } catch (InterruptedException ex) {
                        Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex);
                    }
                    System.out.println("produce=" + i);
    
                    list.add(i);
                    consumer.release();
    
                }
            }
        }
    
        static class Consumer extends Thread {
    
            List<Integer> list;
    
            public Consumer(List<Integer> list) {
                this.list = list;
            }
    
            public void run() {
                for (int i = 0; i < 10; i++) {
                    try {
                        consumer.acquire();
                    } catch (InterruptedException ex) {
                        Logger.getLogger(SemaphoreDemo.class.getName()).log(Level.SEVERE, null, ex);
                    }
    
                    System.out.println("consume=" + list.get(i));
    
                    producer.release();
                }
            }
        }
    
        public static void main(String[] args) {
            Producer produce = new Producer(list);
    
            Consumer consume = new Consumer(list);
    
            produce.start();
            consume.start();
        }
    }
    
    
    output:
    
    produce=0
    consume=0
    produce=1
    consume=1
    produce=2
    consume=2
    produce=3
    consume=3
    produce=4
    consume=4
    produce=5
    consume=5
    produce=6
    consume=6
    produce=7
    consume=7
    produce=8
    consume=8
    produce=9
    consume=9
    

    【讨论】:

      【解决方案4】:
      import java.util.concurrent.Semaphore;
      
      
      public class ConsumerProducer{
      
          public static void main(String[] args) {
      
                 Semaphore semaphoreProducer=new Semaphore(1);
                 Semaphore semaphoreConsumer=new Semaphore(0);
                 System.out.println("semaphoreProducer permit=1 | semaphoreConsumer permit=0");
      
                 new Producer(semaphoreProducer,semaphoreConsumer).start();
                 new Consumer(semaphoreConsumer,semaphoreProducer).start();
      
          }
      
      
      /**
       * Producer Class.
       */
      static class Producer extends Thread{
      
          Semaphore semaphoreProducer;
          Semaphore semaphoreConsumer;
      
      
          public Producer(Semaphore semaphoreProducer,Semaphore semaphoreConsumer) {
                 this.semaphoreProducer=semaphoreProducer;
                 this.semaphoreConsumer=semaphoreConsumer;
          }
      
          public void run() {
                 for(;;){
                        try {
                            semaphoreProducer.acquire();
                            System.out.println("Produced : "+Thread.currentThread().getName());
                            semaphoreConsumer.release();
      
                        } catch (InterruptedException e) {
                              e.printStackTrace();
                        }
                 }          
          }
      }
      
      /**
       * Consumer Class.
       */
      static class Consumer extends Thread{
      
          Semaphore semaphoreConsumer;
          Semaphore semaphoreProducer;
      
          public Consumer(Semaphore semaphoreConsumer,Semaphore semaphoreProducer) {
                 this.semaphoreConsumer=semaphoreConsumer;
                 this.semaphoreProducer=semaphoreProducer;
          }
      
          public void run() {
      
                 for(;;){
                        try {
                            semaphoreConsumer.acquire();
                            System.out.println("Consumed : "+Thread.currentThread().getName());
                            semaphoreProducer.release();
                        } catch (InterruptedException e) {
                              e.printStackTrace();
                        }
                 }
          }
      
      }
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多