【问题标题】:Removing duplicate columns after a DF join in Spark在 Spark 中的 DF 连接后删除重复的列
【发布时间】:2018-04-07 06:08:42
【问题描述】:

当您加入两个列名相似的 DF 时:

df = df1.join(df2, df1['id'] == df2['id'])

加入工作正常,但您不能调用 id 列,因为它不明确,您会收到以下异常:

pyspark.sql.utils.AnalysisException:“参考‘id’不明确, 可能是:id#5691, id#5918.;"

这使得id 不再可用...

下面的函数解决了这个问题:

def join(df1, df2, cond, how='left'):
    df = df1.join(df2, cond, how=how)
    repeated_columns = [c for c in df1.columns if c in df2.columns]
    for col in repeated_columns:
        df = df.drop(df2[col])
    return df

我不喜欢的是我必须遍历列名并将它们删除为什么。这看起来真的很笨重...

您是否知道任何其他解决方案可以更优雅地加入和删除重复项或删除多个列而不迭代每个列?

【问题讨论】:

  • 标记答案会帮助别人。

标签: python apache-spark pyspark apache-spark-sql


【解决方案1】:

如果两个数据框的join列同名,只需要equi join,可以将join列指定为列表,此时结果只保留其中一个join列:

df1.show()
+---+----+
| id|val1|
+---+----+
|  1|   2|
|  2|   3|
|  4|   4|
|  5|   5|
+---+----+

df2.show()
+---+----+
| id|val2|
+---+----+
|  1|   2|
|  1|   3|
|  2|   4|
|  3|   5|
+---+----+

df1.join(df2, ['id']).show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

否则你需要给连接数据框alias,并在后面通过alias引用重复的列:

df1.alias("a").join(
    df2.alias("b"), df1['id'] == df2['id']
).select("a.id", "a.val1", "b.val2").show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+

【讨论】:

  • 一个简单而优雅的解决方案 :) 现在,如果您想从 alias = a 中选择所有列并从 alias = b 中选择单个列,您还可以使用 SQL 语法,如 .select("a.*", "b.val2")
【解决方案2】:

df.join(other, on, how)on 是列名字符串或列名字符串列表时,返回的数据框将防止重复列。 当on 是一个连接表达式时,它会导致重复的列。我们可以使用.drop(df.a) 删除重复的列。示例:

cond = [df.a == other.a, df.b == other.bb, df.c == other.ccc]
# result will have duplicate column a
result = df.join(other, cond, 'inner').drop(df.a)

【讨论】:

  • 那是……不直观(不同的行为取决于on 的形式)。但很高兴知道 - 谢谢。
  • 这个解决方案对我不起作用(在 Spark 3 中)。尝试使用这样的引用删除列时,出现错误:each col in the param list should be a string
【解决方案3】:

假设“a”是一个具有“id”列的数据框,“b”是另一个具有“id”列的数据框

我使用以下两种方法去除重复:

方法 1:使用字符串连接表达式而不是布尔表达式。这会自动为您删除重复的列

a.join(b, 'id')

方法2:在加入前重命名列,在加入后删除

