【问题标题】:Java "Tiered Queue" implementation for fast Producers, slow Consumers快速生产者、慢消费者的 Java“分层队列”实现
【发布时间】:2025-12-10 08:00:02
【问题描述】:

我有一个生产者-消费者场景,其中生产者的生产速度远快于消费者的消费速度。通常,解决方案是让生产者阻塞,因为生产者/消费者场景的运行速度与最慢的组件一样快。限制或阻止生产者不是一个好的解决方案,因为我们的应用程序提供了足够的时间让消费者稍后赶上。

下图描绘了我们的应用程序中的完整“阶段”与更常见的场景:

      Our Application                 Common Scenario
 2N +--------+--------+
    |PPPPPPPP|oooooooo|                                         P = Producer
    |PPPPPPPP|oooooooo|                                         C = Consumer
  N +--------+--------+      N +--------+--------+--------+     o = Other Work
    |CPCPCPCP|CCCCCCCC|        |CPCPCPCP|CPCPCPCP|oooooooo|     N = number of tasks
    |CPCPCPCP|CCCCCCCC|        |CPCPCPCP|CPCPCPCP|oooooooo|
    -------------------        ----------------------------
    0       T/2       T        0       T/2       T      3T/2

这个想法是通过不抑制生产者来最大化吞吐量。

我们的任务操作的数据很容易序列化,因此我计划实施一个文件系统解决方案,用于溢出所有无法立即满足的任务。

我将 Java 的 ThreadPoolExecutorBlockingQueue 一起使用,以确保我们不会耗尽内存。问题在于实现这样一个“分层”队列,其中可以在内存中排队的任务会立即完成,否则数据会在磁盘上排队。

我提出了两种可能的解决方案:

  1. 使用LinkedBlockingQueueArrayBlockingQueue 实现作为参考,从头开始实现BlockingQueue。这可能就像复制标准库中的实现并添加文件系统读/写一样简单。
  2. 继续使用标准的BlockingQueue 实现,实现一个单独的FilesystemQueue 来存储我的数据,并使用一个或多个线程使文件出队,创建Runnables 并使用ThreadPoolExecutor 将它们入队。

这两种方法都合理吗?是否有更好的方法?

【问题讨论】:

    标签: java multithreading concurrency queue threadpool


    【解决方案1】:

    这听起来像是使用 JMS 队列而不是文件系统的理想情况。

    不要使用阻塞队列,而是将消息发布到持久 JMS 队列上。您仍然可以尝试分层方法,将 JMS 队列与BlockingQueue 并行组合,在 BlockingQueue 已满时发布到 JMS 队列,但我确信纯 JMS 方法本身可以正常工作。

    【讨论】:

      【解决方案2】:

      在寻求更复杂的解决方案之前,您真的确信使用有界BlockingQueue 会破坏您的交易吗?事实证明,增加堆大小并预先分配足够大的容量对您来说仍然是可以的。它将允许您避免复杂性和性能不确定性,而 GC 暂停的代价完全在您的舒适范围内。

      不过,如果您的工作负载不平衡,它可以利用持久化大量无法放入内存的消息(与经过验证的 MPMC 阻塞队列相比),听起来您需要一个更简单、更小的 @987654321 版本@ 或其Apollo 分支。根据您的应用程序,您可能会发现 ActiveMQ 的其他功能很有用,在这种情况下您可以直接使用它。如果没有,您最好按照 bowmore 的建议搜索 JMS 空间。

      【讨论】:

        【解决方案3】:

        第一个选项增加可用的堆空间大小,如建议的 Dimitar Dimitrov,使用内存标志-Xmx,例如java -Xmx2048m

        来自Oracle's Documentation:请注意,JVM 使用的内存不仅仅是堆。例如 Java 方法、线程堆栈和本机句柄在内存中分配 与堆分离,以及JVM内部数据结构。

        这也是java 堆内存是如何分类的图表。


        第二个选项是使用实现所请求功能的库。为此,您可以使用ashes-queue

        来自项目概述:这是一个简单的 Java FIFO 实现 有持久的支持。也就是说,如果队列已满,则 溢出的消息将被保留,当有可用的时候 插槽,它们将被放回内存中。


        第三个选项创建您自己的实现。就此而言,您可以预览this thread,它会引导您实现这一目标。

        您的建议包含在最后第三个选项中。两者都是合理的。从实现的角度来看,您应该选择第一个选项,因为它可以保证更简单的实现和简洁的设计。

        【讨论】: