【问题标题】:How to write Spark Streaming output to HDFS without overwriting如何在不覆盖的情况下将 Spark Streaming 输出写入 HDFS
【发布时间】:2017-11-13 12:34:14
【问题描述】:

经过一些处理我有一个 DStream[String , ArrayList[String]] ,所以当我使用 saveAsTextFile 将它写入 hdfs 并且在每批之后它会覆盖数据,所以如何通过附加到以前的结果来写入新结果

output.foreachRDD(r => {
  r.saveAsTextFile(path)
})

编辑 :: 如果有人可以帮助我将输出转换为 avro 格式,然后通过附加写入 HDFS

【问题讨论】:

标签: apache-kafka spark-streaming


【解决方案1】:

saveAsTextFile 不支持追加。如果使用固定的文件名调用,它每次都会覆盖它。 我们可以使用saveAsTextFile(path+timestamp) 每次都保存到一个新文件中。这就是DStream.saveAsTextFiles(path)的基本功能

支持append 的易于访问的格式是 Parquet。我们首先将数据 RDD 转换为 DataFrameDataset,然后我们可以从该抽象之上提供的写入支持中受益。

case class DataStructure(field1,..., fieldn)

... streaming setup, dstream declaration, ...

val structuredOutput = outputDStream.map(record => mapFunctionRecordToDataStructure)
structuredOutput.foreachRDD(rdd => 
  import sparkSession.implicits._
  val df = rdd.toDF()
  df.write.format("parquet").mode("append").save(s"$workDir/$targetFile")

})

请注意,随着时间的推移,附加到 Parquet 文件的成本会越来越高,因此不时轮换目标文件仍然是一项要求。

【讨论】:

  • 任何用于将 RDD[String , ArrayList] 字符串转换为 avro 格式的代码,我都有一个架构。
  • 上面的追加代码只是为每个批次创建新文件,并没有追加到同一个文件。
  • 它附加到同一个逻辑文件,但确实在文件系统中创建了小的 part-xxxx 文件。您可以使用sparkSession.read.parquet(path) 一次加载所有数据。从 Spark Streaming,我建议写入分布式数据库,例如 Cassandra,而不是写入 fs。
  • 请帮我解决这个错误stackoverflow.com/questions/44524828/…
  • 有没有一种方法可以将每个 Spark Streaming 批处理的数据保存到单独的文件中。我想要“/rootpath/batch_id/filename”行?
【解决方案2】:

如果您想附加相同的文件并存储在文件系统中,请将其存储为 parquet 文件。你可以这样做

  kafkaData.foreachRDD( rdd => {
  if(rdd.count()>0)
  {
    val df=rdd.toDF()
    df.write(SaveMode.Append).save("/path")
   }

【讨论】:

  • 然后它还会创建多个文件。 @ishan Kumar
  • 你测试过这个吗?我认为它将新文件添加到 /path 目录中
【解决方案3】:

将流输出存储到 HDFS 将始终创建一个新文件,即使您使用 append 和 parquet 会导致 Namenode 上的小文件问题。我可能会建议将您的输出写入序列文件,您可以在其中继续附加到同一个文件。

【讨论】:

    【解决方案4】:

    这里我解决了没有数据框的问题

    import java.time.format.DateTimeFormatter
    import java.time.LocalDateTime
    
     messages.foreachRDD{ rdd =>
        rdd.repartition(1)
        val eachRdd = rdd.map(record => record.value)
        if(!eachRdd.isEmpty) {
          eachRdd.saveAsTextFile(hdfs_storage + DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now) + "/")
        }
      }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-09-19
      • 2015-07-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多