【发布时间】:2021-10-12 03:00:07
【问题描述】:
这是Spring Integration AWS RabbitMQ Kinesis的后续问题
我有以下配置。我注意到,当我第一次向名为 kinesisSendChannel 的输入通道发送消息时,将调用聚合器和发布策略并将消息发送到 Kinesis Streams。我将调试断点放在不同的地方,并且可以验证这种行为。但是,当我再次将消息发布到同一输入通道时,不会调用发布策略和出站处理器,并且不会将消息发送到 Kinesis。我不确定为什么只在第一次而不是为后续消息调用聚合器流。出于测试目的,TimeoutCountSequenceSizeReleaseStrategy 设置为计数为 1,时间为 60 秒。没有使用特定的 MessageStore。您能帮忙找出问题吗?
@Bean(name = "kinesisSendChannel")
public MessageChannel kinesisSendChannel() {
return MessageChannels.direct().get();
}
@Bean(name = "resultChannel")
public MessageChannel resultChannel() {
return MessageChannels.direct().get();
}
@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public MessageHandler aggregator(TestMessageProcessor messageProcessor,
MessageChannel resultChannel,
TimeoutCountSequenceSizeReleaseStrategy timeoutCountSequenceSizeReleaseStrategy) {
AggregatingMessageHandler handler = new AggregatingMessageHandler(messageProcessor);
handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("headers['foo']"));
handler.setReleaseStrategy(timeoutCountSequenceSizeReleaseStrategy);
handler.setOutputProcessor(messageProcessor);
handler.setOutputChannel(resultChannel);
return handler;
}
@Bean
@ServiceActivator(inputChannel = "resultChannel")
public MessageHandler kinesisMessageHandler1(@Qualifier("successChannel") MessageChannel successChannel,
@Qualifier("errorChannel") MessageChannel errorChannel, final AmazonKinesisAsync amazonKinesis) {
KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
kinesisMessageHandler.setSync(true);
kinesisMessageHandler.setOutputChannel(successChannel);
kinesisMessageHandler.setFailureChannel(errorChannel);
return kinesisMessageHandler;
}
public class TestMessageProcessor extends AbstractAggregatingMessageGroupProcessor {
@Override
protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
final PutRecordsRequest putRecordsRequest = new PutRecordsRequest().withStreamName("test-stream");
final List<PutRecordsRequestEntry> putRecordsRequestEntry = group.getMessages().stream()
.map(message -> (PutRecordsRequestEntry) message.getPayload()).collect(Collectors.toList());
putRecordsRequest.withRecords(putRecordsRequestEntry);
return putRecordsRequestEntry;
}
}
【问题讨论】:
标签: spring-integration spring-integration-aws