【问题标题】:how to handle deserialized exceptions in case of batch mode在批处理模式下如何处理反序列化异常
【发布时间】:2021-04-08 21:13:36
【问题描述】:

在批处理模式下如何处理反序列化的异常?
我正在使用带有 spring boot -2.3.8 版本的 spring kafka。

试过这个选项:

@Bean
  public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
      ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
      ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory); 
    factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
    }

但它会抛出异常: 由:java.lang.IllegalStateException:错误处理程序必须是ErrorHandler,而不是org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler

这是我的应用程序属性:

spring.kafka.consumer.bootstrap-servers = localhost:9093
spring.kafka.consumer.enable-auto-commit = false
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.max-poll-records = 10

spring.kafka.consumer.key-deserializer= org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer= org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.listener.type =  batch 
spring.kafka.ack.discard =  true
spring.kafka.listener.ack-mode = MANUAL
spring.kafka.listener.concurrency = 1
idle-between-polls  = 120000

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    这意味着没有正确设置监听器类型。

    我刚刚将您的代码和配置复制到一个新应用程序中,它按预期工作。

    【讨论】:

    • 设置这个 spring.kafka.listener.type = batch 应该可以。但是当我在容器级别设置它时,比如 factory.setBatchListener(true); factory.setAckDiscarded(isAckDiscarded);然后工作。有什么原因吗??
    • 我无法解释;不;正如我所说,我只是按原样复制了您的代码/配置,它对我有用。我使用的是引导 2.4.4。但我只是用 2.3.8(和 2.3.9)尝试过,它仍然有效。
    • 感谢@Gary 及时提供帮助。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-08-15
    • 2012-10-05
    • 1970-01-01
    • 2020-09-18
    • 2018-12-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多