【问题标题】:Spring concurrent processing multiple queues with single thread poolSpring用单线程池并发处理多个队列
【发布时间】:2017-06-15 09:17:15
【问题描述】:

我有一个 spring 应用程序应该处理和存储套接字传入数据,因为瓶颈问题应该使用多线程来完成。

传入的数据属于很多个实体,每个实体的任务都应该串行处理,但是我认为给每个实体分配一个线程并不是一个好的解决方案(几千个单线程来处理每个实体的队列)

那么如何定义一个公共ThreadPool 来使用票价算法处理所有实体的队列?

【问题讨论】:

    标签: java spring multithreading spring-integration


    【解决方案1】:

    您已经描述了一个使用消息驱动架构解决的完美问题。

    Spring Integration 是为您提供此功能的模块。

    您可以使用@ServiceActivator 构建您的任务服务和注释,并使用Channels 创建您的链。

    通道可以选择在不同的线程池上执行,并且可以通过通道上的队列设置来克服峰值负载造成的瓶颈。

    绝对值得一试查看 Spring Integration 的文档。

    【讨论】:

      【解决方案2】:

      您可以使用Project reactorRxJava 按组拆分传入消息流并串行处理每个组中的事件。

      使用 Project Reactor,您的代码可以如下所示:

          Scheduler groupScheduler = Schedulers.newParallel("groupByPool", 16);
          Flux.fromStream(incomingMessages()) // stream of new data from socket
                  .groupBy(Message::getEntityId) // split incoming messages by groups, which should be processed serially
                  .map(g -> g.publishOn(groupScheduler)) //create new publisher for groups of messages
                  .subscribe( //create consumer for main stream
                          stream ->
                                  stream.subscribe(this::processMessage) // create consumer for group stream and process messagaes
                  );
      

      【讨论】:

      • 在您的示例代码中,您正在为每个组创建新的并行线程池,我在我的问题中指出它是一个不好的解决方案。使用单个线程池的票价算法处理所有任务是我的目标
      • @Mojtabye 同意,抱歉错过了。更新答案
      猜你喜欢
      • 2012-12-06
      • 2012-01-23
      • 2016-04-18
      • 2017-05-18
      • 2012-11-12
      • 1970-01-01
      • 2012-03-31
      • 1970-01-01
      • 2013-06-09
      相关资源
      最近更新 更多