【问题标题】:Write and append Spark streaming data to a text file in HDFS将 Spark 流数据写入并附加到 HDFS 中的文本文件
【发布时间】:2017-06-26 02:28:35
【问题描述】:

我正在创建一个 spark scala 代码,其中我正在从 MQTT 服务器读取连续流。 我在纱线集群模式下运行我的工作。我想将此流保存并附加到 HDFS 中的单个文本文件中。

我将在每 1 秒后接收一次数据流。所以我需要将这些数据附加到 HDFS 的单个文本文件中。

谁能帮忙。

【问题讨论】:

  • 您不能有多个任务同时写入同一个 HDFS 文件。那么为什么要使用 Spark?
  • 对我来说,数据已写入最新流。我正在保存 dstream,例如:val lines = MQTTUtils.createStream(ssc,brokeraddress,topic)lines.foreachRDD{rdd => rdd.saveAsTextFile("rddoutput")}。我每 0.5 秒获取一次数据。所以我需要保存所有数据。但是“行”正在加载最新的 Dstream
  • @Arpit 嗨,已经完成了。我有类似的架构,你可以帮忙
  • @Arpit 你解决了吗?

标签: scala hadoop hdfs spark-streaming


【解决方案1】:

使用数据框和使用模式追加 这将在每次新记录出现时追加数据。

val sqlContext = new org.apache.spark.sql.SQLContext(context)
import sqlContext.implicits._

stream.map(_.value).foreachRDD(rdd => {
    rdd.foreach(println)
    if (!rdd.isEmpty()) {
        rdd.toDF("value").coalesce(1).write.mode(SaveMode.Append).save("C:/data/spark/")
        // rdd.saveAsTextFile("C:/data/spark/")
    }

})

【讨论】:

  • 你能解释一下这条线是如何工作的吗rdd.toDF("value").coalesce(1).write.mode(SaveMode.Append).json("C:/data/火花/")
【解决方案2】:

@Amrutha J Raj

rdd.toDF("value").coalesce(1).write.mode(SaveMode.Append).json("C:/data/spark/")

这意味着,RDD 将转换为 DF,我们使用了 coalesce(1),因此如果您不使用它,它将只有一个文件,然后 spark 可能会生成多个文件,因此它将限制为只有一个并且我们的写入模式是追加,所以它将追加到现有文件和客栈 json 格式。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-02-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多