【问题标题】:Spring Aggregation Group春天聚合集团
【发布时间】:2016-12-05 20:34:34
【问题描述】:

我确实创建了如下聚合服务

@EnableBinding(Processor.class)
class Configuration {

@Autowired
Processor processor;


@ServiceActivator(inputChannel = Processor.INPUT)
@Bean
public MessageHandler aggregator() {

    AggregatingMessageHandler aggregatingMessageHandler =
            new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                    new SimpleMessageStore(10));

    //AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
    //aggregatorFactoryBean.setMessageStore();
    aggregatingMessageHandler.setOutputChannel(processor.output());
    //aggregatorFactoryBean.setDiscardChannel(processor.output());
    aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
    aggregatingMessageHandler.setSendTimeout(1000L);
    aggregatingMessageHandler.setCorrelationStrategy(new  ExpressionEvaluatingCorrelationStrategy("requestType"));
    aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
    aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
    aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
    aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
    return aggregatingMessageHandler;
    }
}

现在我想在创建新组后立即释放该组,所以我一次只有一个组。

更具体地说,我确实收到了两种类型的请求 'PUT' 和 'DEL' 。我想按照上述规则继续聚合,但是一旦我收到一个请求类型而不是我正在聚合的类型,我想释放当前组并开始聚合新类型。

我想这样做的原因是因为这些请求被发送到不支持同时具有 PUT 和 DEL 请求的另一方,并且我不能延迟任何 DEL 请求,因为 PUT 和 DEL 之间的顺序很重要.

我知道我需要创建一个自定义发布 Pojo,但我可以检查当前组吗?

举例

如果我收到如下 6 条消息

PUT PUT PUT DEL DEL PUT

它们应该汇总如下

3PUT

2 删除

1 个放置

【问题讨论】:

  • 抱歉,您的问题不清楚。将CorrelationStrategy 用作FOO 文字,您始终会得到同一个组,并且所有消息仅聚合到该组。而已。完全不清楚围绕“一旦创建新组就释放组”应该采取什么行动。
  • @ArtemBilan 很抱歉这个错误,我确实更新了这个问题,希望现在很清楚

标签: spring-integration spring-cloud-stream


【解决方案1】:

好的。感谢您分享更多信息。

是的,您自定义的ReleaseStrategy 可以检查该消息类型并返回true 以引导组完成功能。

只要你只有静态correlationKey,那么店里就只有一组。当您的消息进入ReleaseStrategy 时,仅检查当前组的完成信号不会有太多的魔力。由于商店中没有任何其他组,因此不需要任何复杂的发布逻辑。

您应该添加expireGroupsUponCompletion = true 以让该组在完成后被删除,并且下一条消息将为相同的correlationKey 形成一个新组。

更新

感谢您提供更多信息!

所以,是的,您原来的 PoC 很好。即使是静态的correlationKey 也可以,因为您只是要批量收集传入的消息。

您的自定义ReleaseStrategy 应分析MessageGroup 以查找具有不同key 的消息并在这种情况下返回true

自定义MessageGroupProcessor 应该从输出List 中过滤一条具有不同键的消息,并将该消息发送回聚合器,以便为其key 的序列形成一个新组。

【讨论】:

  • 好的,所以你建议保持correlationKey静态,然后检查方法,当它改变时我会释放吗?
  • 不,correlationKey 是基于拆分器的。请参阅它的文档和聚合器。目前尚不清楚您的上下文中的方法是什么。我们还需要知道您希望如何发布以及发布哪些内容
  • 我有两种类型的消息 'PUT' 和 'DEL' ,我可以将此字段用作相关键或保持静态。我想做的是在收到不同类型的消息后立即释放组,我用示例更新了问题
  • 请在我的回答中找到更新。
  • 我不知道应该如何实现 MessageGroupProcessor 。最后一条消息应作为新组中的第一条消息发送回聚合器
【解决方案2】:

我最终实现了以下ReleaseStrategy,因为我发现它比删除消息并再次排队更简单。

class MessageCountAndOnlyOneGroupReleaseStrategy implements org.springframework.integration.aggregator.ReleaseStrategy {

    private final int threshold;

    private final MessageGroupProcessor messageGroupProcessor;


    public MessageCountAndOnlyOneGroupReleaseStrategy(int threshold,MessageGroupProcessor messageGroupProcessor) {
        super();
        this.threshold = threshold;
        this.messageGroupProcessor = messageGroupProcessor;
    }

    private MessageGroup currentGroup;

    @Override
    public boolean canRelease(MessageGroup group) {
        if(currentGroup == null)
            currentGroup = group;

        if(!group.getGroupId().equals(currentGroup.getGroupId())) {
            messageGroupProcessor.processMessageGroup(currentGroup);
            currentGroup = group;
            return false;
        }


        return group.size() >= this.threshold;
    }

}

请注意,我确实使用new HeaderAttributeCorrelationStrategy("request_type") 而不是只使用FOO 代替CollorationStrategy

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-04-01
    • 2020-03-01
    • 2011-09-23
    • 1970-01-01
    • 2015-11-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多