【Question title】: How does an offset expire for an Apache Kafka consumer group?
【Posted】: 2017-01-01 01:41:19
【Question description】:

I was running some tests on an old topic when I noticed some strange behavior. Reading the Kafka broker log, I noticed this "Removed 8 expired offsets" message:

[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)
Deleting segment 0 from log __consumer_offsets-31. (kafka.log.Log)
Deleting segment 0 from log __consumer_offsets-45. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-45/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-31/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-13. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-13/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-11. (kafka.log.Log)
Deleting segment 4885 from log __consumer_offsets-11. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000004885.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-11/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-26. (kafka.log.Log)
Deleting segment 12406 from log __consumer_offsets-26. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000012406.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-26/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-22. (kafka.log.Log)
Deleting segment 8643 from log __consumer_offsets-22. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000008643.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-22/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-6. (kafka.log.Log)
Deleting segment 9757 from log __consumer_offsets-6. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-6/00000000000000009757.index.deleted (kafka.log.OffsetIndex)
Deleting segment 0 from log __consumer_offsets-14. (kafka.log.Log)
Deleting segment 1 from log __consumer_offsets-14. (kafka.log.Log)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000001.index.deleted (kafka.log.OffsetIndex)
Deleting index /data/kafka-logs/__consumer_offsets-14/00000000000000000000.index.deleted (kafka.log.OffsetIndex)
[GroupCoordinator 1001]: Preparing to restabilize group GROUP_NAME with old generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 38 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 38 (kafka.coordinator.GroupCoordinator)
[Group Metadata Manager on Broker 1001]: Removed 8 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)

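I actually have two questions:

  1. How does this offset expiration work for a consumer group?

  2. Can these expired offsets explain the behavior I see, where my consumer does not poll anything with auto.offset.reset = latest, but polls from the last committed offset with auto.offset.reset = earliest?

【Question discussion】:

    Tags: apache-kafka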


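    【Solution 1】:

    Update

    As of Apache Kafka 2.1, offsets are not deleted as long as the consumer group is active, regardless of whether the consumers commit offsets. That is, the offsets.retention.minutes clock only starts ticking once the group becomes empty (in older releases, the clock started ticking as soon as the commit happened).

    Cf. https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets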
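
The rule described in the update can be sketched as a small model. This is only an illustration of the semantics as stated above, not broker code: the function name `offsets_expired_kafka_2_1`, its parameters, and the 7-day retention value are assumptions chosen for the example.

```python
from typing import Optional

DAY_MS = 24 * 60 * 60 * 1000
# Example retention period; on a real broker this comes from offsets.retention.minutes.
RETENTION_MS = 7 * DAY_MS


def offsets_expired_kafka_2_1(
    now_ms: int,
    group_empty_since_ms: Optional[int],
    retention_ms: int,
) -> bool:
    """Simplified model of the 2.1+ rule (hypothetical helper).

    group_empty_since_ms is None while the group still has active members;
    otherwise it is the time at which the group became empty.
    """
    if group_empty_since_ms is None:
        # Active group: offsets are kept no matter how old the last commit is.
        return False
    # Empty group: the retention clock started when the group became empty.
    return now_ms - group_empty_since_ms > retention_ms


# Active group that has not committed for 30 days: offsets are kept.
print(offsets_expired_kafka_2_1(30 * DAY_MS, None, RETENTION_MS))
# Group became empty on day 10; by day 30 the retention period has passed.
print(offsets_expired_kafka_2_1(30 * DAY_MS, 10 * DAY_MS, RETENTION_MS))
```

In this model the age of the last commit does not appear at all, which is the point of the change: only the time since the group became empty matters.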

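    Original answer

    By default, Kafka deletes committed offsets after a configurable period of time; see the parameter offsets.retention.minutes. That is, if a consumer group is inactive (i.e., does not commit any offsets) for this period, its offsets are deleted. Thus, even if a consumer is running, if it does not commit offsets for some partitions, those offsets are still subject to offsets.retention.minutes.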
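
To make the per-partition effect concrete, here is a minimal sketch of the older rule as the answer describes it, where each committed offset ages from its own commit time. The helper `expired_partitions_pre_2_1`, the timestamps, and the 24-hour retention value are all assumptions for illustration, not broker internals.

```python
from typing import Dict, List

HOUR_MS = 60 * 60 * 1000
# Example retention period (1440 minutes); the real value is set by offsets.retention.minutes.
RETENTION_MS = 24 * HOUR_MS


def expired_partitions_pre_2_1(
    last_commit_ms: Dict[int, int],
    now_ms: int,
    retention_ms: int,
) -> List[int]:
    """Return the partitions whose last commit is older than the retention period.

    Simplified model of the pre-2.1 behavior: whether the consumer is still
    running is irrelevant, only the age of each commit counts.
    """
    return sorted(
        partition
        for partition, committed_at in last_commit_ms.items()
        if now_ms - committed_at > retention_ms
    )


# Partition 0 receives data and is committed regularly; partition 1 has been idle.
last_commit_ms = {0: 47 * HOUR_MS, 1: 10 * HOUR_MS}
print(expired_partitions_pre_2_1(last_commit_ms, 48 * HOUR_MS, RETENTION_MS))  # [1]
```

The consumer in this example never stopped, yet the offset for the idle partition is gone because nothing was committed for it within the retention period.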

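    If you start a consumer, the following happens:

    1. It looks for a (valid) committed offset (for the consumer group)
      1. If a valid offset is found, it resumes from there
      2. If no valid offset is found, it resets the offset according to the auto.offset.reset parameter

    Thus, if your offsets were deleted and auto.offset.reset = latest, your consumer will not poll anything until new data is added to the topic. With auto.offset.reset = earliest, it should consume the whole topic.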
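
The startup steps above can be sketched as a small decision function. This is a simplified model rather than the consumer's actual implementation: `resolve_start_position` and its parameters are hypothetical names, and "valid" is assumed here to mean "within the range of offsets the partition currently holds".

```python
from typing import Optional


def resolve_start_position(
    committed: Optional[int],
    log_start: int,
    log_end: int,
    auto_offset_reset: str,
) -> int:
    """Pick the offset a consumer would start reading from (simplified model).

    committed is None when no committed offset exists, for example because
    it expired and was removed.
    """
    # Step 1.1: a valid committed offset wins, whatever the reset policy is.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Step 1.2: no valid offset, so fall back to auto.offset.reset.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    # Any other policy (such as "none") is treated as an error in this sketch.
    raise ValueError(f"no valid committed offset and policy is {auto_offset_reset!r}")


# Offsets expired, 500 records already in the partition:
print(resolve_start_position(None, 0, 500, "latest"))    # 500: nothing to read until new data arrives
print(resolve_start_position(None, 0, 500, "earliest"))  # 0: the whole partition is consumed again
# Committed offset still present: the reset policy is not consulted.
print(resolve_start_position(120, 0, 500, "latest"))     # 120
```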

    See these JIRA issues for a discussion of this: https://issues.apache.org/jira/browse/KAFKA-3806 and https://issues.apache.org/jira/browse/KAFKA-4682

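    【Discussion】:

    • Thanks. So even with an active consumer, the offsets are deleted if no new offsets are committed within the retention period?
    • As long as the group is active, offsets should not be deleted.
    • That is not necessarily true. If you set enable.auto.commit=false and there is no new data (so no commits), the committed offsets will expire.
    • Currently this affects Kafka Streams, which sets enable.auto.commit=false and has auto.offset.reset=earliest. By default, if a Kafka Streams application processes no data for 24 hours and is then restarted, its offsets will have been deleted and it reprocesses the data from the beginning.
    • @DmitryMinkovsky Yes, that is correct. There is a corresponding JIRA: issues.apache.org/jira/browse/KAFKA-5510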
    【Solution 2】:

    Check my answer here. You should not forget about log file rolling. It affects when the offset files are deleted.

    【Discussion】:
