【问题标题】:Spark set to read from earliest offset - throws error on attempting to consumer an offset no longer available on KafkaSpark 设置为从最早的偏移量读取 - 尝试使用 Kafka 上不再可用的偏移量时抛出错误
【发布时间】:2019-04-29 16:54:57
【问题描述】:

我目前正在 Dataproc 上运行 spark 作业,但在尝试重新加入组并从 kafka 主题读取数据时遇到错误。我已经进行了一些挖掘,但不确定问题是什么。我将auto.offset.reset 设置为earliest,所以它应该从最早可用的未提交偏移量读取,最初我的火花日志如下所示:

19/04/29 16:30:30 INFO     
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-11 to offset 5553330.
19/04/29 16:30:30 INFO     
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-2 to offset 5555553.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-3 to offset 5555484.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-4 to offset 5555586.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-5 to offset 5555502.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-6 to offset 5555561.
19/04/29 16:30:30 INFO 
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer 
clientId=consumer-1, groupId=demo-group] Resetting offset for 
partition demo.topic-7 to offset 5555542.```

但是下一行我在尝试从服务器上不存在的偏移量读取时遇到错误(您可以看到分区的偏移量与上面列出的不同,所以我不知道它为什么会尝试要读取该偏移量,这是下一行的错误:

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets 
out of range with no configured reset policy for partitions: 
{demo.topic-11=4544296}

关于为什么我的 spark 作业不断回到这个偏移量 (4544296),而不是它最初输出的那个 (5553330),有什么想法吗?

它似乎自相矛盾 w a) 它说的实际偏移量和它尝试读取的偏移量 b) 说没有配置重置策略

【问题讨论】:

  • 使用结构化流还是 Dstream?
  • @cricket_007 dstreams
  • 根据文档,属性值应该设置为smallest,而不是earliest,而且如果有一段时间没有启动消费者,那么消费者组将过期偏移量,导致应用程序“重置”为不同的值......同时,例如,Spark 可能正在尝试从检查点恢复,并且偏移量也存储在那里(或其他地方,如果你已经这样配置的话)
  • @cricket_007 "smallest" 和 "largest" 是旧消费者配置的属性值,我正在使用新的消费者配置,它在几个 spark 版本之前更新了 kafka.apache.org/documentation.html#newconsumerconfigs 除 "earliest" 之外的任何其他值" 和 "latest" 会抛出消费者配置错误
  • 抱歉,正在阅读 streaming-0-8 文档。你好吗storing offsets

标签: apache-spark apache-kafka streaming google-cloud-dataproc


【解决方案1】:

这个答案迟了一年,但希望能帮助其他面临类似问题的人。

通常,当消费者尝试读取不再存在的 Kafka 主题中的偏移量时,会显示此行为。偏移量不再存在,通常是因为它已被 Kafka Cleaner 删除(例如,由于保留或压缩策略)。但是,Kafka 仍然知道 Consumer Group,并且 Kafka 保留了主题“demo.topic”及其所有分区的组“demo-group”的最新消费消息的信息。

因此,auto.offset.reset 配置没有任何影响,因为不需要重置。相反,Kafka 了解 Consumer Group。

此外,Fetcher 仅告诉您主题的每个分区内的最新可用偏移量。它确实 not 自动意味着它实际上轮询了所有消息直到这个偏移量。 Spark 决定它实际消耗和处理每个分区的消息数量(例如基于配置 maxRatePerPartition)。

要解决此问题,您可以更改消费者组(在这种特殊情况下这可能不是您想要的)或使用手动重置消费者组“演示组”的偏移量

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group demo-group --topic demo.topic --partition 11 --to-latest

根据您的要求,您可以使用该工具重置主题的每个分区的偏移量。帮助功能或文档解释了所有可用的选项。

【讨论】:

    猜你喜欢
    • 2021-05-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-13
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多