【发布时间】:2020-04-30 23:30:07
【问题描述】:
设置
我有一个名为 Dispatcher 的 Spring Boot 应用程序。它在 1 台机器上运行并具有嵌入式 ActiveMQ 代理:
@Bean
public BrokerService broker(ActiveMQProperties properties) throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.addConnector(properties.getBrokerUrl());
return broker;
}
将任务写入 JMS 队列:
@Bean
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(taskQueue())
.bridge(Bridges.blockingPoller(outboundTaskScheduler()))
.transform(outboundTransformer)
.handle(Jms.outboundAdapter(connectionFactory)
.extractPayload(false)
.destination(JmsQueueNames.STANDARD_TASKS))
.get();
}
@Bean
public QueueChannel standardTaskQueue() {
return MessageChannels.priority()
.comparator(TASK_PRIO_COMPARATOR)
.get();
}
// 2 more queues with different names but same config
Worker 应用程序在 10 台机器上运行,每台机器有 20 个内核,配置如下:
@Bean
public IntegrationFlow standardTaskInbound(ConnectionFactory connectionFactory) {
int maxWorkers = 20;
return IntegrationFlows
.from(Jms.channel(connectionFactory)
.sessionTransacted(true)
.concurrentConsumers(maxWorkers)
.taskExecutor(
Executors.newFixedThreadPool(maxWorkers, new CustomizableThreadFactory("standard-"))
)
.destination(JmsQueueNames.STANDARD_TASKS))
.channel(ChannelNames.TASKS_INBOUND)
.get();
}
// 2 more inbound queues with different names but same config
这对第二个队列重复,加上 1 个特殊情况。所以总共有 401 个消费者。
观察
使用JConsole,可以看到ActiveMQ队列中有任务:
[TODO 插入截图]
正如预期的那样,在任何 Worker 机器上,都有 20 个消费者线程:
[TODO 插入截图]
但即使不是全部,大多数也是空闲,即使队列中仍有消息。使用我们的监控工具,我看到在任何给定时间大约有 50 到 400 个任务正在处理,而预期是恒定的 400。
我还观察到 Spring 为每个使用者创建 AbstractPollingMessageListenerContainer,这似乎导致每个应用程序每秒每个队列打开 1 个 JMS 连接(每秒 33 个连接)。
调查
所以我发现I do not receive messages in my second consumer 暗示prefetch 是罪魁祸首。这听起来很合理,所以我为每个工人配置了tcp://dispatcher:61616?jms.prefetchPolicy.queuePrefetch=1。然而,在任何时候都只处理了大约 25 个任务,这对我来说完全没有意义。
问题
我似乎不明白发生了什么,由于我没有时间进行调查,我希望有人能指出我正确的方向。哪些因素可能是原因?消费者/连接的数量?预取?还有什么?
【问题讨论】:
标签: java spring spring-integration activemq spring-jms