【问题标题】:Guidelines to handle Timeout exception for Kafka Producer?Kafka Producer 超时异常处理指南?
【发布时间】:2019-07-13 19:18:13
【问题描述】:

在我的 Kafka 制作人中,由于各种原因,我经常遇到 Timeout 异常。我目前正在使用生产者配置的所有默认值。

我看到了以下超时异常:

org.apache.kafka.common.errors.TimeoutException:更新失败 60000 毫秒后的元数据。

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) 对于 topic-1-0:自上次追加以来已过去 30001 毫秒

我有以下问题:

  1. 这些超时异常的一般原因是什么?

    1. 临时网络问题
    2. 服务器问题?如果是,那么什么样的服务器问题?
  2. 处理超时异常的一般准则是什么?

    1. 设置“重试”配置,以便 Kafka API 进行重试?
    2. 增加“request.timeout.ms”或“max.block.ms”?
    3. 捕获异常并让应用层重试发送消息,但这对于异步发送似乎很难,因为消息会乱序发送?
  3. 超时异常是可重试的异常吗?重试它们是否安全?

我正在使用 Kafka v2.1.0 和 Java 11。

提前致谢。

【问题讨论】:

  • 你能解决这个问题吗?我也看到第二个问题非常断断续续,比如 2 个月左右一次,但下面的所有解决方案都指向不会断断续续的问题。如果您有防火墙阻止或 DNS 损坏,那么您将永久看到该问题。
  • 在 kafka API 中启用调试日志记录后,我能够得到 TimeoutException 的原因。
  • 你的情况是什么?
  • 一年多了,不记得异常原因了。

标签: java apache-kafka kafka-producer-api timeoutexception


【解决方案1】:

生产者和代理的默认 Kafka 配置值足够保守,在一般情况下,您不应该遇到任何超时。这些问题通常指向生产者和经纪人之间的不稳定/有损网络。

您得到的异常Failed to update metadata,通常意味着生产者无法访问其中一个代理,其结果是它无法获取元数据。

对于您的第二个问题,Kafka 将自动重试发送未得到代理完全确认的消息。如果您想在应用程序端超时时捕获并重试,这取决于您,但如果您遇到 1 分钟以上的超时,重试可能不会有太大的不同。无论如何,您将不得不找出代理的潜在网络/可达性问题。

根据我的经验,网络问题通常是:

  • 端口 9092 被防火墙阻止,无论是在生产者端还是在代理端,或者在中间的某个地方(从运行生产者的服务器尝试nc -z broker-ip 9092
  • DNS 解析被破坏,因此即使端口打开,生产者也无法解析到 IP 地址。

【讨论】:

  • 要让 kafka 自动重试发送消息,我必须将 'retries' 配置设置为大于 0 对吗?默认值为0,这是否意味着kafka默认不重试?
  • @xabhi 在 Kafka 2.1.0 中的重试次数实际上设置为 2+十亿。这个默认值从 1.x 版本改变,实际上它是零。查看文档:kafka.apache.org/documentation.html
【解决方案2】:

“这些超时异常的一般原因是什么?”

  1. 我之前看到的最常见的原因是元数据信息过时:一个代理出现故障,并且该代理上的主题分区故障转移到其他代理。但是,主题元数据信息没有正确更新,客户端仍然尝试与失败的代理对话以获取元数据信息或发布消息。这会导致超时异常。

  2. 网络连接问题。这可以通过telnet broker_host borker_port 轻松诊断

  3. 代理过载。如果代理负载过高或托管过多主题分区,则可能会发生这种情况。

处理超时异常,一般做法是:

  1. 排除代理方问题。确保主题分区完全复制,并且代理没有过载

  2. 修复主机名解析或网络连接问题(如果有)

  3. 调整request.timeout.msdelivery.timeout.ms等参数。我过去的经验是默认值在大多数情况下都可以正常工作。

【讨论】:

  • 你可能也想tune max.block.ms 制作人,它的默认值是 60 秒,在这段时间内可以给你一杯咖啡 ;-)
  • 感谢你,我终于找到了这个属性,之前的那些对我不起作用,在 12 小时后部署了一个旧的微服务。
【解决方案3】:

如果生产者或消费者无法访问“advertised.listeners”(protocol://host:port) 的值,则会发生超时异常

通过以下命令检查属性“advertised.listeners”的配置:

cat $KAFKA_HOME/config/server.properties

【讨论】:

    【解决方案4】:

    我建议在构建 Producer config 时使用以下属性

    需要来自分区 - 领导者的确认

    kafka.acks=1

    kafka 生产者发送消息和接收来自领导者的确认的最大重试次数

    kafka.retries=3

    每个单独请求的请求超时

    timeout.ms=200

    等待再次发送下一个请求;这是为了避免在紧密循环中发送请求;

    retry.backoff.ms=50

    上限完成所有重试

    dataLogger.kafka.delivery.timeout.ms=1200

    producer.send(record, new Callback {
      override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
        if (e != null) {
          logger.debug(s"KafkaLogger : Message Sent $record to  Topic  ${recordMetadata.topic()}, Partition ${recordMetadata.partition()} , Offset ${recordMetadata.offset()} ")
        } else {
          logger.error(s"Exception while sending message $item to Error topic :$e")
        }
      }
    })
    

    超时关闭生产者

    producer.close(1000, TimeUnit.MILLISECONDS)

    【讨论】:

      猜你喜欢
      • 2018-04-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-10-04
      • 2018-08-22
      • 2020-04-05
      • 2019-11-26
      相关资源
      最近更新 更多