【Title】: Spark Streaming IllegalStateException: This consumer has already been closed
【Posted】: 2017-10-30 16:28:16
【Question】:

So I'm using:

- Spark Structured Streaming (2.1.0)
- Kafka 0.10.2.0
- Scala 2.11

I'm using the default Kafka API, so basically:

val df = spark.readStream
  .format("kafka")
  .option(...)
  .load()

I set the options (connecting over SSL) and everything, then apply some transformations, start the stream, etc., and it runs fine. However, it sometimes throws this exception:
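For context, here is a minimal sketch of the kind of setup described. The broker address, topic name, truststore path, and password are illustrative placeholders, not values from the original post; the `kafka.*` options are passed through to the underlying Kafka consumer.

```scala
// All connection values below are hypothetical placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-stream-example")
  .getOrCreate()

// Read from Kafka over SSL.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9093")
  .option("subscribe", "my-topic")
  .option("kafka.security.protocol", "SSL")
  .option("kafka.ssl.truststore.location", "/path/to/truststore.jks")
  .option("kafka.ssl.truststore.password", "changeit")
  .load()

// Kafka rows carry binary key/value columns; cast value to a string.
val values = df.selectExpr("CAST(value AS STRING)")

val query = values.writeStream
  .format("console")
  .start()

query.awaitTermination()
```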

17/05/30 11:05:23 WARN TaskSetManager: Lost task 23.0 in stage 77.0 (TID 3329, spark-worker-3, executor 0): java.lang.IllegalStateException: This consumer has already been closed.
at org.apache.kafka.clients.consumer.KafkaConsumer.ensureNotClosed(KafkaConsumer.java:1611)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1622)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:278)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.fetchData(CachedKafkaConsumer.scala:177)
at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:89)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:147)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:136)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:52)
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Any hints as to why this fails?

【Discussion】:

    Tags: apache-spark apache-kafka spark-streaming kafka-consumer-api spark-structured-streaming


    【Solution 1】:

    https://issues.apache.org/jira/browse/SPARK-18682 fixed this while implementing the batch Kafka source. You should not see it in Spark 2.1.1. If you still see this error in Spark 2.1.1, please create a Spark ticket at https://issues.apache.org/jira/browse/SPARK

    【Comments】:

    • Could you check my other message in the post? Thanks in advance
    • Hi, I'm still seeing this on 2.2.0. I opened an issue and it was "rejected"; could you help me? It's a bit of a blocker for me atm: issues.apache.org/jira/browse/SPARK-21453
    • Seeing this even with 3.0.2, mostly when cached tasks are retried.