【问题标题】:java assignment synchronization producer consumerjava赋值同步生产者消费者
【发布时间】:2018-12-13 04:33:35
【问题描述】:

我为生产者消费者问题编写了一个多线程代码,其中我在消费者和生产者线程的 run 方法中编写了同步块,该线程锁定共享列表(我假设) 所以问题的关键是,列表上是否会有锁定,因为每个线程都有自己的同步块,但它们共享同一个列表实例

public class Main {
    static boolean finishFlag=false;
    final int queueSize = 20;
    List<Integer> queue = new LinkedList<>();
    Semaphore semaphoreForList = new Semaphore(queueSize);

    public Main(int producerCount,int consumerCount) {
        while(producerCount!=0) {
            new MyProducer(queue,semaphoreForList,queueSize).start(); //produces the producer
            producerCount--;
        }
        while(consumerCount!=0) {
            new MyConsumer(queue,semaphoreForList,queueSize).start(); //produces the consumer
            consumerCount--;
        }
    }

    public static void main(String args[]) {
        /*
         * input is from command line 1st i/p is number of producer and 2nd i/p is number of consumer
         */
        try {
            Main newMain = new Main(Integer.parseInt(args[0]),Integer.parseInt(args[1]));
            try {
                Thread.sleep(30000);
            }
            catch(InterruptedException e) {
            }
            System.out.println("exit");
            finishFlag=true;
        }
        catch(NumberFormatException e) {
            System.out.println(e.getMessage());
        }

    }
}

class MyProducer extends Thread{
    private List<Integer> queue;
    Semaphore semaphoreForList;
    int queueSize;
    public MyProducer(List<Integer> queue, Semaphore semaphoreForList,int queueSize) {
        this.queue = queue;
        this.semaphoreForList = semaphoreForList;
        this.queueSize = queueSize;
    }
    public void run() {
        while(!Main.finishFlag) {
            try {
                Thread.sleep((int)(Math.random()*1000));
            }
            catch(InterruptedException e) {
            }
            try {
                if(semaphoreForList.availablePermits()==0) {//check if any space is left on queue to put the int
                        System.out.println("no more spaces left");
                }
                else {
                    synchronized(queue) {
                        semaphoreForList.acquire(); //acquire resource by putting int on the queue
                        int rand=(int)(Math.random()*10+1);
                        queue.add(rand);
                        System.out.println(rand+" was put on queue and now length is "+(queueSize-semaphoreForList.availablePermits()));        
                    }
                }
            }
            catch(InterruptedException m) {
                System.out.println(m);
            }
        }
    }   
}

public class MyConsumer extends Thread{
    private List<Integer> queue; //shared queue by consumer and producer
    Semaphore semaphoreForList;
    int queueSize;
    public MyConsumer(List<Integer> queue, Semaphore semaphoreForList,int queueSize) {
        this.queue = queue;
        this.semaphoreForList = semaphoreForList;
        this.queueSize = queueSize;
    }
    public void run() {
        while(!Main.finishFlag) {//runs until finish flag is set to false by main
            try {
                Thread.sleep((int)(Math.random()*1000));//sleeps for random amount of time
            }
            catch(InterruptedException e) {
            }
            if((20-semaphoreForList.availablePermits())==0) {//checking if any int can be pulled from queue
                System.out.println("no int on queue");
            }
            else {
                synchronized(queue) {
                    int input=queue.remove(0);//releases the resource(position in queue) by pulling the int out of the queue and computing factorial 
                    semaphoreForList.release();
                    int copyOfInput=input;
                    int fact=1;
                    while(copyOfInput!=0) {
                        fact = fact*copyOfInput;
                        copyOfInput--;
                    }
                    System.out.println(input+" was pulled out from queue and the computed factorial is "+fact+
                            " the remaining length of queue is "+(queueSize-semaphoreForList.availablePermits()));
                }   
            }       
        }
    }
}

【问题讨论】:

  • 我会考虑将其重写为大小的一小部分,具体取决于您想要保留多少,以制作一个更简单的示例来解决您要说明的问题。
  • 简而言之,只要您同步的对象是共享的,它就可以工作。您可以删除信号量、睡眠和完成标志。您也不需要在持有锁时计算阶乘或打印结果。

标签: java multithreading concurrency producer-consumer


【解决方案1】:

我宁愿推荐使用java.lang.Object 方法wait()notify() 来创建消费者-生产者算法。使用这种方法,队列不会被无休止地重复和不必要的 synchronized 语句阻塞,我认为这是一种性能更高且“事件驱动”的解决方案。

此链接可能会有所帮助 - https://www.geeksforgeeks.org/producer-consumer-solution-using-threads-java/

【讨论】:

    【解决方案2】:

    是的,互斥锁/监视器与 Java Object 实例相关联,该实例是此实例中的共享列表。这意味着所有线程都锁定同一个互斥体(与queue 关联,并通过它进行同步。

    所以好的部分:你的程序实际上是线程安全的。

    然而,额外的信号量实际上在很多方面都没有多大意义:

    • 检查(例如,availablePermits)发生在锁之外,因此只是对队列状态的最佳猜测。不久之后可能会有所不同。
    • 试图在锁内获取信号量,而该信号量只能在同一个锁内释放,看起来像是死锁的保证方法。

    正如 AnDus 所提到的,这可能通过使用充当条件变量的等待和通知方法来更好地解决。很可能您甚至需要两个,一个用于解除对生产者的阻止,一个用于解除对消费者的阻止。

    一般来说,如果这不是编码练习,请使用已经实现所需功能的类。在这种情况下,java.util.concurrent.BlockingQueue 似乎是您想要的。

    【讨论】: