【问题标题】:Joining two dataframes without a common column在没有公共列的情况下连接两个数据框
【发布时间】:2018-09-19 04:35:47
【问题描述】:

我有两个具有不同类型列的数据框。我需要加入这两个不同的数据框。请参考以下示例

val df1 has
Customer_name 
Customer_phone
Customer_age

val df2 has
Order_name
Order_ID

这两个数据框没有任何共同的列。两个数据框中的行数和列数也不同。我尝试插入一个新的虚拟列来增加 row_index 值,如下所示 val dfr=df1.withColumn("row_index",monotonically_increasing_id()).

但由于我使用的是 Spark 2,因此不支持 monotonically_increasing_id 方法。有什么方法可以连接两个数据框,这样我就可以在一张 excel 文件中创建两个数据框的值。

例如

val df1:
Customer_name  Customer_phone  Customer_age
karti           9685684551     24      
raja            8595456552     22

val df2:
Order_name Order_ID
watch       1
cattoy     2

我的最终excel表格应该是这样的:

Customer_name  Customer_phone  Customer_age   Order_name  Order_ID

karti          9685684551      24             watch        1
   
raja           8595456552      22             cattoy      2

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    使用以下代码向两个数据框添加索引列

    df1.withColumn("id1",monotonicallyIncreasingId)
    df2.withColumn("id2",monotonicallyIncreasingId)
    

    然后使用以下代码连接两个数据框并删除索引列

    df1.join(df2,col("id1")===col("id2"),"inner")
       .drop("id1","id2")
    

    【讨论】:

    • 我建议使用 monotonically_increasing_id 因为 monotonicallyIncreasingId 已被弃用
    【解决方案2】:

    monotonically_increasing_id()增加唯一,但不是连续

    您可以通过转换为rdd 并为dataframe 重构具有相同架构的Dataframe 来使用zipWithIndex

    import spark.implicits._
    
    
    val df1 = Seq(
      ("karti", "9685684551", 24),
      ("raja", "8595456552", 22)
    ).toDF("Customer_name", "Customer_phone", "Customer_age")
    
    
    val df2 = Seq(
      ("watch", 1),
      ("cattoy", 2)
    ).toDF("Order_name", "Order_ID")
    
    val df11 = spark.sqlContext.createDataFrame(
      df1.rdd.zipWithIndex.map {
        case (row, index) => Row.fromSeq(row.toSeq :+ index)
      },
      // Create schema for index column
      StructType(df1.schema.fields :+ StructField("index", LongType, false))
    )
    
    
    val df22 = spark.sqlContext.createDataFrame(
      df2.rdd.zipWithIndex.map {
        case (row, index) => Row.fromSeq(row.toSeq :+ index)
      },
      // Create schema for index column
      StructType(df2.schema.fields :+ StructField("index", LongType, false))
    )
    

    现在加入最终的数据帧

    df11.join(df22, Seq("index")).drop("index")
    

    输出:

    +-------------+--------------+------------+----------+--------+
    |Customer_name|Customer_phone|Customer_age|Order_name|Order_ID|
    +-------------+--------------+------------+----------+--------+
    |karti        |9685684551    |24          |watch     |1       |
    |raja         |8595456552    |22          |cattoy    |2       |
    +-------------+--------------+------------+----------+--------+
    

    【讨论】:

    • 这有点复杂和冗长的 TBH。 @ss301 的回答比较简单。
    猜你喜欢
    • 2019-05-19
    • 2020-10-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多