【问题标题】:Java: High-performance message-passing (single-producer/single-consumer)Java:高性能消息传递(单一生产者/单一消费者)
【发布时间】:2010-07-29 22:11:55
【问题描述】:

我最初问过这个问题here,但我意识到我的问题不是关于 while-true 循环。我想知道的是,在 Java 中进行高性能异步消息传递的正确方法是什么?

我要做什么......

我有大约 10,000 个消费者,每个消费者都从他们的私有队列中消费消息。我有一个线程一一生成消息并将它们放入正确的消费者队列中。每个消费者无限循环,检查消息是否出现在其队列中并处理它。

我相信这个术语是“单一生产者/单一消费者”,因为有一个生产者,每个消费者只在他们的私有队列上工作(多个消费者从不从同一个队列中读取)。

Consumer.java 内部:

@Override
public void run() {
    while (true) {
        Message msg = messageQueue.poll();
        if (msg != null) {
            ... // do something with the message
        }
    }
}

生产者正在快速将消息放入消费者消息队列中(每秒数百万条消息)。消费者应该尽快处理这些消息!

注意:while (true) { ... } 被 Producer 作为其最后一条消息发送的 KILL 消息终止。

但是,我的问题是关于设计此消息传递的正确方法。我应该为 messageQueue 使用哪种队列?它应该是同步的还是异步的? Message应该如何设计?我应该使用 while-true 循环吗?消费者应该是一个线程还是其他什么? 10,000 个线程会慢到爬行吗?线程的替代方法是什么?

那么,在 Java 中进行高性能消息传递的正确方法是什么?

【问题讨论】:

  • 为什么你有 10k 个线程?线程切换会有很多开销,除非您有非常多的内核或任务,每个线程都需要大量等待。
  • 10k线程离高性能还很远
  • @Mike: 有 10,000 个不同的符号,每个消费者处理一个符号的消息。我不知道它是否应该被实现为线程,但是消费者之间不共享任何东西,并且是演员模型的一个很好的候选者。
  • @Willi Schönborn: 我的问题是,如果不是线程,那是什么?我想在没有库/框架的情况下做到这一点;比如,什么是合适的设计?
  • 要知道什么是合适的设计,我们必须知道你在做什么。消息来自哪里?它们代表什么,等等。

标签: java asynchronous producer-consumer message-passing


【解决方案1】:

我会说 10,000 个线程的上下文切换开销会非常高,更不用说内存开销了。默认情况下,在 32 位平台上,每个线程使用 256kb 的默认堆栈大小,因此只有 2.5GB 用于您的堆栈。显然你说的是 64 位,但即便如此,那还是相当大的内存量。由于使用的内存量很大,缓存会非常频繁,cpu 会受到内存带宽的限制。

我会寻找一种避免使用这么多线程的设计,以避免分配大量堆栈和上下文切换开销。您不能同时处理 10,000 个线程。当前的硬件通常具有少于 100 个内核。

我会为每个硬件线程创建一个队列并以循环方式发送消息。如果处理时间变化很大,则存在一些线程在获得更多工作之前完成处理其队列的危险,而其他线程则永远无法完成分配的工作。这可以通过使用 JSR-166 ForkJoin 框架中实现的工作窃取来避免。

由于通信是从发布者到订阅者的一种方式,因此 Message 不需要任何特殊设计,假设订阅者在发布后不会更改消息。

编辑:读取 cmets,如果您有 10,000 个符号,则创建少数通用订阅者线程(每个核心一个订阅者线程),它们异步接收来自发布者的消息(例如,通过他们的消息队列)。订阅者从队列中拉出消息,从消息中检索符号,并在消息处理程序的 Map 中查找它,检索处理程序,并调用处理程序以同步处理消息。完成后,它会重复,从队列中获取下一条消息。如果必须按顺序处理相同符号的消息(这就是我猜您想要 10,000 个队列的原因。),您需要将符号映射到订阅者。例如。如果有 10 个订阅者,那么符号 0-999 到订阅者 0,1000-1999 到订阅者 1 等等。更精细的方案是根据符号的频率分布映射符号,以便每个订阅者获得大致相同的负载。例如,如果 10% 的流量是符号 0,那么订阅者 0 将只处理该符号,而其他符号将分配给其他订阅者。

