【问题标题】:Kafka Broker offsets/log retention and consumers offset reset in earliest modeKafka Broker 偏移量/日志保留和消费者偏移量在最早模式下重置
【发布时间】:2018-12-20 19:25:34
【问题描述】:

问题描述:

我们的 Kafka 消费者(在 Spring Boot 2.x 中开发)正在执行几天。 当我们重新启动这些消费者时,该主题的所有消息都会再次被消费,但仅在特定条件下。

条件:

我们假设组合代理/主题配置 (log.retention.*, offsets.retention.*) 和消费者配置 (auto.offset. reset = 最早)导致了这种行为。
显然我们不能将consumer设置为"latest",因为如果consumer停止了,有新消息到来,当consumer再次启动时,这些消息就不会被消费了。

问题:

避免这种情况的正确设置是什么?
在上一个 Kafka Broker 版本 (2.x) 中,log.retention.* 和 offsets.retention.* 的默认值是相同的 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-186%3A+Increase+offsets+retention+default+to+7+days)

这个新的配置设置能解决问题吗?

消费者配置auto.commit委托于 Spring Cloud Stream Framework):

           auto.commit.interval.ms = 100
           auto.offset.reset = earliest
           bootstrap.servers = [server1:9092]
           check.crcs = true
           client.id = 
           connections.max.idle.ms = 540000
           enable.auto.commit = false
           exclude.internal.topics = true
           fetch.max.bytes = 52428800
           fetch.max.wait.ms = 500
           fetch.min.bytes = 1
           group.id = consumer_group1
           heartbeat.interval.ms = 3000
           interceptor.classes = null
           internal.leave.group.on.close = true
           isolation.level = read_uncommitted
           key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
           max.partition.fetch.bytes = 1048576
           max.poll.interval.ms = 300000
           max.poll.records = 500
           metadata.max.age.ms = 300000
           metrics.recording.level = INFO
           metrics.sample.window.ms = 30000
           partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
           receive.buffer.bytes = 65536
           reconnect.backoff.max.ms = 1000
           reconnect.backoff.ms = 50
           request.timeout.ms = 305000
           retry.backoff.ms = 100
           value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

代理配置:

           log.retention.ms = 86400000
           log.retention.minutes = 10080
           log.retention.hours = 168
           log.retention.bytes = -1

           offsets.retention.ms = 864000000
           offsets.retention.minutes = 14400
           offsets.retention.hours = 240 

           unclean.leader.election.enable = false
           log.cleaner.enable = true
           auto.leader.rebalance.enable = true
           leader.imbalance.check.interval.seconds = 300
           log.retention.check.interval.ms = 300000
           log.cleaner.delete.retention.ms = 604800000

感谢和问候

【问题讨论】:

标签: java spring-boot apache-kafka spring-cloud-stream


【解决方案1】:

您是对的,您遇到此问题是因为 2.0 之前的 Kafka 版本的 log.retention.*offsets.retention.*(分别为 7 天和 1 天)的值不同,请检查 description here。 这是由于很少有消息进入您的主题,并且偏移量数据已经过期。

您的短语Obviously we can't set consumer to "latest" 并不完全正确。 如果您在不到 1 天前(例如几个小时前)收到最后一条消息,您可以安全地将 auto.offset.reset 值更新为 latest,并使用相同的组 ID(或 application.id)。在这种情况下,您不会丢失消息。

作为另一种选择,您可以将特定主题的日志保留时间更改为 1 天。 您也可以更新值offsets.retention.*,但您需要从性能角度对其进行测试,它可能会降级。

【讨论】:

  • 好的,所以如果我理解得很好,例如设置offsets.retention.minutes = 10080log.retention.minutes = 10080(应该相同),将auto.offset.reset 的值设置为earliest 时将创建新主题这个问题应该解决吧?此值适用于整个主题还是每个分区?
  • 配置 offsets.retention.minuteslog.retention.minutes 应用于集群中的所有 Kafka 主题。您可以通过指定retention.ms 来覆盖特定主题的日志保留(它将应用于此主题中的每个分区)。在主要情况下将offsets.retention.minuteslog.retention.minutes 设置为相同的值会解决您的问题,但是,您遇到它的可能性仍然很小,因为Kafka 中的消息不会在到期后立即删除@987654338 @时间。
  • 完美,但我有非常敏感的数据,我不应该允许从一开始就对其进行重新处理。 Kafka经纪人/消费者配置是否可以保证这种情况永远不会发生,或者在任何配置组合中都存在重新处理earliestlatest丢失一些消息的所有数据的可能性很小?还有其他想法来实现它吗?谢谢!
  • 在哪里可以更改“log.retention.hours”的配置?我正在使用 Spring Boot 来消费 Kafka 中的主题,并且 Kafka 正在通过 docker-compose 运行。我应该在 docker-compose 上更改 Zookeeper 配置中的任何内容吗?我可以更改什么以及在哪里更改?我需要所有消息在消费和确认后立即消失(手动)。
【解决方案2】:

如果您保持应用程序 24x7 全天候运行(例如,在没有数据的周末),一种选择是设置 idleInterval 并添加 ApplicationListener(或 @EventListener)来监听 ListenerContainerIdleEvent秒。

然后,如果idleTime 属性接近您的日志保留期,您可以在事件中使用Consumer 重新提交偏移量 - 获取分配的分区,找到它们当前的position(),然后重新提交。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-08-09
    • 2018-02-03
    • 2019-01-19
    • 1970-01-01
    • 1970-01-01
    • 2019-05-01
    • 2018-12-17
    相关资源
    最近更新 更多