【Question Title】: Kafka structured streaming application throws IllegalStateException when there is a gap in the offsets
【Posted】: 2021-09-11 04:34:41
【Question Description】:

I have a structured streaming application running with Kafka on Spark 2.3;

the version of `spark-sql-kafka-0-10_2.11` is 2.3.0.

The application starts reading messages and processes them successfully, but after reaching a particular offset (shown in the exception message) it throws the following exception:

java.lang.IllegalStateException: Tried to fetch 666 but the returned record offset was 665
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:297)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:151)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:142)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:52)
        at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

It always fails on the same offset, and it looks like this is caused by a gap in the offsets: in the Kafka UI I can see that offset 665 is followed by 667 (666 was skipped for some reason), and the Kafka client inside my structured streaming application tries to fetch 666 and fails.

After digging into Spark's code, I found that they did not expect this exception to happen (according to the comment):

https://github.com/apache/spark/blob/branch-2.3/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L297

So I am wondering: am I doing something wrong, or is this a bug in the specific version I am using?

【Question Comments】:

  • It looks like the code you linked to tried to fetch offset 666 but got 665 back, i.e. a lower offset, not a higher one. Can you show how the Kafka consumer is configured?
  • Also: do you have the log level set to DEBUG? Can you see a log line like "Seeking to $groupId $topicPartition $offset" right before the error?
  • The Kafka consumer is configured as follows: ("kafka.sasl.jaas.config", "org.apache.kafka.common.security.plain.P..."), ("kafka.bootstrap.servers", "example"), ("startingOffsets", "earliest"), ("subscribe", "the-topic-name"), ("kafka.security.protocol", "SASL_SSL"), ("maxOffsetsPerTrigger", "100"), ("kafka.max.partition.fetch.bytes", "2147483647"), ("kafka.fetch.message.max.bytes", "2147483647"), ("failOnDataLoss", "false")
  • Regarding "Seeking to $groupId $topicPartition $offset": I enabled DEBUG level, but I don't see any "Seeking" logs.
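For reference, the consumer options quoted in the comments can be reassembled into a structured streaming source roughly like this (a minimal configuration sketch that needs a Spark 2.3 runtime to run; the bootstrap server, topic name, and truncated JAAS string are copied from the comment and are not working values):

```scala
import org.apache.spark.sql.SparkSession

object KafkaSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-structured-streaming")
      .getOrCreate()

    // Options as listed in the comment above; the JAAS string is
    // truncated in the original and is kept truncated here.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "example")
      .option("subscribe", "the-topic-name")
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "100")
      .option("failOnDataLoss", "false")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.sasl.jaas.config",
        "org.apache.kafka.common.security.plain.P...")
      .option("kafka.max.partition.fetch.bytes", "2147483647")
      .option("kafka.fetch.message.max.bytes", "2147483647")
      .load()

    df.printSchema() // key, value, topic, partition, offset, timestamp, ...
  }
}
```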

Tags: scala apache-spark apache-kafka spark-structured-streaming


【Solution 1】:

There is a long-standing issue in Spark, fixed in Spark 2.4, caused by an impedance mismatch between Kafka and Spark. Part of the fix was backported to Spark 2.3.1, but it is only enabled when the configuration option spark.streaming.kafka.allowNonConsecutiveOffsets is set to true. As you observed, you most likely hit a part that was not backported, in which case upgrading to Spark 2.4 may be worth considering.
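Conceptually, the pre-fix consumer assumed Kafka offsets are consecutive, whereas offsets can legitimately have gaps (e.g. with compacted topics or transactional control records). A small self-contained sketch of the two behaviors (this models the idea only; the Record type and both functions are illustrative, not Spark's actual code):

```scala
// Illustrative sketch only -- not Spark's actual implementation.
final case class Record(offset: Long, value: String)

object OffsetGapSketch {
  // Pre-fix behavior: assumes offsets are consecutive and fails on a
  // gap, producing the same kind of message as in the question.
  def readStrict(log: Seq[Record], from: Long): Seq[Record] = {
    var expected = from
    log.dropWhile(_.offset < from).map { r =>
      if (r.offset != expected)
        throw new IllegalStateException(
          s"Tried to fetch $expected but the returned record offset was ${r.offset}")
      expected = r.offset + 1
      r
    }
  }

  // Gap-tolerant behavior (roughly what allowNonConsecutiveOffsets
  // enables): continue from whatever offset actually comes back.
  def readTolerant(log: Seq[Record], from: Long): Seq[Record] =
    log.dropWhile(_.offset < from)

  def main(args: Array[String]): Unit = {
    // Offset 666 is missing, as the asker observed in the Kafka UI.
    val log = Seq(Record(664, "a"), Record(665, "b"), Record(667, "c"))

    try readStrict(log, 664)
    catch { case e: IllegalStateException => println(e.getMessage) }
    // prints: Tried to fetch 666 but the returned record offset was 667

    println(readTolerant(log, 664).map(_.offset).mkString(","))
    // prints: 664,665,667
  }
}
```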

【Discussion】:

  • Are you sure you are talking about Structured Streaming? Because I believe that bug was in Spark Streaming.
  • Structured Streaming uses regular streaming under the hood (see KafkaSourceRDD in the stack trace)... Have you tried that configuration option?
  • I added this before the query: sparkSession.conf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true"), but it didn't help. I actually found this bug, which looks very relevant and has not been backported yet: issues.apache.org/jira/browse/SPARK-25005