【Question Title】: Spark Structured Streaming not processing after Kafka offsets expire
【Posted】: 2020-09-03 11:49:16
【Question Description】:

We have a Spark Structured Streaming application that pushes data from Kafka to S3.
The Spark job runs fine for a few days and then starts to accumulate lag. The Kafka topic has a retention period of 6 hours. Once the lag grows enough that some offsets expire, Spark can no longer find those offsets and starts logging warnings. On the surface the job appears to be running, but it processes no data. When I tried to restart the system manually, I ran into GC problems (see the screenshot below). I have already set "failOnDataLoss" to "false". We want the system to keep processing rather than stop when offsets cannot be found. Apart from the warning below, I see no errors in the logs.
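The behavior described here matches what `failOnDataLoss=false` is documented to do: when a checkpointed offset has aged out, the lost range is skipped and reading resumes from the earliest retained offset. A minimal sketch of that decision logic (a simplified illustration, not Spark's actual code):

```python
# Simplified illustration of how a Kafka source can resolve a checkpointed
# start offset that may have aged out of the topic (not Spark's actual code).
def resolve_start_offset(requested: int, earliest: int,
                         fail_on_data_loss: bool) -> int:
    """Return the offset to actually start reading from."""
    if requested >= earliest:
        return requested          # checkpointed offset is still retained
    if fail_on_data_loss:
        # failOnDataLoss=true: abort the query instead of silently skipping
        raise RuntimeError(
            f"offsets [{requested}, {earliest}) are no longer available")
    return earliest               # failOnDataLoss=false: skip the lost range

# Numbers from the warning below: Spark asked for offset 34005119, but the
# earliest retained offset was 34066048.
print(resolve_start_offset(34005119, 34066048, fail_on_data_loss=False))
# 34066048
```

This is why the job "keeps running" after data loss: with the option set to false, the source silently jumps forward and logs the warning rather than failing the query.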

The only warning we see is:

The current available offset range is AvailableOffsetRange(34066048,34444327).
 Offset 34005119 is out of range, and records in [34005119, 34006993) will be
 skipped (GroupId: spark-kafka-source-6b17001a-01ff-4c10-8877-7677cdbbecfc--1295174908-executor, TopicPartition: DataPipelineCopy-46). 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you want your streaming query to fail on such cases, set the source
 option "failOnDataLoss" to "true".
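The numbers in this warning quantify the loss on that partition; simple arithmetic on the values quoted above:

```python
# Values taken from the warning above (partition DataPipelineCopy-46).
earliest, latest = 34066048, 34444327   # AvailableOffsetRange
requested = 34005119                    # checkpointed offset Spark asked for
skip_end = 34006993                     # end of the range reported as skipped

print(skip_end - requested)   # 1874 records skipped in [34005119, 34006993)
print(earliest - requested)   # 60929 records already aged out of retention
print(latest - requested)     # 439208 records of total lag behind the head
```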
    
        
20/05/17 17:16:30 INFO Fetcher: [Consumer clientId=consumer-7, groupId=spark-kafka-source-6b17001a-01ff-4c10-8877-7677cdbbecfc--1295174908-executor] Resetting offset for partition DataPipelineCopy-1 to offset 34444906.
20/05/17 17:16:30 WARN InternalKafkaConsumer: Some data may be lost. Recovering from the earliest offset: 34068782
20/05/17 17:16:30 WARN InternalKafkaConsumer: 
The current available offset range is AvailableOffsetRange(34068782,34444906).
 Offset 34005698 is out of range, and records in [34005698, 34007572) will be
 skipped (GroupId: spark-kafka-source-6b17001a-01ff-4c10-8877-7677cdbbecfc--1295174908-executor, TopicPartition: DataPipelineCopy-1). 
Some data may have been lost because they are not available in Kafka any more; either the
 data was aged out by Kafka or the topic may have been deleted before all the data in the
 topic was processed. If you want your streaming query to fail on such cases, set the source
 option "failOnDataLoss" to "true".

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {DataPipelineCopy-1=34005698}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:970)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:490)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.fetchData(KafkaDataConsumer.scala:470)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchRecord(KafkaDataConsumer.scala:361)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:251)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:234)
    at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
    at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:64)
    at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:500)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.next(KafkaMicroBatchReader.scala:357)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
    at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
    at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/05/17 17:16:30 WARN ConsumerConfig: The configuration 'consumer.commit.groupid' was supplied but isn't a known config.
20/05/17 17:16:30 INFO AppInfoParser: Kafka version : 2.0.0

Before the failure above, the system appeared to be working fine, but it was not processing any new data from Kafka.

【Discussion】:

    Tags: apache-spark apache-kafka spark-structured-streaming


    【Solution 1】:

    "The current available offset range is AvailableOffsetRange(34066048,34444327). Offset 34005119 is out of range"

    Your application appears to be processing records much more slowly than you expect, so records in Kafka are deleted by the retention policy before the job gets to them. Check what retention period you have configured: the offset your job is requesting is well below the earliest offset still available.
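To see how slow consumption turns into data loss under a 6-hour retention window, here is a back-of-the-envelope model (all rates below are made-up illustrative numbers, not measurements from this system):

```python
# Illustrative only: when does a lagging consumer start losing data to a
# 6 h retention window? Both rates are assumed numbers for the example.
retention_hours = 6.0
produce_rate = 100_000.0  # records/hour written by producers (assumed)
consume_rate = 80_000.0   # records/hour the Spark job sustains (assumed)

lag_growth = produce_rate - consume_rate       # new lag accrued per hour
max_safe_lag = retention_hours * produce_rate  # lag that still fits retention
hours_to_data_loss = max_safe_lag / lag_growth
print(hours_to_data_loss)  # 30.0 hours until unread offsets start expiring
```

Once the lag (in records) exceeds what the retention window holds, every further hour of lag is an hour of permanently lost data.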

    【Discussion】:

    • Agreed that the system is slow. But I don't understand why the job stays alive yet processes no data. When I tried to restart the system manually, I started getting the GC errors mentioned above. Only by deleting the checkpoint directory and restarting could I resume processing, which is not an acceptable solution for production.
    • I am facing the same issue. Please post the solution if you found one.
    【Solution 2】:

    It seems these records had already been aged out and made "invisible" before your application (the Kafka consumer) could process them, as discussed in What determines Kafka consumer offset?

    My solution:

    1. Create a new consumer group and restart your application (with the consumer's offset reset policy set to earliest).
    2. If step 1 does not work, increase the Kafka log retention window (broker parameters: log.retention.hours, log.retention.ms, or log.cleaner.delete.retention.ms, depending on your production environment).

    Step 2 worked well for me.
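When sizing the retention window in step 2, a simple rule of thumb is to make it comfortably larger than the worst lag (or downtime) you expect. A hypothetical helper, purely for illustration (the function name and safety factor are assumptions, not a Kafka API):

```python
# Hypothetical helper: choose a log.retention.ms value that covers the worst
# expected consumer lag with a safety margin (purely illustrative).
def required_retention_ms(max_expected_lag_hours: float,
                          safety_factor: float = 2.0) -> int:
    return int(max_expected_lag_hours * safety_factor * 3600 * 1000)

# If lag can reach the full 6 h of the current retention window, records
# expire just as the consumer reaches them; doubling to 12 h adds headroom.
print(required_retention_ms(6))  # 43200000 ms = 12 hours
```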

    【Discussion】:
