【问题标题】:How to avoid duplicate columns after join?加入后如何避免重复列?
【发布时间】:2016-05-17 10:59:48
【问题描述】:

我有两个包含以下列的数据框:

df1.columns
//  Array(ts, id, X1, X2)

df2.columns
//  Array(ts, id, Y1, Y2)

做完之后

val df_combined = df1.join(df2, Seq(ts,id))

我最终得到以下列:Array(ts, id, X1, X2, ts, id, Y1, Y2)。我可以预期公共列会被删除。有什么额外的事情需要做吗?

【问题讨论】:

  • 如果您将连接列定义为字符串的Seq(用于列名),则不应重复列。请参阅下面的答案。

标签: scala apache-spark apache-spark-sql


【解决方案1】:

简单的答案(来自Databricks FAQ on this matter)是执行连接,其中连接的列表示为字符串数组(或一个字符串)而不是谓词。

以下是改编自 Databricks 常见问题解答的示例,但有两个连接列以回答原始发布者的问题。

这是left数据框:

val llist = Seq(("bob", "b", "2015-01-13", 4), ("alice", "a", "2015-04-23",10))

val left = llist.toDF("firstname","lastname","date","duration")

left.show()

/*
+---------+--------+----------+--------+
|firstname|lastname|      date|duration|
+---------+--------+----------+--------+
|      bob|       b|2015-01-13|       4|
|    alice|       a|2015-04-23|      10|
+---------+--------+----------+--------+
*/

这是正确的数据框:

val right = Seq(("alice", "a", 100),("bob", "b", 23)).toDF("firstname","lastname","upload")

right.show()

/*
+---------+--------+------+
|firstname|lastname|upload|
+---------+--------+------+
|    alice|       a|   100|
|      bob|       b|    23|
+---------+--------+------+
*/

这是一个不正确的解决方案,其中连接列被定义为谓词left("firstname")===right("firstname") && left("lastname")===right("lastname")

不正确的结果是firstnamelastname 列在连接的数据框中重复:

left.join(right, left("firstname")===right("firstname") &&
                 left("lastname")===right("lastname")).show

/*
+---------+--------+----------+--------+---------+--------+------+
|firstname|lastname|      date|duration|firstname|lastname|upload|
+---------+--------+----------+--------+---------+--------+------+
|      bob|       b|2015-01-13|       4|      bob|       b|    23|
|    alice|       a|2015-04-23|      10|    alice|       a|   100|
+---------+--------+----------+--------+---------+--------+------+
*/

正确的解决方案是将连接列定义为字符串数组Seq("firstname", "lastname")。输出数据框没有重复列:

left.join(right, Seq("firstname", "lastname")).show

/*
+---------+--------+----------+--------+------+
|firstname|lastname|      date|duration|upload|
+---------+--------+----------+--------+------+
|      bob|       b|2015-01-13|       4|    23|
|    alice|       a|2015-04-23|      10|   100|
+---------+--------+----------+--------+------+
*/

【讨论】:

  • 实际上输出 DF does 使用以下内容有重复项; val joined = sampledDF.join(idsDF, idColumns, "inner") 。其中idColumns 是一个包含连接列的 Seq[String]
  • 如果两个数据集中的列名不同,我认为这不起作用。
  • 当 4 个连接表达式中有 2 个在两个表中有不同的列但 2 个在两个表上引用相同的列时该怎么办。重命名?
  • 如果列有空值并且它是空比较,这将不起作用
  • 如果我们加入的公共列在不同的数据框中有不同的名称怎么办?
【解决方案2】:

这是预期的行为。 DataFrame.join 方法相当于 SQL join 这样

SELECT * FROM a JOIN b ON joinExprs

如果您想忽略重复的列,只需删除它们或在之后选择感兴趣的列。如果您想消除歧义,可以使用父 DataFrames 访问这些:

val a: DataFrame = ???
val b: DataFrame = ???
val joinExprs: Column = ???

a.join(b, joinExprs).select(a("id"), b("foo"))
// drop equivalent 
a.alias("a").join(b.alias("b"), joinExprs).drop(b("id")).drop(a("foo"))

或使用别名:

// As for now aliases don't work with drop
a.alias("a").join(b.alias("b"), joinExprs).select($"a.id", $"b.foo")

对于 equi-join,存在一个特殊的快捷语法,它采用 a sequence of strings:

val usingColumns: Seq[String] = ???

a.join(b, usingColumns)

single string

val usingColumn: String = ???

a.join(b, usingColumn)

只保留一份连接条件中使用的列的副本。

【讨论】:

  • 我可以删除重复的列而不是选择吗?
  • 是的,但只能通过父母而不是别名。
  • 外连接怎么样?任何没有匹配的行都将在表的一个键列中具有空值,但您不知道提前删除哪一个。有没有办法优雅地处理这种情况?
  • @Darryl coalesce 并删除两者。
  • 在连接的数据框中,我希望列名不是输入表的列名。有没有办法做到这一点?例如:我不想将列名作为“foo”,它取自“b”数据框,而是将列名作为“column_new”。类似这样的 sql 查询:“select b.foo as column_new”
【解决方案3】:

我已经被这个问题困扰了一段时间,直到最近我才想出了一个非常简单的解决方案。

说a是

scala> val a  = Seq(("a", 1), ("b", 2)).toDF("key", "vala")
a: org.apache.spark.sql.DataFrame = [key: string, vala: int]

scala> a.show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+
and 
scala> val b  = Seq(("a", 1)).toDF("key", "valb")
b: org.apache.spark.sql.DataFrame = [key: string, valb: int]

scala> b.show
+---+----+
|key|valb|
+---+----+
|  a|   1|
+---+----+

我可以这样做以仅选择数据框 a 中的值:

scala> a.join(b, a("key") === b("key"), "left").select(a.columns.map(a(_)) : _*).show
+---+----+
|key|vala|
+---+----+
|  a|   1|
|  b|   2|
+---+----+

【讨论】:

【解决方案4】:

你可以简单地使用它

df1.join(df2, Seq("ts","id"),"TYPE-OF-JOIN")

这里可以是TYPE-OF-JOIN

  • 内部
  • 全外

例如,我有两个这样的数据框:

// df1
word   count1
w1     10   
w2     15  
w3     20

// df2
word   count2
w1     100   
w2     150  
w5     200

如果你做fullouter join 那么结果是这样的

df1.join(df2, Seq("word"),"fullouter").show()

word   count1  count2
w1     10      100
w2     15      150
w3     20      null
w5     null    200

【讨论】:

  • 这里怎么添加条件,比如col("count1") > 10say
  • 我认为你可以这样做df1.join(df2, Seq("word"),"fullouter").filter($"count1">10).show()这个。如果它不起作用,请告诉我。
  • 如何导入Seq
  • 我认为你不需要导入任何东西。如果它不适合你,那么试试这个import spark.implicits._
【解决方案5】:

试试这个,

val df_combined = df1.join(df2, df1("ts") === df2("ts") && df1("id") === df2("id")).drop(df2("ts")).drop(df2("id"))

