【问题标题】:Unable to catch RecordTooLargeException from spring cloud stream kafka无法从 Spring Cloud Stream kafka 中捕获 RecordTooLargeException
【发布时间】:2023-04-05 08:09:02
【问题描述】:

我正在使用 Spring cloud stream kafka 作为活页夹。当我的消息太大时,我收到错误

  ERROR o.s.k.s.LoggingProducerListener - Exception thrown when sending a message with key='null' and payload='{123, 34, 105, 100, 34, 58, 34, 115, 105, 110, 103, 97, 112, 111, 114, 101, 104, 101, 114, 97, 108, ...' to topic page:
    org.apache.kafka.common.errors.RecordTooLargeException: The message is 4711755 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

这是我下面用于发送消息的springboot代码

 private BinderAwareChannelResolver resolver;

    boolean isSent = this.resolver.resolveDestination(this.topic)
                    .send(message);

由于我收到错误,我应该能够在我的 springboot 代码中捕获 RecordTooLargeException。但是,它没有被捕获并且代码继续。 isSent 也返回为“true”。它不应该返回为假吗?我怎样才能捕捉到这个错误并处理它?谢谢

【问题讨论】:

    标签: spring-boot apache-kafka spring-cloud


    【解决方案1】:

    如果错误是内部错误并且由不受您的应用程序代码直接控制的任何线程引发,您可能需要使用UncaughtExceptionHandler

    Thread.setDefaultUncaughtExceptionHandler((whichThread, whatException) -> {
                if (whatException.getClass()
                        .equals(org.apache.kafka.common.errors.RecordTooLargeException.class) ||
                        whatException.getCause().getClass()
                                .equals(org.apache.kafka.common.errors.RecordTooLargeException.class)) {
                    // do something
                }
                else if (or) else {
                // others
                }
            });
    

    但是上面的代码只是为了让你了解异常。 如果您收到大于大小的记录,您应该在代理和主题上进行更改。

    max.message.bytes (topic)
    message.max.bytes (broker)
    

    P.S:您可能还想使用instanceofisAssignableFrom() 而不是getClass().equals()

    更新:

    我在KafkaProducer中遇到了同样的错误,在Callback中返回了异常

    producer.send(new ProducerRecord<>(topic, key, value), (recordMetadata, exception) -> {
        if (exception instanceof RecordTooLargeException) {
              // handling code
        }
    });
    

    你还可以看到实现Spring kafka asynchronous callback

    【讨论】:

      猜你喜欢
      • 2020-08-22
      • 2018-04-28
      • 2017-11-24
      • 2018-08-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多