【问题标题】:Using scala to dump result processed by Spark to HDFS使用 scala 将 Spark 处理的结果转储到 HDFS
【发布时间】:2014-08-21 06:04:52
【问题描述】:

在使用 spark 处理数据后,我很难找到将数据保存到 HDFS 中的正确方法。

这就是我想要做的。我正在计算数值字段的最小值、最大值和 SD。我的输入文件有数百万行,但输出只有大约 15-20 个字段。因此,输出是每个字段的单个值(标量)。

例如:我将FIELD1的所有行加载到RDD中,最后,我将获得FIELD 1的3个单个值(MIN,MAX,SD)。我将这三个值连接成临时字符串。最后,我将有 15 到 20 行,包含以下格式的 4 列

FIELD_NAME_1  MIN  MAX  SD
FIELD_NAME_2  MIN  MAX  SD

这是一个sn-p的代码:

//create rdd
val data = sc.textFile("hdfs://x.x.x.x/"+args(1)).cache()
//just get the first column
val values = data.map(_.split(",",-1)(1))

val data_double= values.map(x=>if(x==""){0}else{x}.toDouble)
val min_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(true).take(1)(0)._1
val max_value= data_double.map((_,1)).reduceByKey((_+_)).sortByKey(false).take(1)(0)._1
val SD = data_double.stdev

所以,我有 3 个变量,min_value、max_value 和 SD,我想将它们存储回 hdfs。

问题 1: 由于输出会相当小,我是否只是将其保存在服务器本地?或者我应该将它转储到 HDFS。在我看来,在本地转储文件更有意义。

问题 2: 在 Spark 中,我可以调用以下命令将 RDD 保存到文本文件中

some_RDD.saveAsTextFile("hdfs://namenode/path")

对于不是 scala 中的 RDD 的字符串变量,我如何完成同样的事情?我应该先将我的结果并行化为 RDD,然后调用 saveAsTextFile?

【问题讨论】:

    标签: scala hadoop hdfs apache-spark


    【解决方案1】:

    要在本地保存就可以了

    some_RDD.collect()

    然后使用类似question 的内容保存结果数组。是的,如果数据集很小,并且可以轻松放入内存中,您应该收集并将其带到程序的驱动程序中。如果要存储在内存中的数据有点大,另一种选择是 some_RDD.coalesce(numParitionsToStoreOn)。请记住coalesce 也需要一个布尔值shuffle,如果您在合并之前对数据进行计算,您应该将其设置为 true 以获得更多的计算并行性。当您调用some_RDD.saveAsTextFile("hdfs://namenode/path") 时,Coalesce 将减少存储数据的节点数量。如果文件很小但是你需要在hdfs上,调用repartition(1),和coalesce(1,true)一样,这样可以保证你的数据只保存在一个节点上。

    更新: 因此,如果您只想在 HDFS 中保存三个值,您可以这样做。 sc.parallelize(List((min_value,max_value,SD)),1).saveAsTextFile("pathTofile")

    基本上你只是将 3 个变量放在一个元组中,将其包装在一个列表中并将并行度设置为一个,因为数据非常小

    【讨论】:

    • 对不起,我不够清楚。我的最终结果不是 RDD 格式。我的最终结果是单个单位值(标量)。因此,我遍历 RDD,计算每个字段的四分位数,并将每个标量值存储在一个临时字符串中。所以,我真的不能使用 saveAsTextFile
    • @user2773013 你确定你的意思不是只有一个值吗,scala中的单位类型意味着无效,即使你只有一个值你仍然可以使用collect
    • @user2773013 如果您的最终结果不是 RDD,您是如何做到这一点的,您的问题非常不详细。我更新以展示如何保存到 hdfs 但只能在一个节点上
    • 感谢亚伦曼。抱歉不清楚。我已经更新了这个问题。希望它更清楚一点。
    • @user2773013 查看更新部分我认为它完全符合您的要求
    【解决方案2】:

    答案 1:由于您只需要几个标量,我想说将它们存储在本地文件系统中。你可以先做val localValue = rdd.collect(),它将从worker收集所有数据到master。然后调用 java.io 将内容写入磁盘。

    答案 2:您可以执行 sc.parallelize(yourString).saveAsTextFile("hdfs://host/yourFile")。会将内容写入part-000*。如果您想将所有内容放在一个文件中,hdfs dfs -getmerge 可以为您提供帮助。

    【讨论】:

    • sc.parallelize(yourString) 不能作为并行化方法,除了列表不是字符串,如果你知道任何传递字符串的方法,请回复
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-12-25
    • 2015-10-18
    • 1970-01-01
    • 2014-05-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多