【问题标题】:Spring Integration JMS Consumers not consuming all messagesSpring Integration JMS Consumers 不消费所有消息
【发布时间】:2020-04-30 23:30:07
【问题描述】:

设置

我有一个名为 Dispatcher 的 Spring Boot 应用程序。它在 1 台机器上运行并具有嵌入式 ActiveMQ 代理:

  @Bean
  public BrokerService broker(ActiveMQProperties properties) throws Exception {
    BrokerService broker = new BrokerService();
    broker.setPersistent(false);
    broker.addConnector(properties.getBrokerUrl());
    return broker;
  }

将任务写入 JMS 队列:

  @Bean
  public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
      .from(taskQueue())
      .bridge(Bridges.blockingPoller(outboundTaskScheduler()))
      .transform(outboundTransformer)
      .handle(Jms.outboundAdapter(connectionFactory)
        .extractPayload(false)
        .destination(JmsQueueNames.STANDARD_TASKS))
      .get();
  }

  @Bean
  public QueueChannel standardTaskQueue() {
    return MessageChannels.priority()
      .comparator(TASK_PRIO_COMPARATOR)
      .get();
  }

  // 2 more queues with different names but same config

Worker 应用程序在 10 台机器上运行,每台机器有 20 个内核,配置如下:

  @Bean
  public IntegrationFlow standardTaskInbound(ConnectionFactory connectionFactory) {
    int maxWorkers = 20;
    return IntegrationFlows
      .from(Jms.channel(connectionFactory)
        .sessionTransacted(true)
        .concurrentConsumers(maxWorkers)
        .taskExecutor(
          Executors.newFixedThreadPool(maxWorkers, new CustomizableThreadFactory("standard-"))
        )
        .destination(JmsQueueNames.STANDARD_TASKS))
      .channel(ChannelNames.TASKS_INBOUND)
      .get();
  }

  // 2 more inbound queues with different names but same config

这对第二个队列重复,加上 1 个特殊情况。所以总共有 401 个消费者

观察

使用JConsole,可以看到ActiveMQ队列中有任务:

[TODO 插入截图]

正如预期的那样,在任何 Worker 机器上,都有 20 个消费者线程:

[TODO 插入截图]

但即使不是全部,大多数也是空闲,即使队列中仍有消息。使用我们的监控工具,我看到在任何给定时间大约有 50 到 400 个任务正在处理,而预期是恒定的 400。

我还观察到 Spring 为每个使用者创建 AbstractPollingMessageListenerContainer,这似乎导致每个应用程序每秒每个队列打开 1 个 JMS 连接(每秒 33 个连接)。

调查

所以我发现I do not receive messages in my second consumer 暗示prefetch 是罪魁祸首。这听起来很合理,所以我为每个工人配置了tcp://dispatcher:61616?jms.prefetchPolicy.queuePrefetch=1。然而,在任何时候都只处理了大约 25 个任务,这对我来说完全没有意义。

问题

我似乎不明白发生了什么,由于我没有时间进行调查,我希望有人能指出我正确的方向。哪些因素可能是原因?消费者/连接的数量?预取?还有什么?

【问题讨论】:

    标签: java spring spring-integration activemq spring-jms


    【解决方案1】:

    原来实际上是由预取策略引起的。在我的情况下,正确的配置是使用tcp://dispatcher:61616?jms.prefetchPolicy.all=0

    在我之前的(失败的)测试中,我使用了jms.prefetchPolicy.queuePrefetch=1,但事后我不确定我是否在正确的地方配置了它。

    【讨论】:

      猜你喜欢
      • 2023-03-29
      • 2016-02-10
      • 1970-01-01
      • 2020-03-18
      • 2018-04-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-07-08
      相关资源
      最近更新 更多