【问题标题】:Spring Integration aws Kinesis , message aggregator, Release StrategySpring Integration aws Kinesis,消息聚合器,发布策略
【发布时间】:2021-10-12 03:00:07
【问题描述】:

这是Spring Integration AWS RabbitMQ Kinesis的后续问题

我有以下配置。我注意到,当我第一次向名为 kinesisSendChannel 的输入通道发送消息时,将调用聚合器和发布策略并将消息发送到 Kinesis Streams。我将调试断点放在不同的地方,并且可以验证这种行为。但是,当我再次将消息发布到同一输入通道时,不会调用发布策略和出站处理器,并且不会将消息发送到 Kinesis。我不确定为什么只在第一次而不是为后续消息调用聚合器流。出于测试目的,TimeoutCountSequenceSizeReleaseStrategy 设置为计数为 1,时间为 60 秒。没有使用特定的 MessageStore。您能帮忙找出问题吗?

@Bean(name = "kinesisSendChannel")
public MessageChannel kinesisSendChannel() {
    return MessageChannels.direct().get();
}

@Bean(name = "resultChannel")
public MessageChannel resultChannel() {
    return MessageChannels.direct().get();
}

@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public MessageHandler aggregator(TestMessageProcessor messageProcessor,
        MessageChannel resultChannel,
        TimeoutCountSequenceSizeReleaseStrategy timeoutCountSequenceSizeReleaseStrategy) {
    AggregatingMessageHandler handler = new AggregatingMessageHandler(messageProcessor);
    handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("headers['foo']"));
    handler.setReleaseStrategy(timeoutCountSequenceSizeReleaseStrategy);
    handler.setOutputProcessor(messageProcessor);
    handler.setOutputChannel(resultChannel);
    return handler;

}

@Bean
@ServiceActivator(inputChannel = "resultChannel")
public MessageHandler kinesisMessageHandler1(@Qualifier("successChannel") MessageChannel successChannel,
        @Qualifier("errorChannel") MessageChannel errorChannel, final AmazonKinesisAsync amazonKinesis) {
    KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
    kinesisMessageHandler.setSync(true);
    kinesisMessageHandler.setOutputChannel(successChannel);
    kinesisMessageHandler.setFailureChannel(errorChannel);

    return kinesisMessageHandler;
}



public class TestMessageProcessor extends AbstractAggregatingMessageGroupProcessor {

    @Override
    protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
        final PutRecordsRequest putRecordsRequest = new PutRecordsRequest().withStreamName("test-stream");

        final List<PutRecordsRequestEntry> putRecordsRequestEntry = group.getMessages().stream()
                .map(message -> (PutRecordsRequestEntry) message.getPayload()).collect(Collectors.toList());

        putRecordsRequest.withRecords(putRecordsRequestEntry);
        return putRecordsRequestEntry;

    }

}

【问题讨论】:

    标签: spring-integration spring-integration-aws


    【解决方案1】:

    我相信问题出在handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("headers['foo']"));。您的所有消息都带有相同的 foo 标头。因此,它们都形成了同一个消息组。只要你释放组并且不删除它,所有新消息都会被丢弃。

    请修改聚合器文档以熟悉所有可能的行为:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator

    【讨论】:

    • 我的要求是只要输入消息的数量达到可配置的大小或时间限制,就将其推送到 kinesis 流。我将静态消息头设置为 foo 的唯一原因是在发布到 kinesis 之前在聚合器中实现此分组。我所有的输入消息都是独立的,不相关的。有没有更好的方法来实现这一目标?我只是想避免为每条输入消息调用 kinesis。
    • expireGroupsUponCompletion=true,另见groupTimeout。在消息到达聚合器之前不会查询TimeoutCountSequenceSizeReleaseStrategygroupTimeout 可能会释放该组。所有这些信息都在文档中。
    • 谢谢你..这工作..
    猜你喜欢
    • 2012-12-19
    • 2014-03-24
    • 2014-02-17
    • 1970-01-01
    • 2018-02-27
    • 2011-10-21
    • 1970-01-01
    • 1970-01-01
    • 2018-05-30
    相关资源
    最近更新 更多