【发布时间】:2016-11-22 14:27:22
【问题描述】:
我一直在使用 Spring boot 并删除了我项目中的所有 XML 文件。
不幸的是,它还使用 Spring 集成,根据我的经验,它非常基于 XML。
我有一个场景需要我有一个聚合器,并且每隔 x 秒数轮询该聚合器。
这可以像这样使用 XML 来完成(示例取自之前的 SO 问题):
<!--
the poller will process 100 messages every minute
if the size of the group is 100 (the poll reached the max messages) or 60 seconds time out (poll has less than 100 messages) then the payload with the list of messages is passed to defined output channel
-->
<int:aggregator input-channel="logEntryChannel" output-channel="logEntryAggrChannel"
send-partial-result-on-expiry="true"
group-timeout="60000"
correlation-strategy-expression="T(Thread).currentThread().id"
release-strategy-expression="size() == 100">
<int:poller max-messages-per-poll="100" fixed-rate="60000"/>
</int:aggregator>
我已经设法找到了一个可以解决问题的类,它的 bean 定义是:
@Bean(name = "aggregatingMessageHandler")
public AggregatingMessageHandler aggregatingMessageHandler() {
AggregatingMessageHandler aggregatingMessageHandler =
new AggregatingMessageHandler(messageGroupProcessorBean(),
new SimpleMessageStore(10));
aggregatingMessageHandler.setCorrelationStrategy(customCorrelationStrategyBean());
aggregatingMessageHandler.setReleaseStrategy(
new TimeoutCountSequenceSizeReleaseStrategy(3,
TimeoutCountSequenceSizeReleaseStrategy.DEFAULT_TIMEOUT));
aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
aggregatingMessageHandler.setOutputChannel(outputAggregatedChannelBean());
return aggregatingMessageHandler;
}
但是,这仅在与此处理程序关联的inboundChannel 中接收到新消息时才会触发ReleaseStrategy 的canRelease() 方法,而不是在不是所需结果的固定时间间隔内。
我希望将所有超过一分钟的组重定向到输出通道。
我的问题是 - 有没有办法以编程方式附加轮询器,例如 XML 定义中的轮询器?
【问题讨论】:
标签: spring spring-boot spring-integration