【讨论】:

  • 有没有办法编写我的程序,从概念上讲它是 10,000 个独立的消费者,每个消费者都在自己的队列上工作?但是作为处理几个队列的几个线程运行?
  • @Mr.Burgundy 当然,有很多方法。例如作为一种简单的方法,您可以将消费者逻辑封装在一个类中(与消费者线程无关),将其中的 10k 个放在一个列表中,让 一个 消费者线程查找正确的一个并调用该逻辑消息的特定消费者。
  • @nos - 这将是单线程的。当有多个内核可用时,这将限制可达到的性能。
  • 如前所述,这是一种简单的方法。如果您在双核上,并且您的生产者与消费者做的工作一样多,这就是您所需要的。下一个稍微不那么简单的方法是生成 N 个使用者线程。或者保留一个消费者线程,并将工作分派给合适的配置ExecutorService,或者输给消费者线程并将工作直接分派给执行器
【解决方案2】:

你可以使用这个(感谢 Which ThreadPool in Java should I use?):

class Main {
    ExecutorService threadPool = Executors.newFixedThreadPool(
                                     Runtime.availableProcessors()*2);

    public static void main(String[] args){
        Set<Consumer> consumers = getConsumers(threadPool);
        for(Consumer consumer : consumers){
            threadPool.execute(consumer);
        }
    }
}

class Consumer {
    private final ExecutorService tp;
    private final MessageQueue messageQueue;
    Consumer(ExecutorService tp,MessageQueue queue){
        this.tp = tp;
        this.messageQueue = queue;
    }
    @Override
    public void run(){
              Message msg = messageQueue.poll();

              if (msg != null) {
                  try{
                       ... // do something with the message
                  finally{
                       this.tp.execute(this);
                  }
              }
           }
    }
}    

这样,您可以轻松安排好日程。

【讨论】:

    【解决方案3】:

    首先,除非您提供完整的设计文档,或者您自己尝试不同的方法,否则没有唯一的正确答案。

    我假设您的处理不会是计算密集型的,否则您不会考虑同时处理 10000 个队列。一种可能的解决方案是通过每个 CPU 拥有一两个线程来最小化上下文切换。除非您的系统将严格实时处理数据,否则可能会给每个队列带来更大的延迟,但总体吞吐量会更高。

    例如 - 让您的生产者线程在其自己的 CPU 上运行,并将批量消息发送到消费者线程。然后每个消费者线程将消息分发到它的 N 个私有队列,执行处理步骤,接收新的数据批次等等。同样,取决于您的延迟容限,因此处理步骤可能意味着处理所有队列,固定数量的队列,除非达到时间阈值,否则可以处理尽可能多的队列。能够轻松判断哪个队列属于哪个消费者线程(例如,如果队列按顺序编号:int consumerThreadNum = queueNum & 0x03)将是有益的,因为每次在哈希表中查找它们可能会很慢。

    为了最大程度地减少内存抖动,始终创建/销毁队列可能不是一个好主意,因此您可能希望为每个线程预先分配一个(最大队列数/核心数)队列对象。当队列完成而不是被销毁时,它可以被清除和重用。您不希望 gc 太频繁且太长时间妨碍您。

    另一个未知数是您的生产者是为每个队列生成完整的数据集,还是会分块发送数据,直到收到 KILL 命令。如果您的生产者发送完整的数据集,您可以完全取消队列概念,并在数据到达消费者线程时对其进行处理。

    【讨论】:

      【解决方案4】:

      拥有一个与硬件和操作系统容量相关的消费者线程池。这些消费者线程可以轮询您的消息队列。

      我要么让消息知道如何处理自己,要么在初始化时向消费者线程类注册处理器。

      【讨论】:

        【解决方案5】:

        由于没有关于处理符号的约束的更多细节,很难给出非常具体的建议。

        你应该看看这篇slashdot文章:

        http://developers.slashdot.org/story/10/07/27/1925209/Java-IO-Faster-Than-NIO

        它有很多关于多线程、单选和线程池参数的讨论和实际测量数据。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2019-05-24
          • 2011-12-16
          • 2017-02-24
          • 2015-04-05
          • 1970-01-01
          • 1970-01-01
          • 2012-04-17
          • 2011-03-12
          相关资源
          最近更新 更多