【问题标题】:Java Concurrency - Implementing Monitors using Semaphores - All threads getting stuck in waiting state, something wrong with my understandingJava并发-使用信号量实现监视器-所有线程都陷入等待状态,我的理解有问题
【发布时间】:2014-06-21 02:39:49
【问题描述】:

我正在尝试使用 Java 中的信号量来实现监视器,以创建一个带有一些写入线程和一些读取线程的有界缓冲区。

到目前为止,我做了以下工作:

对于我们可能想要锁定的每个类(即之前,我们会在其中有一个同步代码块),我添加了两个信号量,一个二进制信号量在块的开头锁定并在结尾解锁(确保在任何时候只能执行一个关键代码部分),另一个充当传递 notify 和 notifyAll 信号的单元。我还创建了一个整数计数器来跟踪调用了等待的线程。

然后在同步代码块的开头,我获得了“监视器”的锁,然后在必要时调用我的替换等待指令。

我有两个线程不断调用 put 和两个线程不断调用 get。在 1 秒到 10 秒之间的任何时间后,所有线程都会卡住。

不知何故,他们都被卡住了,我真的不知道怎么回事!我花了几天时间思考这个问题。有什么想法吗?

有没有人知道是什么导致所有这些线程卡在这一点上?

谢谢,

【问题讨论】:

  • 我不知道这是否相关,但是,它让我想到了这个:Dining philosophers problem
  • blocksWaitingCount 在我看来应该是 volatile 或 AtomicInteger:否则无法保证其他线程会看到更新的值。

标签: java multithreading concurrency semaphore monitor


【解决方案1】:

我认为您的 notifyAll() 实现存在缺陷。要正确执行此操作,您需要一个联锁的比较和交换操作(就像 AtomicInteger 等人提供的那样)。问题是在你的循环中:

    // Equivalent of notifyAll()
    for (int i = val(); i>0; i--) {
        dec();
        notifyCalled.release();
    }

两个线程可以竞速并且都观察到val() 为1,然后dec() 调用都会成功并且blocksWaitingCount 将是-1。然后,因为blocksWaitingCount 不再匹配等待notifyCalled 许可的线程数,未来对notifyAll() 的调用将无法通知所有阻塞线程(因为即使i == 0,仍然存在线程阻塞)。重复几次迭代,notifyAll() 最终将完全停止释放任何线程,所有线程都将被阻塞。

【讨论】:

  • 嗯,好点子。但是在您的wait() 实现中,inc() 不受monitorSemaphore 的保护,因此您仍然冒着blocksWaitingCount 在对val()dec() 的调用之间切换的风险,对吧?
  • blocksWaitingCount 的原因是因为您试图确保每个notifyCalled.acquire() 都与一个notifyCalled.release() 配对,对吧?但是因为wait() 实现在执行inc(); notifyCalled.acquire() 时不持有monitorSemaphore,所以在相应的acquire() 发生之前,notifyAll() 实现可能会执行其release()
【解决方案2】:

问题是你无法控制在 notifyCalled.acquire() 中哪个线程会成功。

例如考虑这种情况:

  1. 线程 A 在 get 中等待

  2. 线程 B 执行两次 put 并填充缓冲区。只有一个线程在等待,所以它调用了 1 次 notifyCalled.release()

  3. 线程 C 执行 put,由于缓冲区已满,它进入等待块。

  4. 3 中的 notifyCalled.release() 导致 notifyCalled.aquire() 在线程 C 而不是线程 A 中成功

由于缓冲区仍然是满的,线程 C(和所有其他 put 操作)进入一个重新进入 while 循环并再次等待,线程 B 将永远不会收到它正在等待的释放。

解决方案

当由于缓冲区已满而未使用在 wait 块中获得的许可时,就会出现问题,因为缓冲区已满(或相反的 get 操作)。为了避免这种情况,可以使用一个标志来释放许可,以便另一个执行相反操作的等待线程可以尝试获取它。

此外,如 Daniel Pryden 所述,inc() 还应移至 monitorSemaphore 锁内以避免竞争条件。

请注意,这使得用于修改 blocksWaitingCount 的同步块变得不必要(尽管由于它们是无竞争的,它们对性能的影响很小)。

这是修改后的代码

public class BufferNonSync {

    private int[] buffer = new int[] { 0};
    private int start = 0;
    private int last = 0;
    private final int size = 1;
    private int numberInBuffer = 0;

    // Monitor variables
    private Semaphore monitorSemaphore = new Semaphore(1);
    private Semaphore notifyCalled = new Semaphore(0);

    private int blocksWaitingCount = 0;

    public void put(int input, int id) throws InterruptedException {
        monitorSemaphore.acquire();

        boolean acquired = false;
        while (numberInBuffer == size) {
            // Equivalent of wait()
            if (acquired) {
                dec();
                notifyCalled.release();
            }
            inc();
            monitorSemaphore.release();
            notifyCalled.acquire();
            monitorSemaphore.acquire();
            acquired = true;
        }

        // Critical section
        buffer[last] = input;
        last = (last + 1) % size;
        numberInBuffer++;

        // Equivalent of notifyAll()
        for (int i = val(); i > 0; i--) {
            dec();
            notifyCalled.release();
        }

        monitorSemaphore.release();
    }

    public int get(int id) throws InterruptedException {
        monitorSemaphore.acquire();

        boolean acquired = false;
        while (numberInBuffer == 0) {
            // Equivalent of wait()
            if (acquired) {
                dec();
                notifyCalled.release();
            }
            inc();
            monitorSemaphore.release();
            notifyCalled.acquire();
            monitorSemaphore.acquire();
            acquired = true;
        }

        // Critical section
        int temp = buffer[start];
        start = (start + 1) % size;
        numberInBuffer--;

        // Equivalent of notifyAll()
        for (int i = val(); i > 0; i--) {
            dec();
            notifyCalled.release();
        }

        monitorSemaphore.release(); 

        return temp;
    }

    private void inc() {
        blocksWaitingCount++;
    }

    private void dec() {
        blocksWaitingCount--;
    }

    private int val() {
        return blocksWaitingCount;
    }
}

【讨论】:

  • put线程只需要get线程通知即可,反之亦然。也许你可以将你的 notifyCalled 分成两个单独的对象
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-12-12
  • 1970-01-01
  • 2015-08-30
  • 1970-01-01
相关资源
最近更新 更多