【发布时间】:2019-04-29 16:54:57
【问题描述】:
我目前正在 Dataproc 上运行 spark 作业,但在尝试重新加入组并从 kafka 主题读取数据时遇到错误。我已经进行了一些挖掘,但不确定问题是什么。我将auto.offset.reset 设置为earliest,所以它应该从最早可用的未提交偏移量读取,最初我的火花日志如下所示:
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-11 to offset 5553330.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-2 to offset 5555553.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-3 to offset 5555484.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-4 to offset 5555586.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-5 to offset 5555502.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-6 to offset 5555561.
19/04/29 16:30:30 INFO
org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer
clientId=consumer-1, groupId=demo-group] Resetting offset for
partition demo.topic-7 to offset 5555542.```
但是下一行我在尝试从服务器上不存在的偏移量读取时遇到错误(您可以看到分区的偏移量与上面列出的不同,所以我不知道它为什么会尝试要读取该偏移量,这是下一行的错误:
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets
out of range with no configured reset policy for partitions:
{demo.topic-11=4544296}
关于为什么我的 spark 作业不断回到这个偏移量 (4544296),而不是它最初输出的那个 (5553330),有什么想法吗?
它似乎自相矛盾 w a) 它说的实际偏移量和它尝试读取的偏移量 b) 说没有配置重置策略
【问题讨论】:
-
使用结构化流还是 Dstream?
-
@cricket_007 dstreams
-
根据文档,属性值应该设置为
smallest,而不是earliest,而且如果有一段时间没有启动消费者,那么消费者组将过期偏移量,导致应用程序“重置”为不同的值......同时,例如,Spark 可能正在尝试从检查点恢复,并且偏移量也存储在那里(或其他地方,如果你已经这样配置的话) -
@cricket_007 "smallest" 和 "largest" 是旧消费者配置的属性值,我正在使用新的消费者配置,它在几个 spark 版本之前更新了 kafka.apache.org/documentation.html#newconsumerconfigs 除 "earliest" 之外的任何其他值" 和 "latest" 会抛出消费者配置错误
-
抱歉,正在阅读
streaming-0-8文档。你好吗storing offsets?
标签: apache-spark apache-kafka streaming google-cloud-dataproc