【问题标题】:Not clear about the meaning of auto.offset.reset and enable.auto.commit in Kafka不清楚Kafka中auto.offset.reset和enable.auto.commit的含义
【发布时间】:2017-12-09 05:00:12
【问题描述】:

我是Kafka新手,不太了解Kafka配置的含义,谁能给我解释的更清楚!

这是我的代码:

 val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "master:9092,slave1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "GROUP_2017",
  "auto.offset.reset" -> "latest", //earliest or latest
  "enable.auto.commit" -> (true: java.lang.Boolean)
)

在我的代码中是什么意思?

【问题讨论】:

  • 我建议阅读与您正在读取和/或写入的 Kafka 版本相关的 Kafka 文档:Here 是官方文档。消费者和生产者配置部分有参数定义。
  • 更好的问题和答案:stackoverflow.com/questions/32390265/…

标签: apache-kafka kafka-consumer-api


【解决方案1】:

我会向你解释含义,但我强烈建议阅读Kafka Web Site Configuration

"bootstrap.servers" -> "master:9092,slave1:9092"

本质上是 Kafka 集群配置:IP 和端口。

 "key.deserializer" -> classOf[StringDeserializer]
 "value.deserializer" -> classOf[StringDeserializer]

This SO回答说明目的是什么。

"group.id" -> "GROUP_2017"

一个消费者进程将属于一个 groupId。一个 groupId 可以有多个 Consumer,Kafka 只会将一个 Consumer 进程分配给一个 Partition(用于数据消费)。如果消费者的数量大于可用的分区,那么一些进程将处于空闲状态。

"enable.auto.commit" -> (true: java.lang.Boolean)

无论该标志是否为真,然后 Kafka 能够使用 Zookeeper 提交您从 Kafka 带来的消息,以保持它读取的最后一个“偏移量”。当您想要为生产系统提供更强大的解决方案时,这种方法不是最好的选择,因为不能确保您带来的记录得到正确处理(使用您在代码中编写的逻辑)。如果此标志为假,Kafka 将不知道上次读取的偏移量,因此当您重新启动进程时,它将开始读取“最早”或“最新”偏移量,具体取决于您的下一个标志(auto.offset.重置)。最后,This Cloudera article 详细解释了如何正确管理偏移量。

"auto.offset.reset" -> "latest"

这个标志告诉 Kafka 在你还没有任何“提交”的情况下从哪里开始读取偏移量。换句话说,如果您尚未在 Zookeeper 中保留任何偏移量(手动或使用 enable.auto.commit 标志),它将从“最早”或“最新”开始。

【讨论】:

  • “auto.offset.reset”->“最大”
  • largest 适用于老年消费者,latest 适用于新消费者。
  • 最小/最早是如何定义的?最小的是什么?
  • @user1491636 消费者启动或重启后将读取的消息偏移量。看到这个:stackoverflow.com/questions/48320672/…
  • @ShalabhNegi 我当时为现在处于生产中的应用程序所做的是使用 Apache Zookeeper 来跟踪消费者的偏移量。
【解决方案2】:

添加标题中提到的配置的更多细节:“不清楚auto.offset.resetenable.auto.commit在Kafka中的含义”

auto.offset.reset

使用auto.offset.reset 配置,您可以在您的消费者组从未从特定主题消费和提交或从该消费者组上次提交的偏移量为已删除(例如通过清理策略)。

Kafka 主题分区中的每条消息都有一个唯一标识符,即offset。每个 Kafka 分区的偏移量是唯一的。消费者通常会在其消费的主题的每个分区上提交偏移量。这样,消费者就可以避免重复读数。

假设您有一个消费者第一次阅读某个主题(或者如果您更改了消费者组名称)。因此,消费者组从未提交任何偏移量。根据Config Docs,您可以选择配置auto.offset.reset的以下行为:

  • 最早的:自动将偏移量重置为最早的偏移量

  • 最新的:自动将偏移量重置为最新的偏移量

  • none:如果没有为消费者组找到先前的偏移量,则向消费者抛出异常

  • 其他任何事情: 向消费者抛出异常。

默认设置为latest

enable.auto.commit

如上所述,在使用来自 Kafka 的消息时,考虑您的偏移量及其提交至关重要。当将配置 enable.auto.commit 设置为 true 时,消费者偏移量将在后台自动提交。

JavaDocs of KafkaConsumer 中,您将找到一个很好的示例,说明如何使用消费者客户端手动提交偏移量

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.commitSync();

为了再次强调消费者客户端中偏移管理的重要性,值得阅读整个 Java 文档描述或confluent Kafka Documentation on Offset Management

【讨论】:

    【解决方案3】:

    auto.offset.reset 仅在没有有效的提交偏移量时起作用;例如在您第一次启动系统时,或者在提交的偏移量到期并因为它太旧而被删除之后。

    enable.auto.commit 是关于在后台自动提交偏移量与在前台显式手动控制的选择。

    auto.offset.reset
    

    当 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时该怎么办(例如,因为该数据已被删除):

    • earliest:自动将偏移量重置为最早的偏移量
    • latest:自动将偏移量重置为最新的偏移量
    • none:如果没有为消费者组找到先前的偏移量,则向消费者抛出异常
    • 其他:向消费者抛出异常。
    Type:
    string
    Default:
    latest
    Valid Values:
    [latest, earliest, none]
    Importance:
    medium
    
    enable.auto.commit
    

    如果为真,消费者的偏移量将在后台定期提交。

    Type:
    boolean
    Default:
    true
    Valid Values:
    
    Importance:
    medium
    
    auto.commit.interval.ms
    

    如果 enable.auto.commit 设置为 true,消费者偏移量自动提交到 Kafka 的频率(以毫秒为单位)。

    Type:
    int
    Default:
    5000 (5 seconds)
    Valid Values:
    [0,...]
    Importance:
    low
    

    完整的消费者配置参数集记录在 Apache Kafka 网站https://kafka.apache.org/documentation.html#newconsumerconfigs

    【讨论】:

    • 虽然此链接可能会回答问题,但最好在此处包含答案的基本部分并提供链接以供参考。如果链接页面发生更改,仅链接答案可能会失效。 - From Review
    • 良好的反馈。用有用的上下文和适当的文档修复了答案,如果链接更改也不会过期
    猜你喜欢
    • 2023-03-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-01-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多