【发布时间】:2017-10-26 06:51:26
【问题描述】:
我想将传入消息写入消息队列,并让消息由单个专用线程立即使用 - 非常类似于 Spring Integration listen on queue without poller
我试过了:
IntegrationFlows
.from("inbound")
.channel(MessageChannels.queue(10_000))
.bridge(spec -> spec.poller(Pollers.fixedDelay(0).receiveTimeout(Long.MAX_VALUE)))
.fixedSubscriberChannel()
.route(inboundRouter())
.get()
但这会导致任务调度程序线程调度轮询操作,然后将阻塞直到有消息可用。这不是我对“专用线程”的想法,如果任务调度程序线程也被用于写入队列,那么在我的应用程序中会导致死锁,然后另一边就没有消费者线程了。
接下来我尝试的是:
IntegrationFlows
.from("inbound")
.channel(MessageChannels.queue(10_000))
.bridge(spec -> spec.poller(Pollers.fixedDelay(0).taskExecutor(Executors.newSingleThreadExecutor()).receiveTimeout(Long.MAX_VALUE)))
.fixedSubscriberChannel()
.route(inboundRouter())
.get()
但这导致应用程序产生大量计划任务,因为fixedDelay(0)。
我遇到的下一个选项是:
IntegrationFlows
.from("inbound")
.channel(MessageChannels.executor(Executors.newSingleThreadExecutor()))
.route(inboundRouter())
.get()
这似乎按预期工作;我有一个处理所有消息的专用线程。但是,我不再拥有可以监控其统计信息的消息队列(通过 JMX)。
那么,有什么方法可以实现我的目标,以及如何实现?
【问题讨论】:
标签: java spring multithreading spring-integration