【问题标题】:Converting PySpark dataframe to a Delta Table将 PySpark 数据帧转换为 Delta 表
【发布时间】:2021-10-29 00:18:53
【问题描述】:

我在 AWS Glue 环境中工作。我从 Glue 目录中读取数据作为动态数据框,并将其转换为 Pyspark 数据框以进行自定义转换。 要对新的/更新的数据进行 upsert,我打算使用 delta 表。

但我只找到从路径中将数据作为增量表读取的选项。我需要将我的 Pyspark 数据框转换为 Delta 表以进行合并操作。 有没有办法做到这一点?

【问题讨论】:

    标签: apache-spark pyspark aws-glue delta-lake


    【解决方案1】:

    您只需要一个目标表作为 Delta 表。您计划合并到的数据不需要是 Delta 表。这实际上取决于您使用的 API:

    • 如果您使用的是 Python API,那么您可以按原样使用数据框(示例基于 docs):
    from delta.tables import *
    
    deltaTable = DeltaTable.forPath(spark, "/data/events/")
    updatesDF = .... # your transformed dataframe
    
    deltaTable.alias("target").merge(
        updatesDF.alias("updates"),
        "target.col1 = updates.col1") \
      .whenMatchedUpdateAll() \
      .whenNotMatchedInsertAll() \
      .execute()
    
    • 如果您使用 SQL MERGE 命令 - 您只需为数据框注册临时视图,并将其用作 MER​​GE SQL 命令的输入:
    updates_df.createOrReplaceTempView(updates)
    merge_sql = f"""
          merge into target
          using updates
            
          ON source.col1 == target.col1
          WHEN MATCHED THEN UPDATE SET *
          WHEN NOT MATCHED THEN INSERT *
        """
    updates_df._jdf.sparkSession().sql(merge_sql)
    

    这里唯一的问题是您需要使用df._jdf.sparkSession().sql 在注册临时视图的同一上下文中执行 SQL 命令。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-09-18
      • 2021-11-16
      • 2017-01-25
      • 2020-07-31
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多