【问题标题】:How to stop the polling of @InboundChannelAdapter when kafka went down to prevent data loss?kafka宕机时如何停止@InboundChannelAdapter的轮询,防止数据丢失?
【发布时间】:2021-03-31 04:12:35
【问题描述】:

我正在使用 Spring Cloud 数据流。

@Bean
@InboundChannelAdapter(channel = TbeSource.PR1, poller = @Poller(fixedDelay = "2"))
public Supplier<Product> getProductSource(ProductBuilder dataAccess) {

    return ()->dataAccess.getNext();
        
    };

如果 kafka 突然宕机,我们如何停止这种轮询行为以防止数据丢失?

当我在测试时,即使 kafka 发生故障,数据也会不断从数据库中读取,并不断尝试将记录发送到 kafka?

预期性能是一旦 kafka 宕机就停止数据轮询..

有什么方法可以实现吗?

【问题讨论】:

    标签: spring-integration spring-cloud spring-kafka spring-cloud-stream spring-cloud-dataflow


    【解决方案1】:

    @InboundChannelAdapter@Poller 可以配置为errorChannel

    /**
     * @return The the bean name of default error channel
     * for the underlying {@code MessagePublishingErrorHandler}.
     * @since 4.3.3
     */
    String errorChannel() default "";
    

    因此,每当该TbeSource.PR1 通道上的流下游发生异常时,它将被传递到提供的错误通道以处理其上的某些错误流。

    在那里,您可以按照逻辑停止为 @InboundChannelAdapterSupplier 组合创建的 SourcePollingChannelAdapter。在这种情况下,bean id 是这样的:[CONFIGURATION_CLASS_BEAN_NAME.getProductSource.inboundChannelAdapter]。请参阅此处了解更多信息:https://docs.spring.io/spring-integration/reference/html/configuration.html#annotations_on_beans。正如它所说,您也可以只使用@EndpointId 来通过依赖注入例程简化您的生活。

    确保重新抛出异常,让数据库事务回滚以避免数据丢失!

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-05-14
      • 2019-07-12
      • 1970-01-01
      相关资源
      最近更新 更多