【问题标题】:Spring integration MessageQueue without pollingSpring 集成 MessageQueue 无需轮询
【发布时间】:2017-10-26 06:51:26
【问题描述】:

我想将传入消息写入消息队列,并让消息由单个专用线程立即使用 - 非常类似于 Spring Integration listen on queue without poller

我试过了:

IntegrationFlows
  .from("inbound")
  .channel(MessageChannels.queue(10_000))
  .bridge(spec -> spec.poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
  .fixedSubscriberChannel()
  .route(inboundRouter())
  .get()

但这会导致任务调度程序线程调度轮询操作,然后将阻塞直到有消息可用。这不是我对“专用线程”的想法,如果任务调度程序线程也被用于写入队列,那么在我的应用程序中会导致死锁,然后另一边就没有消费者线程了。

接下来我尝试的是:

IntegrationFlows
  .from("inbound")
  .channel(MessageChannels.queue(10_000))
  .bridge(spec -> spec.poller(Pollers.fixedDelay(0).taskExecutor(Executors.newSingleThreadExecutor()).receiveTimeout(Long.MAX_VALUE)))
  .fixedSubscriberChannel()
  .route(inboundRouter())
  .get()

但这导致应用程序产生大量计划任务,因为fixedDelay(0)

我遇到的下一个选项是:

IntegrationFlows
  .from("inbound")
  .channel(MessageChannels.executor(Executors.newSingleThreadExecutor()))
  .route(inboundRouter())
  .get()

这似乎按预期工作;我有一个处理所有消息的专用线程。但是,我不再拥有可以监控其统计信息的消息队列(通过 JMX)。

那么,有什么方法可以实现我的目标,以及如何实现?

【问题讨论】:

    标签: java spring multithreading spring-integration


    【解决方案1】:

    那么,您必须真正为该线程单独指定TaskScheduler

    不幸的是,框架没有(还)提供清晰的 API 用于将其注入端点,但无论如何这是可能的:

        @Bean
        public IntegrationFlow dedicatedPollingThreadFlow() {
            return IntegrationFlows.from(MessageChannels.queue("myQueueChannel"))
                    .bridge(e -> e
                            .poller(Pollers.fixedDelay(0).receiveTimeout(-1))
                            .id("dedicatedPollingConsumer"))
                    .channel(c -> c.queue("results"))
                    .get();
        }
    
        @Bean
        public TaskScheduler dedicatedTaskScheduler() {
            return new ThreadPoolTaskScheduler();
        }
    
        @Bean
        @DependsOn("dedicatedPollingThreadFlow")
        public String dedicatedPollingConsumerConfigurer(
                @Qualifier("dedicatedPollingConsumer") PollingConsumer dedicatedPollingConsumer) {
            dedicatedPollingConsumer.setTaskScheduler(dedicatedTaskScheduler());
            return "";
        }
    

    也要注意.receiveTimeout(-1)。这样它会执行常规的BlockingQueue.take(),永远阻塞你的专用线程。

    从框架的角度来看,将TaskScheduler 与现有的.poller() 一起注入GenericEndpointSpec

    同时JIRA ticket

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-12-06
      • 2020-10-28
      • 1970-01-01
      • 2014-02-16
      • 1970-01-01
      • 2017-02-16
      相关资源
      最近更新 更多