【问题标题】:Spark Streaming Exception: java.util.NoSuchElementException: None.getSpark 流异常:java.util.NoSuchElementException:None.get
【发布时间】:2018-12-01 21:06:59
【问题描述】:

我正在通过将SparkStreaming 数据转换为数据帧将其写入 HDFS:

代码

object KafkaSparkHdfs {

  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
  sparkConf.set("spark.driver.allowMultipleContexts", "true");
  val sc = new SparkContext(sparkConf)

  def main(args: Array[String]): Unit = {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("fridaydata")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )

    val lines = stream.map(consumerRecord => consumerRecord.value)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(word => (word, 1))
    val wordCount = wordMap.reduceByKey(_ + _)

    wordCount.foreachRDD(rdd => {
      val dataframe = rdd.toDF(); 
      dataframe.write
        .mode(SaveMode.Append)
        .save("hdfs://localhost:9000/newfile24")     
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

文件夹已创建,但文件未写入。

程序因以下错误而终止:

    18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.lang.Thread.run(Thread.java:748)
    18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

在我的 pom 中,我使用了各自的依赖项:

  • spark-core_2.11
  • spark-sql_2.11
  • spark-streaming_2.11
  • spark-streaming-kafka-0-10_2.11

【问题讨论】:

    标签: apache-spark hadoop apache-kafka apache-spark-sql spark-streaming


    【解决方案1】:

    该错误是由于尝试同时运行多个 Spark 上下文。将 allowMultipleContexts 设置为 true 主要用于测试目的,不鼓励使用。因此,您的问题的解决方案是确保在任何地方都使用相同的SparkContext。从代码中我们可以看到SparkContext (sc) 用于创建SQLContext,这很好。但是,在创建 StreamingContext 时不使用它,而是使用 SparkConf

    通过查看documentation,我们看到:

    通过提供新 SparkContext 所需的配置来创建 StreamingContext

    换句话说,通过使用SparkConf 作为参数,将创建一个新的SparkContext。现在有两个不同的上下文。

    这里最简单的解决方案是继续使用与以前相同的上下文。将创建StreamingContext 的行更改为:

    val ssc = new StreamingContext(sc, Seconds(20))
    

    注意:在较新版本的 Spark (2.0+) 中,请改用 SparkSession。然后可以使用StreamingContext(spark.sparkContext, ...) 创建一个新的流上下文。它可以如下所示:

    val spark = SparkSession().builder
      .setMaster("local[*]")
      .setAppName("SparkKafka")
      .getOrCreate()
    
    import sqlContext.implicits._
    val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
    

    【讨论】:

    • 是的,它正在工作,但是当有新数据出现时,我使用了追加模式,它没有写入同一个文件,它正在创建新文件@Shaido
    • 是我们可以继续写入同一个文件的任何方式,因为如果流式传输每分钟发出一次,它就会继续创建 n 个文件。
    • @andani:检查是否可以这样做。 Spark 会将单独的分区保存为单独的文件,因此不确定是否可以完成。但是,您可以通过 spark.read.parquet("path") 读取所有文件。
    • @andani: 没问题,乐于助人:)
    【解决方案2】:

    这里有一个明显的问题 - coalesce(1)

    dataframe.coalesce(1)
    

    虽然在许多情况下减少文件数量可能很诱人,但当且仅当数据量足够低以供节点处理时才应该这样做(显然它不在这里)。

    另外,让我引用the documentation

    但是,如果您要进行剧烈的合并,例如对于 numPartitions = 1,这可能会导致您在比您喜欢的更少的节点上进行计算(例如,在 numPartitions = 1 的情况下为一个节点)。为避免这种情况,您可以调用 repartition。这将添加一个 shuffle 步骤,但意味着当前的上游分区将并行执行(无论当前分区是什么)。

    结论是您应该根据预期的数据量和所需的并行度相应地调整参数。 coalesce(1) 这样的做法在实践中很少有用,尤其是在流式传输这样的环境中,其中数据属性会随时间而变化。

    【讨论】:

    • 我有 2 个数据节点,我的数据只有 1mb 文件 @user9985951
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-11
    • 1970-01-01
    • 1970-01-01
    • 2018-10-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多