【问题标题】:partitionBy & overwrite strategy in an Azure DataLake using PySpark in Databricks在 Databricks 中使用 PySpark 在 Azure DataLake 中的 partitionBy 和覆盖策略
【发布时间】:2020-06-16 03:04:24
【问题描述】:

我在 Azure 环境中有一个简单的 ETL 过程

blob 存储 > datafactory > datalake raw > databricks > datalake 策划 > 数据仓库(主要 ETL)。

这个项目的数据集不是很大(大约 100 万行 20 列给或取),但是我想在我的数据湖中将它们正确分区为 Parquet 文件。

目前我运行一些简单的逻辑来根据业务日历确定每个文件在我的湖中的位置。

文件看起来像这样

Year Week Data
2019 01   XXX
2019 02   XXX

然后,我将给定文件分区为以下格式,替换现有数据并为新数据创建新文件夹。

curated ---
           dataset --
                     Year 2019 
                              - Week 01 - file.pq + metadata
                              - Week 02 - file.pq + metadata
                              - Week 03 - file.pq + datadata #(pre existing file)

元数据是自动生成的成功提交

为此,我在 Pyspark 2.4.3 中使用以下查询

pyspark_dataframe.write.mode('overwrite')\
                         .partitionBy('Year','Week').parquet('\curated\dataset')

现在如果我单独使用这个命令,它将覆盖目标分区中的所有现有数据

所以Week 03 将会丢失。

使用spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") 似乎可以解决问题并且只覆盖目标文件,但我想知道这是否是处理数据湖中文件的最佳方式?

我也发现很难找到有关上述功能的任何文档。

我的第一个直觉是循环单个拼花并手动写入每个分区,虽然这给了我更大的控制权,但循环会很慢。

我的下一个想法是将每个分区写入/tmp 文件夹并移动每个镶木地板文件,然后根据需要使用上面的查询替换文件/创建文件。然后在创建某种元数据日志的同时清除 /tmp 文件夹。

有没有更好的方法/方法?

任何指导将不胜感激。

这里的最终目标是为所有“精选”数据提供一个干净且安全的区域,同时拥有可以读入数据仓库以进行进一步 ETL 的镶木地板文件日志。

