[Question Title]: Generic coalesce of multiple columns in a join in PySpark
[Posted]: 2021-07-26 15:58:53
[Question]:

I have to merge many Spark DataFrames. After the join, I want to coalesce across multiple columns that share the same names.

I was able to create a minimal example following this question.

However, I need a more generic piece of code that supports: a set of variables to coalesce (set_vars = set(('var1','var2')) in the example) and multiple join keys (join_keys = set(('id')) in the example).

Is there a more concise (and more generic) way to achieve this in PySpark?

from pyspark.sql.functions import coalesce

df1 = spark.createDataFrame(
    [
        (1, None, "aa"),
        (2, "a", None),
        (3, "b", None),
        (4, "h", None),
    ],
    "id int, var1 string, var2 string",
)

df2 = spark.createDataFrame(
    [
        (1, "f", "Ba"),
        (2, "a", "bb"),
        (3, "b", None),
    ],
    "id int, var1 string, var2 string",
)

df1 = df1.alias("df1")
df2 = df2.alias("df2")

df3 = (
    df1.join(df2, df1.id == df2.id, how="left")
    .withColumn("var1_", coalesce("df1.var1", "df2.var1"))
    .drop("var1")
    .withColumnRenamed("var1_", "var1")
    .withColumn("var2_", coalesce("df1.var2", "df2.var2"))
    .drop("var2")
    .withColumnRenamed("var2_", "var2")
)

[Discussion]:

    Tags: python pyspark azure-databricks coalesce


    [Solution 1]:

    We can avoid duplicate columns by passing the join columns as a list to the join method instead of writing an explicit join condition; see link. However, there are also common columns here that are not part of the join condition. We can generalize your code with a for loop.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import coalesce

    spark = SparkSession.builder.master("local[*]").getOrCreate()

    df1 = spark.createDataFrame(
        [
            (1, None, "aa"),
            (2, "a", None),
            (3, "b", None),
            (4, "h", None),
        ],
        "id int, var1 string, var2 string",
    )

    df2 = spark.createDataFrame(
        [
            (1, "f", "Ba"),
            (2, "a", "bb"),
            (3, "b", None),
        ],
        "id int, var1 string, var2 string",
    )
    
    df1 = df1.alias("df1")
    df2 = df2.alias("df2")

    key_columns = ["id"]
    # Common columns between the two DataFrames, excluding
    # the columns used in the join condition
    other_common_columns = set(df1.columns).intersection(set(df2.columns)) \
        .difference(set(key_columns))

    outputDF = df1.join(df2, key_columns, how="left")

    for i in other_common_columns:
        outputDF = (
            outputDF.withColumn(f"{i}_", coalesce(f"df1.{i}", f"df2.{i}"))
            .drop(i)
            .withColumnRenamed(f"{i}_", i)
        )

    outputDF.show()
    
    +---+----+----+
    | id|var2|var1|
    +---+----+----+
    |  1|  aa|   f|
    |  3|null|   b|
    |  4|null|   h|
    |  2|  bb|   a|
    +---+----+----+
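    The column-selection step in the answer is plain Python set algebra, so it can be sanity-checked without a Spark session. A minimal sketch, using plain lists to stand in for df1.columns and df2.columns:

```python
# Plain lists standing in for df1.columns and df2.columns
df1_columns = ["id", "var1", "var2"]
df2_columns = ["id", "var1", "var2"]
key_columns = ["id"]

# Columns present in both frames that are not join keys:
# these are the ones that must be coalesced after the join
other_common_columns = set(df1_columns).intersection(set(df2_columns)) \
    .difference(set(key_columns))

print(sorted(other_common_columns))  # ['var1', 'var2']
```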
    

    [Discussion]:

    • Thanks a lot, Mohana. I wanted to do everything in a single step, but the for-loop solution is very elegant as well.