【问题标题】:Spark Dataframe Join - Duplicate column (non-joined column)Spark Dataframe Join - 重复列(非连接列)
【发布时间】:2018-06-26 01:44:21
【问题描述】:

我有两个 Dataframes df1(员工表)和 df2(部门表),具有以下架构:

df1.columns
// Arrays(id,name,dept_id)

df2.columns
// Array(id,name)

在我在 df1.dept_id 和 df2.id 上加入这两个表之后:

val joinedData = df1.join(df2,df1("dept_id")===df2("id"))
joinedData.columns
// Array(id,name,dept_id,id,name)

将其保存在文件中时,

joined.write.csv("<path>")

它给出了错误:

 org.apache.spark.sql.AnalysisException: Duplicate column(s) : "name", "id" found, cannot save to file.;

我阅读了有关使用字符串序列来避免列重复的信息,但这是针对要执行连接的列。我需要为未连接的列提供类似的功能。

有没有直接的方法来嵌入重复列的表名以便保存?

我想出了一个匹配 dfs 的列并重命名重复列以将表名附加到列名的解决方案。但是有直接的方法吗?

注意:这将是一个通用代码,仅包含执行连接的列详细信息。仅在运行时才知道剩余列。所以我们不能通过硬编码来重命名列。

【问题讨论】:

  • @Vijay 请检查所有答案。
  • 已检查。仍然没有可用的答案。

标签: scala apache-spark dataframe join apache-spark-sql


【解决方案1】:

我会通过确保它们具有不同的名称来保留所有列,例如通过在列名前添加标识符:

val df1Cols = df1.columns
val df2Cols = df2.columns

// prefixes to column names
val df1pf = df1.select(df1Cols.map(n => col(n).as("df1_"+n)):_*)
val df2pf = df2.select(df2Cols.map(n => col(n).as("df2_"+n)):_*)

df1pf.join(df2pf,
    $"df1_dept_id"===$"df2_id",
 )

【讨论】:

    【解决方案2】:

    经过进一步研究并征求其他开发者的意见,可以肯定没有直接的方法。一种方法是更改​​@Raphael 指定的所有列的名称。但我通过仅更改重复列解决了我的问题:

    val commonCols = df1.columns.intersect(df2.columns)
    val newDf2 = changeColumnsName(df2,commonCols,"df1")
    

    changeColumnsName 定义在哪里:

    @tailrec
    def changeColumnsName(dataFrame: DataFrame, columns: Array[String], tableName: String): DataFrame = {
    if (columns.size == 0)
      dataFrame
    else
      changeColumnsName(dataFrame.withColumnRenamed(columns.head, tableName + "_" + columns.head), columns.tail, tableName)
    

    }

    现在,执行连接:

    val joinedData = df1.join(newDf2,df1("dept_id")===newDf2("df2_id"))
    joinedData.columns
    // Array(id,name,dept_id,df2_id,df2_name)
    

    【讨论】:

      【解决方案3】:

      您可以尝试为数据框使用别名,

      import spark.implicits._
      
      df1.as("df1")
        .join(df2.alias("df2"),df1("dept_id") === df2("id"))
        .select($"df1.*",$"df2.*").show()
      

      【讨论】:

        【解决方案4】:
        val llist = Seq(("bob", "2015-01-13", 4), ("alice", "2015-04-23",10)) 
        val left = llist.toDF("name","date","duration")
        val right = Seq(("alice", 100),("bob", 23)).toDF("name","upload")
        
        val df = left.join(right, left.col("name") === right.col("name")) 
        
        display(df)
        
        
        head(drop(join(left, right, left$name == right$name), left$name))
        

        https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html

        【讨论】:

          猜你喜欢
          • 2021-09-05
          • 2015-10-05
          • 2016-02-22
          • 2018-10-24
          • 2021-12-18
          • 1970-01-01
          相关资源
          最近更新 更多