【问题标题】:PySpark: Insert or update dataframe with another dataframePySpark:使用另一个数据框插入或更新数据框
【发布时间】:2019-01-31 02:37:20
【问题描述】:

我有两个数据框,DF1 和 DF2。 DF1 是主控,DF2 是增量。 DF2 中的数据应该插入到 DF1 中或用于更新 DF1 数据。

假设 DF1 的格式如下:

id_no start_date amount days
1 2016-01-01 4650 22
2 2016-01-02 3130 45
1 2016-01-03 4456 22
2 2016-01-15 1234 45

DF2 包含以下内容:

id_no start_date amount days
1 2016-01-01 8650 52
2 2016-01-02 7130 65
1 2016-01-06 3456 20
2 2016-01-20 2345 19
3 2016-02-02 1345 19

我需要组合这两个数据帧,如果 DF2 的“id_no”和“开始日期”与 DF1 匹配,则应在 DF1 中替换它,如果不匹配,则应将其插入 DF1。 “id_no”不是唯一的。

预期结果:

id_no start_date amount days
1 2016-01-01 8650 52
2 2016-01-02 7130 65
1 2016-01-03 4456 22
2 2016-01-15 1234 45
1 2016-01-06 3456 20
2 2016-01-20 2345 19
3 2016-02-02 1345 19

【问题讨论】:

    标签: python pyspark apache-spark-sql pyspark-dataframes upsert


    【解决方案1】:

    您可以将id_nostart_date 上的两个数据框连接起来,然后将coalesce amountdays 列与df2 的列放在一起:

    import pyspark.sql.functions as f
    
    df1.alias('a').join(
        df2.alias('b'), ['id_no', 'start_date'], how='outer'
    ).select('id_no', 'start_date', 
        f.coalesce('b.amount', 'a.amount').alias('amount'), 
        f.coalesce('b.days', 'a.days').alias('days')
    ).show()
    
    +-----+----------+------+----+
    |id_no|start_date|amount|days|
    +-----+----------+------+----+
    |    1|2016-01-06|  3456|  20|
    |    2|2016-01-20|  2345|  19|
    |    1|2016-01-03|  4456|  22|
    |    3|2016-02-02|  1345|  19|
    |    2|2016-01-15|  1234|  45|
    |    1|2016-01-01|  8650|  52|
    |    2|2016-01-02|  7130|  65|
    +-----+----------+------+----+
    

    如果您有更多列:

    cols = ['amount', 'days']
    
    df1.alias('a').join(
        df2.alias('b'), ['id_no', 'start_date'], how='outer'
    ).select('id_no', 'start_date', 
        *(f.coalesce('b.' + col, 'a.' + col).alias(col) for col in cols)
    ).show()
    +-----+----------+------+----+
    |id_no|start_date|amount|days|
    +-----+----------+------+----+
    |    1|2016-01-06|  3456|  20|
    |    2|2016-01-20|  2345|  19|
    |    1|2016-01-03|  4456|  22|
    |    3|2016-02-02|  1345|  19|
    |    2|2016-01-15|  1234|  45|
    |    1|2016-01-01|  8650|  52|
    |    2|2016-01-02|  7130|  65|
    +-----+----------+------+----+
    

    【讨论】:

    • 感谢您的回答。如果数据框包含 25 个以上的列怎么办?我应该对所有 25 列使用合并吗?
    • 您可以通过循环列列表coalesce df2df1 以编程方式执行此操作,并在select 中使用* 语法。
    【解决方案2】:

    union 如果两个 df 的结构相同,则应该这样做。

    from pyspark.sql import functions as F
    grp_by = {'id_no', 'start_date'}
    df = df2.union(df1)
    df = df.groupby(*grp_by).agg(*[F.first(c).alias(c) for c in set(df.columns)-grp_by])
    df.show()
    #     +-----+----------+----+------+
    #     |id_no|start_date|days|amount|
    #     +-----+----------+----+------+
    #     |    1|2016-01-06|  20|  3456|
    #     |    2|2016-01-20|  19|  2345|
    #     |    1|2016-01-03|  22|  4456|
    #     |    3|2016-02-02|  19|  1345|
    #     |    2|2016-01-15|  45|  1234|
    #     |    1|2016-01-01|  52|  8650|
    #     |    2|2016-01-02|  65|  7130|
    #     +-----+----------+----+------+
    

    【讨论】:

      【解决方案3】:

      我现在正在自己研究这个。看起来像 sparksupports SQL's MERGE INTO 应该适合这项任务。您只需要创建一个 new_id,它是 id_no 和 start_date 的连接

      MERGE INTO df1
      USING df2
      ON df1.new_id = df2.new_id
      WHEN MATCHED THEN
        UPDATE SET df1.amount = df2.amount, df1.days = df2.days
      WHEN NOT MATCHED
        THEN INSERT *
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2019-07-26
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多