【问题标题】:Java - BlockingQueue freezes multithread applicationJava - BlockingQueue 冻结多线程应用程序
【发布时间】:2020-04-07 20:16:09
【问题描述】:

我正在制作一个包含两个线程的应用程序:其中一个将值写入 LinkedBlockingQueue,另一个正在读取。我正在使用 ScheduledExecutorService 在几秒钟内运行此操作。 问题是我的应用程序冻结了 BlockingQueue 的方法,我不明白为什么。

这是一个常见的资源:

class Res{
    AtomicInteger atomicInteger = new AtomicInteger(0);
    BlockingQueue<String> q = new LinkedBlockingQueue<>();
}

这是读者

Semaphore semaphore = new Semaphore(1); /this is for reader does not take two places in thread pool
Runnable reader = ()->{
    try {
        semaphore.acquire();
        System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
        semaphore.release();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

作者:

Runnable writer = ()->{
    res.q.add("hi");
};

完整代码:

class Res{
    AtomicInteger atomicInteger = new AtomicInteger(0);
    BlockingQueue<String> q = new LinkedBlockingQueue<>();
}
public class Main {

    public static void main(String[] args) throws InterruptedException {

        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
        Res res = new Res();
        Semaphore semaphore = new Semaphore(1); //this is for reader does not take two places in thread pool
        Runnable reader = ()->{
            try {
                semaphore.acquire();
                System.out.println(res.q.take()+" "+res.atomicInteger.incrementAndGet());
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        Runnable writer = ()->{
            res.q.add("hi");
        };
        Random rnd = new Random();

        for (int i = 0; i < 20; i++) {
            int time = rnd.nextInt(5)+ 2;
            executorService.schedule(writer,time, TimeUnit.SECONDS);
        }
        for (int i = 0; i < 20; i++) {
            int time = rnd.nextInt(5)+ 2;
            executorService.schedule(reader,time, TimeUnit.SECONDS);
        }

        executorService.shutdown();
}

它应该打印二十行“hi [number]”,但在某些行上冻结。 比如我现在的打印:

hi 1
hi 2
hi 3
hi 4
hi 5

我发现如果我增加线程数newScheduledThreadPool(20) 它会开始工作,但是我怎样才能用两个线程来实现呢?谢谢!

【问题讨论】:

  • 您的执行程序从两个线程开始。当这两个线程在调用您的reader 对象时被阻塞并且队列为空时,您认为会发生什么?

标签: java java-8 concurrency blockingqueue linkedblockingqueue


【解决方案1】:

遵循您的代码有点困难,但同时发生的事情也很明显。由于Executors.newScheduledThreadPool(2);,您一次最多可以运行两个线程。这两个线程都是reader 线程。

所以Thread-1 进入try 块并通过semaphore.acquire(); 获得信号量许可,但队列是空的 - 因此它在res.q.take() 上阻塞。下一个线程 - Thread-2 也是一个读取线程,但它无法获取 permit,因为它已经被 Thread-1 占用并在 semaphore.acquire(); 上被阻塞。由于您没有其他线程的空间(您的池被阻止与这两个线程一起工作),因此没有编写器会将某些内容放入您的队列中并因此解除阻塞Thread-1(以便res.q.take() 可以工作)。

添加更多工作线程只会延迟问题 - 您最终可能会处于与以前相同的位置。

【讨论】:

  • 谢谢!这似乎是真的。我想我可以使用两个单独的执行器服务,例如ScheduledExecutorService executorServiceWriter = Executors.newScheduledThreadPool(1); ScheduledExecutorService executorServiceReader = Executors.newScheduledThreadPool(1); 这是一个好的解决方案吗?你觉得怎么样?
  • @AlexanderTukanov 是的,如果你有单独的读者和作者池,你应该没问题
  • 我是否理解正确,在我的情况下没有其他解决方案?
  • @AlexanderTukanov 可能有多种解决方案,但我不完全了解您要做什么 - 如果您只是想读取和写入队列并且您不希望线程阻塞,您的线程需要以某种方式协调它们的操作,并且有多种方法可以做到这一点。你可能想问一个单独的、明确的问题。
  • 是的,我只想在不阻塞的情况下读写队列。你的意思是我应该创建新问题?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2015-05-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-05-30
  • 1970-01-01
相关资源
最近更新 更多