【问题标题】:How to group/batch events with aggregator in spring integration如何在春季集成中使用聚合器对事件进行分组/批处理
【发布时间】:2018-06-15 08:40:55
【问题描述】:

我有一个频道,其有效负载是不同的 POJO,它们实现了一个名为 Event 的接口。

public interface Event {
    String getEventType();
}

使用网关将许多事件类型的事件一一添加到通道中。我想根据事件类型对事件进行分组并调用服务激活器。该服务具有以下签名。

void processEventsInBatch(String eventType, List<Event> events);

重要的是获取列表中属于同一事件类型的多个事件以批量处理它们并减少对外部服务的多次调用。

如何通过 spring 集成实现这一点?

【问题讨论】:

  • 发布这个问题和我想出的答案,因为我在这个问题上有点挣扎。我相信会有更好的方法来解决这个问题;我期待作为替代答案。

标签: java spring spring-integration


【解决方案1】:

spring-integration 中的Aggregator 使用correlation-id 标头(默认)来识别同一组中的不同消息。所以第一步是获取 eventType 作为相关 ID 标头。稍后我们可以在服务激活器中将此标头作为 eventType 参数获取,因为相关 ID 标头将出现在聚合器创建的组中。这可以通过以下xml配置来完成

<int:header-enricher>
    <int:correlation-id expression="payload.getEventType()"/>
<int:header-enricher>

现在可以使用聚合器,如下所示。

<int:aggregator release-strategy-expression="size() >= 25"
                group-timeout="5000"
                expire-group-upon-completion="true"
                send-partial-results-on-expiry="true" />

当一个组中至少有 25 个事件或等待 5 秒时,上述聚合器将发送一个组。我们可以调整前两个参数来控制我们希望列表有多大以及我们想要引入多少延迟。 expire-group-upon-completion 属性是必需的,以确保聚合器继续创建具有相同相关 ID 的新组。并且send-partial-results-on-expiry 需要确保如果我们在 5 秒内收到少于 25 个事件,那么聚合器将发送一个包含它所拥有的组。

【讨论】:

  • 您对问题的理解和回答非常出色。不知道您有什么疑问...只要接受您自己的答案并继续开发!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-02-02
  • 1970-01-01
  • 2014-12-15
  • 2018-05-27
  • 2015-05-13
  • 2017-06-25
相关资源
最近更新 更多