【发布时间】:2021-11-04 22:16:27
【问题描述】:
我的用于将消息发布到 kinesis 流的消息处理程序如下
public MessageHandler kinesisMessageHandler(final AmazonKinesisAsync amazonKinesis,
@Qualifier("successChannel") MessageChannel successChannel,
@Qualifier("errorChannel") MessageChannel errorChannel) {
KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
kinesisMessageHandler.setSync(false);
kinesisMessageHandler.setOutputChannel(successChannel);
kinesisMessageHandler.setFailureChannel(errorChannel);
return kinesisMessageHandler;
}
@Bean(name = "errorChannel")
public MessageChannel errorChannel() {
return MessageChannels.direct().get();
}
@Bean(name = "successChannel")
public MessageChannel successChannel() {
return MessageChannels.direct().get();
}
setSync 标志设置为 false,以便异步处理消息。另外,我创建了单独的 IntegrationFlow 来接收和处理来自成功和错误通道的 Kinesis 响应。
public IntegrationFlow successMessageIntegrationFlow(MessageChannel successChannel,
MessageChannel inboundKinesisMessageChannel,
MessageReceiverServiceActivator kinesisMessageReceiverServiceActivator) {
return IntegrationFlows.from(successChannel).channel(inboundKinesisMessageChannel)
.handle(kinesisMessageReceiverServiceActivator, "receiveMessage").get();
}
@Bean
public IntegrationFlow errorMessageIntegrationFlow(MessageChannel errorChannel,
MessageChannel inboundKinesisErrorChannel,
MessageReceiverServiceActivator kinesisErrorReceiverServiceActivator
) {
return IntegrationFlows.from(errorChannel).channel(inboundKinesisErrorChannel)
.handle(kinesisErrorReceiverServiceActivator, "receiveMessage").get();
}
我想知道您在使用 Direct Channel 从 Kinesis 接收成功和错误响应并使用 IntegrationFlow 处理它时是否发现任何问题。据我所知,使用 Direct Channel,生产者在发送期间是一个阻塞者,直到消费者完成其工作并将管理返回给生产者调用者。假设生产者在这里由 AmazonKinesisAsyncClient 在一组不同的线程池中执行并且生产者不会等待 IntegrationFlow 处理消息,这是一个正确的假设吗?如果我需要以不同的方式实现它,请告诉我
【问题讨论】:
标签: spring-integration spring-integration-dsl spring-integration-aws