【问题标题】:Merge two columns of different DataFrames in Spark using scala使用scala在Spark中合并两列不同的DataFrame
【发布时间】:2019-11-14 06:45:39
【问题描述】:

我想将来自不同数据帧的两列合并到一个数据帧中

我有两个这样的 DataFrame

val ds1 = sc.parallelize(Seq(1,0,1,0)).toDF("Col1")
val ds2 = sc.parallelize(Seq(234,43,341,42)).toDF("Col2")
ds1.show()

+-----+
| Col1|
+-----+
|    0|
|    1|
|    0|
|    1|
+-----+

ds2.show()
+-----+
| Col2|
+-----+
|  234|
|   43|
|  341|
|   42|
+-----+

我想要包含两列 Col1 和 Col2 的第三个数据框

+-----++-----+
| Col1|| Col2|
+-----++-----+
|    0||  234|
|    1||   43|
|    0||  341|
|    1||   42|
+-----++-----+

我试过联合

val ds3 = ds1.union(ds2)

但是,它将ds2 的所有行添加到ds1

【问题讨论】:

  • 是否要将另一列中每一行的值相加?

标签: scala apache-spark


【解决方案1】:

monotonically_increasing_id 确定性。

因此不能保证您会得到正确结果

使用 RDD创建密钥 使用 zipWithIndex

更容易
val ds1 = sc.parallelize(Seq(1,0,1,0)).toDF("Col1")
val ds2 = sc.parallelize(Seq(234,43,341,42)).toDF("Col2")

// Convert to RDD with ZIPINDEX < Which will be our key

val ds1Rdd = ds1.rdd.repartition(4).zipWithIndex().map{ case (v,k) => (k,v) }

val ds2Rdd = ds2.as[(Int)].rdd.repartition(4).zipWithIndex().map{ case (v,k) => (k,v) }

// Check How The KEY-VALUE Pair looks

ds1Rdd.collect()

res50: Array[(Long, Int)] = Array((0,0), (1,1), (2,1), (3,0))

res51: Array[(Long, Int)] = Array((0,341), (1,42), (2,43), (3,234))

所以元组的First元素是我们的Join

我们只需加入并重新排列结果数据框

val joinedRdd = ds1Rdd.join(ds2Rdd)

val resultrdd = joinedRdd.map(x => x._2).map(x => (x._1 ,x._2))

// resultrdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[204] at map at <console>

我们转换成DataFrame

 resultrdd.toDF("Col1","Col2").show()
+----+----+
|Col1|Col2|
+----+----+
|   0| 341|
|   1|  42|
|   1|  43|
|   0| 234|
+----+----+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-09-16
    • 2021-09-25
    • 2015-10-18
    • 1970-01-01
    • 2020-09-06
    • 1970-01-01
    • 1970-01-01
    • 2015-11-16
    相关资源
    最近更新 更多