【问题标题】:Spring Integration poll aggregator programmatically以编程方式 Spring Integration 投票聚合器
【发布时间】:2016-11-22 14:27:22
【问题描述】:

我一直在使用 Spring boot 并删除了我项目中的所有 XML 文件。 不幸的是,它还使用 Spring 集成,根据我的经验,它非常基于 XML

我有一个场景需要我有一个聚合器,并且每隔 x 秒数轮询该聚合器。

这可以像这样使用 XML 来完成(示例取自之前的 SO 问题):

<!-- 
    the poller will process 100 messages every minute 
    if the size of the group is 100 (the poll reached the max messages) or 60 seconds time out (poll has less than 100 messages) then the payload with the list of messages is passed to defined output channel
-->
<int:aggregator input-channel="logEntryChannel" output-channel="logEntryAggrChannel"
    send-partial-result-on-expiry="true"
    group-timeout="60000"
    correlation-strategy-expression="T(Thread).currentThread().id"
    release-strategy-expression="size() == 100">
    <int:poller max-messages-per-poll="100" fixed-rate="60000"/>
</int:aggregator>

我已经设法找到了一个可以解决问题的类,它的 bean 定义是:

@Bean(name = "aggregatingMessageHandler")
public AggregatingMessageHandler aggregatingMessageHandler() {

    AggregatingMessageHandler aggregatingMessageHandler =
            new AggregatingMessageHandler(messageGroupProcessorBean(),
                    new SimpleMessageStore(10));

 aggregatingMessageHandler.setCorrelationStrategy(customCorrelationStrategyBean());

    aggregatingMessageHandler.setReleaseStrategy(
            new TimeoutCountSequenceSizeReleaseStrategy(3,
                    TimeoutCountSequenceSizeReleaseStrategy.DEFAULT_TIMEOUT));

    aggregatingMessageHandler.setExpireGroupsUponCompletion(true);

    aggregatingMessageHandler.setOutputChannel(outputAggregatedChannelBean());

    return aggregatingMessageHandler;
}

但是,这仅在与此处理程序关联的inboundChannel 中接收到新消息时才会触发ReleaseStrategycanRelease() 方法,而不是在不是所需结果的固定时间间隔内。 我希望将所有超过一分钟的组重定向到输出通道。 我的问题是 - 有没有办法以编程方式附加轮询器,例如 XML 定义中的轮询器?

【问题讨论】:

    标签: spring spring-boot spring-integration


    【解决方案1】:

    对于 Java 和注释配置,请查看 herehere

    Aggregator 组件具有AggregatorFactoryBean 以便于 Java 配置。

    无论如何,您必须注意在该处理程序定义上有一个@ServiceActivator 注释和一个@Bean。并且恰好@ServiceActivator 具有poller 属性。

    还要注意有一个 Java DSL 用于 Spring 集成。

    您问题的另一部分有点混乱。 poller 完全与发布策略无关。在这种情况下,它负责接收来自PollableChannel 的消息,即logEntryChannel。并且只有在那之后,已经轮询的消息才会被放置到聚合器中,以便进行关联和发布逻辑。

    该示例中所做的事情完全不同,我们可以在单独的 SO 线程中讨论它。

    【讨论】:

    • 正如@artem 所说,轮询器与完成无关(除了它从频道获得消息并且组可以完成之外)。这是由group-timeout 完成的。您可以设置groupTimeoutExpression(SpEL 表达式)以使组异步超时。 new SpelExpressionParser().parseExpression("60000").
    • 谢谢,这正是我想要的。
    猜你喜欢
    • 1970-01-01
    • 2015-10-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-12-06
    • 1970-01-01
    • 1970-01-01
    • 2018-10-11
    相关资源
    最近更新 更多