【发布时间】:2016-06-29 21:11:26
【问题描述】:
我正在通过另一个进程生成配对的 rdd/df,但这里是生成数据集以帮助调试过程的代码。
这是示例 i/p 文件 (/scratch/test2.txt): 1 本书1 作者1 1.10 2 本书2 作者2 2.20 1 本书3 作者2 3.30
这是生成数据帧的代码
case class RefText (index: Int, description: String, fName: String, weight: Double)
val annotation_split = sc.textFile("/scratch/test2.txt").map(_.split("\t"))
val annotation = annotation_split.map{line => RefText(line(0).toInt, line(1), line(2), line(3).toDouble)}.toDF()
val getConcatenated = udf( (first: String, second: String, third: Double) => { first + "#" + second + "#" + third.toString} )
val annotate_concated = annotation.withColumn("annotation",getConcatenated(col("description"), col("fName"), col("weight"))).select("index","annotation")
annotate_concated.show()
+-----+-----------------+
|index| annotation|
+-----+-----------------+
| 1|book1#author1#1.1|
| 2|book2#author2#2.2|
| 1|book3#author2#3.3|
+-----+-----------------+
//Here is how I generate pairedrdd.
val paired_rdd : PairRDDFunctions[String, String] = annotate_concated.rdd.map(row => (row.getString(0), row.getString(1)))
val df = paired_rdd.reduceByKey { case (val1, val2) => val1 + "|" + val2 }.toDF("user_id","description")
这是我的数据框的示例数据,列描述的格式如下(text1#text2#weight | text1#text2#weight|....)
用户1 book1#author1#0.07841217886795074|tool1#desc1#1.27044260397331488|song1#album1#-2.052661673730870676|item1#category1#-0.005683148395350108
用户2 book2#author1#4.07841217886795074|tool2#desc1#-1.27044260397331488|song2#album1#2.052661673730870676|item2#category1#-0.005683148395350108
我想根据权重按降序对描述列进行排序。
所需的o/p是:
用户1 tool1#desc1#1.27044260397331488|book1#author1#0.07841217886795074|item1#category1#-0.005683148395350108|song1#album1#-2.052661673730870676
用户2 book2#author1#4.07841217886795074|song2#album1#2.052661673730870676|tool2#desc1#-1.27044260397331488|item2#category1#-0.005683148395350108
对此的任何帮助将不胜感激。
【问题讨论】:
-
您能给我们一个示例代码,说明如何根据示例数据创建 df 以及您期望的确切输出吗?
-
我已更新代码以添加所需的 o/p。
-
很酷,但我仍然很难找出paired_rdd。如何创建它?
-
也添加了该代码。
标签: apache-spark apache-spark-sql spark-dataframe