【讨论】:

    【解决方案6】:

    这是 SQL 的正常行为,我正在为此做些什么:

    • 删除或重命名源列
    • 加入
    • 删除重命名的列(如果有)

    我在这里替换“全名”列:

    Java 中的一些代码:

    this
        .sqlContext
        .read()
        .parquet(String.format("hdfs:///user/blablacar/data/year=%d/month=%d/day=%d", year, month, day))
        .drop("fullname")
        .registerTempTable("data_original");
    
    this
        .sqlContext
        .read()
        .parquet(String.format("hdfs:///user/blablacar/data_v2/year=%d/month=%d/day=%d", year, month, day))
        .registerTempTable("data_v2");
    
     this
        .sqlContext
        .sql(etlQuery)
        .repartition(1)
        .write()
        .mode(SaveMode.Overwrite)
        .parquet(outputPath);
    

    查询在哪里:

    SELECT
        d.*,
       concat_ws('_', product_name, product_module, name) AS fullname
    FROM
        {table_source} d
    LEFT OUTER JOIN
        {table_updates} u ON u.id = d.id
    

    我相信这是您只能使用 Spark 才能做到的事情(从列表中删除列),非常非常有帮助!

    【讨论】:

      【解决方案7】:

      最佳做法是在加入它们之前使两个 DF 中的列名不同,然后相应地删除。

      df1.columns =[id, age, income]
      df2.column=[id, age_group]
      
      df1.join(df2, on=df1.id== df2.id,how='inner').write.saveAsTable('table_name')
      

      重复列出错时会返回错误

      试试这个,试试这个:

      df2_id_renamed = df2.withColumnRenamed('id','id_2')
      df1.join(df2_id_renamed, on=df1.id== df2_id_renamed.id_2,how='inner').drop('id_2')
      

      【讨论】:

        【解决方案8】:

        如果有人在使用 spark-SQL 并且想要实现相同的目标,那么您可以在连接查询中使用 USING 子句。

        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        import spark.implicits._
        
        val df1 = List((1, 4, 3), (5, 2, 4), (7, 4, 5)).toDF("c1", "c2", "C3")
        val df2 = List((1, 4, 3), (5, 2, 4), (7, 4, 10)).toDF("c1", "c2", "C4")
        
        df1.createOrReplaceTempView("table1")
        df2.createOrReplaceTempView("table2")
        
        spark.sql("select * from table1  inner join  table2  using (c1, c2)").show(false)
        
        /*
        +---+---+---+---+
        |c1 |c2 |C3 |C4 |
        +---+---+---+---+
        |1  |4  |3  |3  |
        |5  |2  |4  |4  |
        |7  |4  |5  |10 |
        +---+---+---+---+
        */
        

        【讨论】:

          【解决方案9】:

          将多个表连接在一起后,如果遇到重复项,我会通过一个简单的函数运行它们以重命名 DF 中的列。或者,you could drop these duplicate columns too

          其中Names 是具有列['Id', 'Name', 'DateId', 'Description']Dates 是具有列['Id', 'Date', 'Description'] 的表,列IdDescription 将在连接后重复。

          Names = sparkSession.sql("SELECT * FROM Names")
          Dates = sparkSession.sql("SELECT * FROM Dates")
          NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
          NamesAndDates = deDupeDfCols(NamesAndDates, '_')
          NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")
          

          其中deDupeDfCols定义为:

          def deDupeDfCols(df, separator=''):
              newcols = []
          
              for col in df.columns:
                  if col not in newcols:
                      newcols.append(col)
                  else:
                      for i in range(2, 1000):
                          if (col + separator + str(i)) not in newcols:
                              newcols.append(col + separator + str(i))
                              break
          
              return df.toDF(*newcols)
          

          生成的数据框将包含列['Id', 'Name', 'DateId', 'Description', 'Id2', 'Date', 'Description2']

          抱歉,这个答案是用 Python 编写的 - 我不熟悉 Scala,但这是我在谷歌上搜索这个问题时出现的问题,我确信 Scala 代码并没有不同.

          【讨论】:

            【解决方案10】:

            Inner Join是spark默认的join,下面是简单的语法。

            leftDF.join(rightDF,"Common Col Nam")
            

            对于其他连接,您可以遵循以下语法

            leftDF.join(rightDF,Seq("Common Columns comma seperated","join type")
            

            如果列名称不常见则

            leftDF.join(rightDF,leftDF.col("x")===rightDF.col("y),"join type")
            

            【讨论】:

            • OP 询问加入后删除重复列我认为你错过了这一点。
            猜你喜欢
            • 2018-12-11
            • 1970-01-01
            • 2013-10-08
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多