b.withColumnRenamed('id', 'b_id')
joinexpr = a['id'] == b['b_id']
a.join(b, joinexpr).drop('b_id)

【讨论】:

    【解决方案4】:

    以下代码适用于 Spark 1.6.0 及更高版本。

    salespeople_df.show()
    +---+------+-----+
    |Num|  Name|Store|
    +---+------+-----+
    |  1| Henry|  100|
    |  2| Karen|  100|
    |  3|  Paul|  101|
    |  4| Jimmy|  102|
    |  5|Janice|  103|
    +---+------+-----+
    
    storeaddress_df.show()
    +-----+--------------------+
    |Store|             Address|
    +-----+--------------------+
    |  100|    64 E Illinos Ave|
    |  101|         74 Grand Pl|
    |  102|          2298 Hwy 7|
    |  103|No address available|
    +-----+--------------------+
    

    假设 - 在这个例子中 - 共享列的名称是相同的:

    joined=salespeople_df.join(storeaddress_df, ['Store'])
    joined.orderBy('Num', ascending=True).show()
    
    +-----+---+------+--------------------+
    |Store|Num|  Name|             Address|
    +-----+---+------+--------------------+
    |  100|  1| Henry|    64 E Illinos Ave|
    |  100|  2| Karen|    64 E Illinos Ave|
    |  101|  3|  Paul|         74 Grand Pl|
    |  102|  4| Jimmy|          2298 Hwy 7|
    |  103|  5|Janice|No address available|
    +-----+---+------+--------------------+
    

    .join 将防止共享列的重复。

    假设您要在此示例中删除列Num,您可以只使用.drop('colname')

    joined=joined.drop('Num')
    joined.show()
    
    +-----+------+--------------------+
    |Store|  Name|             Address|
    +-----+------+--------------------+
    |  103|Janice|No address available|
    |  100| Henry|    64 E Illinos Ave|
    |  100| Karen|    64 E Illinos Ave|
    |  101|  Paul|         74 Grand Pl|
    |  102| Jimmy|          2298 Hwy 7|
    +-----+------+--------------------+
    

    【讨论】:

      【解决方案5】:

      将多个表连接在一起后,我通过一个简单的函数运行它们,以在从左到右移动时遇到重复的 DF 中删除列。或者,you could rename these columns too

      其中Names 是具有列['Id', 'Name', 'DateId', 'Description']Dates 是具有列['Id', 'Date', 'Description'] 的表,列IdDescription 将在连接后重复。

      Names = sparkSession.sql("SELECT * FROM Names")
      Dates = sparkSession.sql("SELECT * FROM Dates")
      NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
      NamesAndDates = dropDupeDfCols(NamesAndDates)
      NamesAndDates.saveAsTable("...", format="parquet", mode="overwrite", path="...")
      

      其中dropDupeDfCols定义为:

      def dropDupeDfCols(df):
          newcols = []
          dupcols = []
      
          for i in range(len(df.columns)):
              if df.columns[i] not in newcols:
                  newcols.append(df.columns[i])
              else:
                  dupcols.append(i)
      
          df = df.toDF(*[str(i) for i in range(len(df.columns))])
          for dupcol in dupcols:
              df = df.drop(str(dupcol))
      
          return df.toDF(*newcols)
      

      生成的数据框将包含列['Id', 'Name', 'DateId', 'Description', 'Date']

      【讨论】:

        【解决方案6】:

        在我的情况下,我有一个数据框,在连接后有多个重复的列,我试图以 csv 格式与该数据框相同,但由于重复的列,我遇到了错误。我按照以下步骤删除重复的列。代码在scala中

        1) Rename all the duplicate columns and make new dataframe 2) make separate list for all the renamed columns 3) Make new dataframe with all columns (including renamed - step 1) 4) drop all the renamed column

        private def removeDuplicateColumns(dataFrame:DataFrame): DataFrame = {
        var allColumns:  mutable.MutableList[String] = mutable.MutableList()
        val dup_Columns: mutable.MutableList[String] = mutable.MutableList()
        dataFrame.columns.foreach((i: String) =>{
        if(allColumns.contains(i))
        
        if(allColumns.contains(i))
        {allColumns += "dup_" + i
        dup_Columns += "dup_" +i
        }else{
        allColumns += i
        }println(i)
        })
        val columnSeq = allColumns.toSeq
        val df = dataFrame.toDF(columnSeq:_*)
        val unDF = df.drop(dup_Columns:_*)
        unDF
        }
        

        to call the above function use below code and pass your dataframe which contains duplicate columns

        val uniColDF = removeDuplicateColumns(df)
        

        【讨论】:

        • 感谢此解决方案有效!。虽然是一些小的语法错误。也不要忘记导入: import org.apache.spark.sql.DataFrame import scala.collection.mutable
        【解决方案7】:

        如果您加入列表或字符串,则会自动删除 dup cols]1 这是一个 scala 解决方案,您可以将相同的想法翻译成任何语言

        // get a list of duplicate columns or use a list/seq 
        // of columns you would like to join on (note that this list
        // should include columns for which you do not want duplicates)
        val duplicateCols = df1.columns.intersect(df2.columns) 
        
        // no duplicate columns in resulting DF
        df1.join(df2, duplicateCols.distinct.toSet)
        

        【讨论】:

          猜你喜欢
          • 2017-09-17
          • 2021-09-05
          • 1970-01-01
          • 2023-02-04
          • 1970-01-01
          • 2018-04-25
          • 2020-08-11
          • 1970-01-01
          • 2020-12-11
          相关资源
          最近更新 更多