【问题标题】:Spring Intgergation aws - KinesisMessageHandler Direct ChannelSpring Intgergation aws - KinesisMessageHandler 直接通道
【发布时间】:2021-11-04 22:16:27
【问题描述】:

我的用于将消息发布到 kinesis 流的消息处理程序如下

public MessageHandler kinesisMessageHandler(final AmazonKinesisAsync amazonKinesis,
        @Qualifier("successChannel") MessageChannel successChannel,
        @Qualifier("errorChannel") MessageChannel errorChannel) {

    KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
    kinesisMessageHandler.setSync(false);
    kinesisMessageHandler.setOutputChannel(successChannel);
    kinesisMessageHandler.setFailureChannel(errorChannel);
    return kinesisMessageHandler;
}

@Bean(name = "errorChannel")
public MessageChannel errorChannel() {
      return MessageChannels.direct().get();
}

@Bean(name = "successChannel")
public MessageChannel successChannel() {
     return MessageChannels.direct().get();
}

setSync 标志设置为 false,以便异步处理消息。另外,我创建了单独的 IntegrationFlow 来接收和处理来自成功和错误通道的 Kinesis 响应。

public IntegrationFlow successMessageIntegrationFlow(MessageChannel successChannel,
                MessageChannel inboundKinesisMessageChannel,
                MessageReceiverServiceActivator kinesisMessageReceiverServiceActivator) {
            return IntegrationFlows.from(successChannel).channel(inboundKinesisMessageChannel)
                    .handle(kinesisMessageReceiverServiceActivator, "receiveMessage").get();
}

@Bean
public IntegrationFlow errorMessageIntegrationFlow(MessageChannel errorChannel,
                MessageChannel inboundKinesisErrorChannel,
                MessageReceiverServiceActivator kinesisErrorReceiverServiceActivator
               ) {
            return IntegrationFlows.from(errorChannel).channel(inboundKinesisErrorChannel)
                    .handle(kinesisErrorReceiverServiceActivator, "receiveMessage").get();
}

我想知道您在使用 Direct Channel 从 Kinesis 接收成功和错误响应并使用 IntegrationFlow 处理它时是否发现任何问题。据我所知,使用 Direct Channel,生产者在发送期间是一个阻塞者,直到消费者完成其工作并将管理返回给生产者调用者。假设生产者在这里由 AmazonKinesisAsyncClient 在一组不同的线程池中执行并且生产者不会等待 IntegrationFlow 处理消息,这是一个正确的假设吗?如果我需要以不同的方式实现它,请告诉我

【问题讨论】:

    标签: spring-integration spring-integration-dsl spring-integration-aws


    【解决方案1】:

    您关于阻塞的假设是正确的:控制不会返回到生产线程。因此,如果该 Kinesis 客户端中的线程数量有限,您需要确保尽快释放它们。您可能会考虑将这些回调放在队列通道中。无论如何,它们都是异步的,但如果那样,它们就不会持有 Kinesis 客户端。

    您的流程中仍然存在缺陷:.channel(inboundKinesisMessageChannel)。这意味着如果两个不同的流在中间是相同的通道。如果它是直接的,那么你最终会得到循环分配。我会完全删除它。

    【讨论】:

    • 感谢 cmets 和反馈。我将直接频道更改为队列频道。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-03-22
    • 1970-01-01
    • 2016-06-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多