【问题标题】:Handling exceptions in Kafka streams处理 Kafka 流中的异常
【发布时间】:2018-07-12 07:21:26
【问题描述】:

浏览过多个帖子,但其中大多数都与处理错误消息有关,而不是处理它们时的异常处理。

我想知道如何处理流应用程序收到的消息并且在处理消息时出现异常?异常可能是由于多种原因,如网络故障、RuntimeException 等,

  • 有人能建议什么是正确的做法吗?我应该使用 setUncaughtExceptionHandler?还是有更好的方法?
  • 如何处理重试?

【问题讨论】:

    标签: java apache-kafka-streams spring-kafka


    【解决方案1】:

    这取决于你想对生产者端的异常做什么。 如果生产者抛出异常(例如由于网络故障或 kafka 代理已死亡),则流将默认死亡。并且使用 kafka-streams 1.1.0 版,您可以通过实现 ProductionExceptionHandler 来覆盖默认行为,如下所示:

    public class CustomProductionExceptionHandler implements ProductionExceptionHandler {
    
        @Override
        public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                                         final Exception exception) {
            log.error("Kafka message marked as processed although it failed. Message: [{}], destination topic: [{}]",  new String(record.value()), record.topic(), exception);
            return ProductionExceptionHandlerResponse.CONTINUE;
        }
    
        @Override
        public void configure(final Map<String, ?> configs) {
        }
    
    }
    

    如果您不希望流因异常而死亡,则可以从句柄方法返回CONTINUE,如果您希望流停止(默认为FAIL),则返回FAIL。 你需要在流配置中指定这个类:

    default.production.exception.handler=com.example.CustomProductionExceptionHandler
    

    还要注意ProductionExceptionHandler只处理producer上的异常,在处理消息的流式方法mapValues(..)filter(..)branch(..)等时不会处理异常,你需要用try包装这些方法逻辑/catch 块(将你所有的方法逻辑放入 try 块中,以保证你将处理所有异常情况):

    .filter((key, value) -> { try {..} catch (Exception e) {..} })
    

    据我所知,我们不需要显式处理消费者端的异常,因为 kafka 流稍后会自动重试消费(因为在消息被消费和处理之前,偏移量不会改变);例如如果在一段时间内无法访问 kafka 代理,您将从 kafka 流中得到异常,并且当中断时,kafka 流将消耗所有消息。所以在这种情况下,我们只会有延迟,没有任何损坏/丢失。

    使用setUncaughtExceptionHandler,您将无法像使用ProductionExceptionHandler 那样更改默认行为,使用它您只能记录错误或将消息发送到失败主题。


    更新自kafka-streams2.8.0

    由于kafka-streams2.8.0,您可以自动替换失败的流线程(由未捕获的异常引起) 使用KafkaStreams 方法void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);StreamThreadExceptionResponse.REPLACE_THREAD。更多详情请关注Kafka Streams Specific Uncaught Exception Handler

    kafkaStreams.setUncaughtExceptionHandler(ex -> {
        log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
    });
    

    【讨论】:

    • 感谢 Vasiliy 的意见。这可能是一个基本问题。在Producer中抛出异常会如何导致流死?
    • 如果在处理消息期间发生异常(例如在.mapValues(..).filter(..) 中的逻辑期间),或者在向目标(接收器)主题生成消息期间(例如网络问题),您的流将死。您可以通过在处理器期间抛出任何异常来测试它。所有未捕获的异常都会导致 kafka 流死亡
    • 当然。会试试的。
    • 只是想补充一点,设置一个未捕获的异常处理程序只是为了通知你一个线程死了——回调是在事后发生的。正如@VasiliySarzhynskyi 指出的那样,如果发生异常,这取决于您想要做什么。
    • 如果在处理消息期间发生异常(例如在 .mapValues(..)、.filter(..) 中的逻辑期间),...> 好的,因此您必须引发运行时异常才能停止流,因为如果它只是一个异常,你将不得不尝试/捕获并返回一些东西?
    【解决方案2】:

    为了处理消费者端的异常,

    1) 您可以使用以下属性在生产者中添加默认异常处理程序。

    "default.deserialization.exception.handler" = "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler";
    

    基本上apache提供了三个异常处理类

    1) LogAndContiuneExceptionHandler 可以作为

    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
               LogAndContinueExceptionHandler.class);
    

    2) LogAndFailExceptionHandler

    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
               LogAndFailExceptionHandler.class);
    

    3) LogAndSkipOnInvalidTimestamp

    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, 
               LogAndSkipOnInvalidTimestamp.class);
    

    对于自定义异常处理,

    1)你可以实现DeserializationExceptionHandler接口并重写handle()方法。

    2) 或者您可以扩展上述类。

    【讨论】:

    • 你能给我指出一个工作自定义反序列化异常处理程序代码的例子吗?
    【解决方案3】:

    setUncaughtExceptionHandler 无助于处理异常,它在流因某些未捕获的异常而终止后才起作用。

    Kafka 提供了几种处理异常的方法。一个简单的 try-catch{} 将有助于捕获处理器代码中的异常,但 kafka 反序列化异常(可能是由于数据问题)和生产异常(在与代理通信期间发生)需要 DeserializationExceptionHandler 和 ProductionExceptionHandler 分别。默认情况下,如果遇到这些情况,kafka 应用程序将失败。

    你可以在这个post上找到

    【讨论】:

      【解决方案4】:

      在 Spring 云流中,您可以使用以下配置自定义反序列化处理程序:

      • spring.cloud.stream.kafka.streams.binder.configuration.default.deserialization.exception.handler=your-package-name.CustomLogAndContinueExceptionHandler

      • CustomLogAndContinueExceptionHandler 扩展 LogAndContinueExceptionHandler 或实现 DeserializationExceptionHandler

      • CustomLogAndContinueExceptionHandler DeserializationHandlerResponse.CONTINUE 或 FAIL 取决于您的用例

      @Slf4j
      public class CustomLogAndContinueExceptionHandler extends LogAndContinueExceptionHandler {
      
          @Override
          public DeserializationHandlerResponse handle(ProcessorContext context, ConsumerRecord<byte[], byte[]> record,
                  Exception exception) {
      .... some business logic here ....
              log.error("Message failed: taskId: {}, topic: {}, partition: {}, offset: {}, , detailerror : {}",
                      context.taskId(), record.topic(), record.partition(), record.offset(), exception.getMessage());
              return DeserializationHandlerResponse.CONTINUE;
          }
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-09-18
        • 2020-05-27
        • 2019-02-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多