【问题讨论】:

    标签: python azure apache-spark apache-spark-sql databricks


    【解决方案1】:

    我们可以使用 saveAsTable 并在此之前删除分区,而不是直接写入表。

    dataset.repartition(1).write.mode('append')\
                         .partitionBy('Year','Week').saveAsTable("tablename")
    

    用于删除以前的分区

    partitions = [ (x["Year"], x["Week"]) for x in dataset.select("Year", "Week").distinct().collect()]
    for year, week in partitions:
        spark.sql('ALTER TABLE tablename DROP IF EXISTS PARTITION (Year = "'+year+'",Week = "'+week+'")')
    

    【讨论】:

      【解决方案2】:

      我看到您在 azure 堆栈中使用数据块。我认为最可行和推荐的方法是使用数据块中的新 delta Lake 项目

      它为对象存储(如 s3 或 azure 数据湖存储)提供各种 upsert、merge 和酸事务选项。它基本上提供了数据仓库向数据湖提供的管理、安全、隔离和更新插入/合并。对于一个管道,由于其功能和灵活性,苹果实际上将其数据仓库替换为仅在 delta 数据块上运行。对于您的用例和许多其他使用 parquet 的用户,只需 将“parquet”替换为“delta”,以便使用其功能(如果您有数据块)。 Delta 基本上是 parquet自然演变,databricks 通过提供附加功能和开源功能做得很好。

      对于您的情况,我建议您尝试 delta 中提供的 replaceWhere 选项。在进行此目标更新之前,目标表的格式必须为 delta

      而不是这个:

      dataset.repartition(1).write.mode('overwrite')\
                               .partitionBy('Year','Week').parquet('\curataed\dataset')
      

      来自https://docs.databricks.com/delta/delta-batch.html

      '您只能在分区列'

      选择性地覆盖匹配谓词的数据

      你可以试试这个:

      dataset.write.repartition(1)\
             .format("delta")\
             .mode("overwrite")\
             .partitionBy('Year','Week')\
             .option("replaceWhere", "Year == '2019' AND Week >='01' AND Week <='02'")\ #to avoid overwriting Week3
             .save("\curataed\dataset")
      

      此外,如果您希望将分区设为 1,为什么不使用 coalesce(1),因为它可以避免完全洗牌。

      来自https://mungingdata.com/delta-lake/updating-partitions-with-replacewhere/

      'replaceWhere 在您必须运行计算成本高昂的算法 时特别有用,但仅限于某些分区' em>

      因此,我个人认为,使用 replacewhere 手动指定覆盖将更有针对性和计算效率,而不是仅仅依靠: spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

      Databricks 对 delta 表进行了优化,通过 bin 打包和 z 排序使其成为镶木地板的更快、更有效的选择(因此是自然演变):

      来自链接:https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html

      • WHERE(装箱)

      '优化匹配给定分区谓词的行子集。仅支持涉及分区键属性的过滤器。'

      • ZORDER BY

      '将列信息放在同一组文件中。 Delta Lake 数据跳过算法使用同域性来显着减少需要读取的数据量。

      • 更快的查询执行,支持索引、统计和自动缓存

      • 具有丰富架构验证的数据可靠性和事务保证

      • 简化的数据管道,具有灵活的 UPSERT 支持和单一数据源上的统一结构化流 + 批处理

      您还可以查看开源项目的完整文档:https://docs.delta.io/latest/index.html

      .. 我还想说我不为 databricks/delta 湖工作。我刚刚看到它们的改进和功能使我的工作受益。

      更新:

      问题的要点“替换现有数据并为新数据创建新文件夹”,并以高度可扩展和有效的方式进行。

      在镶木地板中使用动态分区覆盖可以完成这项工作,但是我觉得该方法的自然演变是使用增量表合并操作,这些操作基本上是为了“将 Spark DataFrames 中的数据集成到 Delta Lake”中而创建的”时间>。它们为您提供了额外的功能和优化,可以根据希望发生的方式合并您的数据,并在表上保留所有操作的日志,以便您可以在需要时回滚版本。

      Delta Lake python api(用于合并): https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder

      databricks 优化:https://kb.databricks.com/delta/delta-merge-into.html#discussion

      使用单个合并操作,您可以指定条件合并,在这种情况下,它可以是年和周和 id 的组合,然后如果记录匹配(意味着它们存在于您的 spark 数据框和增量表中, week1 和 week2),使用 spark 数据框中的数据更新它们,并保持其他记录不变:

      #you can also add additional condition if the records match, but not required
      .whenMatchedUpdateAll(condition=None)
      

      在某些情况下,如果没有匹配项,那么您可能需要插入并创建新的行和分区,您可以使用:

      .whenNotMatchedInsertAll(condition=None)
      

      您可以使用 .converttodelta 操作 https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.convertToDelta 将 parquet 表转换为 delta 表,以便您可以使用 api 对其执行 delta 操作。

      '您现在可以将 Parquet 表原位转换为 Delta Lake 表,而无需重写任何数据。这对于转换非常大的 Parquet 表非常有用,而将其重写为 Delta 表会很昂贵。此外,这个过程是可逆的'

      您的合并案例替换存在的数据并在不存在时创建新记录)可能如下所示:

      (未测试,语法参考例子+api)

      %python  
      deltaTable = DeltaTable.convertToDelta(spark, "parquet.`\curataed\dataset`")
      
      deltaTable.alias("target").merge(dataset, "target.Year= dataset.Year  AND target.Week = dataset.Week") \
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()
      

      如果增量表分区正确(年、周)并且您正确使用了 whenmatched 子句,则这些操作将得到高度优化,在您的情况下可能需要几秒钟。它还为您提供一致性、原子性和数据完整性以及回滚选项。

      提供的更多功能是,如果匹配成功,您可以指定要更新的列集(如果您只需要更新某些列)。您还可以启用spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true"),以便delta 使用最少的目标分区来执行合并(更新、删除、创建)。

      总的来说,我认为使用这种方法是一种非常新颖且创新的方式来执行有针对性的更新,因为它可以让您更好地控制它,同时保持高效的操作。使用带有动态分区覆盖模式的 parquet 也可以正常工作,但是 delta Lake 功能为您的数据湖带来了无与伦比的数据质量

      我的建议: 我现在想说的是,对 parquet 文件使用动态分区覆盖模式来进行更新,您可以试验并尝试在一张表上使用增量合并,并使用 spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true").whenMatchedUpdateAll() 的数据块优化并比较性能两者的(你的文件很小,所以我认为这不会有很大的不同)。合并文章的 databricks 分区修剪优化于 2 月发布,因此它确实是新的,并且可能会改变开销增量合并操作的游戏规则(因为在幕后他们只是创建新文件,但分区修剪可以加快速度)

      python,scala,sql中合并示例:https://docs.databricks.com/delta/delta-update.html#merge-examples

      https://databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html

      【讨论】:

      • 使用 delta replacewhere 使用字符串迭代或使用 alter table 在收集操作后删除分区然后追加都是相当棘手的解决方案,因为它们在循环中覆盖或删除分区(迭代地)而不是同时处理使用动态覆盖在镶木地板中完成多个分区。我正在研究,如果我找到一种规范的方式来实现你在 delta 中想要的东西,我会更新。
      • 我发现一个可能的解决方案可能是使用 delta 合并和 databricks 优化,是的,我的第一直觉是只使用镶木地板的动态覆盖,而 databricks delta(merge) 可能是未来可行的解决方案。这绝对是一个新的未知领域,因此我添加了我的推荐并更新了我的帖子。
      • @Datanovice waʾantum fa-jazākumu-llāhu khayran。很高兴我能帮上忙,祝你一切顺利。
      【解决方案3】:

      如果我在您的方法中遗漏了一些重要的内容,请纠正我,但您似乎想在现有数据之上写入新数据,这通常使用

      write.mode('append')
      

      而不是'overwrite'

      如果您想将数据按批次分开,以便您可以选择将其上传到数据仓库或审计,除了将这些信息包含到数据集中并在保存期间对其进行分区之外,没有任何明智的方法,例如

      dataset.write.mode('append')\
                           .partitionBy('Year','Week', 'BatchTimeStamp').parquet('curated\dataset')
      

      任何其他对 parquet 文件格式的手动干预充其量都是 hacky,最坏的风险是使您的管道不可靠或损坏您的数据。

      Mohammad 提到的 Delta Lake 总体上也是一个很好的建议,可以将数据可靠地存储在数据湖中,也是目前的黄金行业标准。对于您的特定用例,您可以使用其进行历史查询的功能(附加所有内容,然后查询当前数据集与上一批之后的差异),但是审计日志在时间上受限于您如何配置 delta Lake,并且可以低至 7 天,所以如果你想要长期完整的信息,你无论如何都需要遵循保存批次信息的方法。

      在更具战略意义的层面上,当遵循 raw -> curated -> DW 时,您还可以考虑添加另一个“跃点”并将准备好的数据放入按批次组织的“预处理”文件夹中,然后将其附加到 curated和 DW 集。

      附带说明,.repartition(1) 在使用 parquet 时没有太大意义,因为 parquet 无论如何都是多文件格式,因此这样做的唯一影响是对性能产生负面影响。但是,如果您使用它有特定原因,请告诉我。

      【讨论】:

      • 回到这个丹尼尔,这是我们决定的方法。感谢您的提示,在详细阅读了 partition(1) 之后,我们将其从我们的代码中删除了。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-05-27
      • 2018-08-28
      • 2020-08-09
      • 2021-12-19
      • 1970-01-01
      • 2020-03-23
      相关资源
      最近更新 更多