我看到您在 azure 堆栈中使用数据块。我认为最可行和推荐的方法是使用数据块中的新 delta Lake 项目:
它为对象存储(如 s3 或 azure 数据湖存储)提供各种 upsert、merge 和酸事务选项。它基本上提供了数据仓库向数据湖提供的管理、安全、隔离和更新插入/合并。对于一个管道,由于其功能和灵活性,苹果实际上将其数据仓库替换为仅在 delta 数据块上运行。对于您的用例和许多其他使用 parquet 的用户,只需 将“parquet”替换为“delta”,以便使用其功能(如果您有数据块)。 Delta 基本上是 parquet 的自然演变,databricks 通过提供附加功能和开源功能做得很好。
对于您的情况,我建议您尝试 delta 中提供的 replaceWhere 选项。在进行此目标更新之前,目标表的格式必须为 delta
而不是这个:
dataset.repartition(1).write.mode('overwrite')\
.partitionBy('Year','Week').parquet('\curataed\dataset')
来自https://docs.databricks.com/delta/delta-batch.html:
'您只能在分区列'
上
选择性地覆盖匹配谓词的数据
你可以试试这个:
dataset.write.repartition(1)\
.format("delta")\
.mode("overwrite")\
.partitionBy('Year','Week')\
.option("replaceWhere", "Year == '2019' AND Week >='01' AND Week <='02'")\ #to avoid overwriting Week3
.save("\curataed\dataset")
此外,如果您希望将分区设为 1,为什么不使用 coalesce(1),因为它可以避免完全洗牌。
来自https://mungingdata.com/delta-lake/updating-partitions-with-replacewhere/:
'replaceWhere 在您必须运行计算成本高昂的算法 时特别有用,但仅限于某些分区' em>
因此,我个人认为,使用 replacewhere 手动指定覆盖将更有针对性和计算效率,而不是仅仅依靠:
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
Databricks 对 delta 表进行了优化,通过 bin 打包和 z 排序使其成为镶木地板的更快、更有效的选择(因此是自然演变):
来自链接:https://docs.databricks.com/spark/latest/spark-sql/language-manual/optimize.html
'优化匹配给定分区谓词的行子集。仅支持涉及分区键属性的过滤器。'
'将列信息放在同一组文件中。 Delta Lake 数据跳过算法使用同域性来显着减少需要读取的数据量。
您还可以查看开源项目的完整文档:https://docs.delta.io/latest/index.html
.. 我还想说我不为 databricks/delta 湖工作。我刚刚看到它们的改进和功能使我的工作受益。
更新:
问题的要点是“替换现有数据并为新数据创建新文件夹”,并以高度可扩展和有效的方式进行。
在镶木地板中使用动态分区覆盖可以完成这项工作,但是我觉得该方法的自然演变是使用增量表合并操作,这些操作基本上是为了“将 Spark DataFrames 中的数据集成到 Delta Lake”中而创建的”时间>。它们为您提供了额外的功能和优化,可以根据希望发生的方式合并您的数据,并在表上保留所有操作的日志,以便您可以在需要时回滚版本。
Delta Lake python api(用于合并):
https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaMergeBuilder
databricks 优化:https://kb.databricks.com/delta/delta-merge-into.html#discussion
使用单个合并操作,您可以指定条件合并,在这种情况下,它可以是年和周和 id 的组合,然后如果记录匹配(意味着它们存在于您的 spark 数据框和增量表中, week1 和 week2),使用 spark 数据框中的数据更新它们,并保持其他记录不变:
#you can also add additional condition if the records match, but not required
.whenMatchedUpdateAll(condition=None)
在某些情况下,如果没有匹配项,那么您可能需要插入并创建新的行和分区,您可以使用:
.whenNotMatchedInsertAll(condition=None)
您可以使用 .converttodelta 操作 https://docs.delta.io/latest/api/python/index.html#delta.tables.DeltaTable.convertToDelta 将 parquet 表转换为 delta 表,以便您可以使用 api 对其执行 delta 操作。
'您现在可以将 Parquet 表原位转换为 Delta Lake 表,而无需重写任何数据。这对于转换非常大的 Parquet 表非常有用,而将其重写为 Delta 表会很昂贵。此外,这个过程是可逆的'
您的合并案例(替换存在的数据并在不存在时创建新记录)可能如下所示:
(未测试,语法参考例子+api)
%python
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`\curataed\dataset`")
deltaTable.alias("target").merge(dataset, "target.Year= dataset.Year AND target.Week = dataset.Week") \
.whenMatchedUpdateAll()\
.whenNotMatchedInsertAll()\
.execute()
如果增量表分区正确(年、周)并且您正确使用了 whenmatched 子句,则这些操作将得到高度优化,在您的情况下可能需要几秒钟。它还为您提供一致性、原子性和数据完整性以及回滚选项。
提供的更多功能是,如果匹配成功,您可以指定要更新的列集(如果您只需要更新某些列)。您还可以启用spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true"),以便delta 使用最少的目标分区来执行合并(更新、删除、创建)。
总的来说,我认为使用这种方法是一种非常新颖且创新的方式来执行有针对性的更新,因为它可以让您更好地控制它,同时保持高效的操作。使用带有动态分区覆盖模式的 parquet 也可以正常工作,但是 delta Lake 功能为您的数据湖带来了无与伦比的数据质量。
我的建议:
我现在想说的是,对 parquet 文件使用动态分区覆盖模式来进行更新,您可以试验并尝试在一张表上使用增量合并,并使用 spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true") 和 .whenMatchedUpdateAll() 的数据块优化并比较性能两者的(你的文件很小,所以我认为这不会有很大的不同)。合并文章的 databricks 分区修剪优化于 2 月发布,因此它确实是新的,并且可能会改变开销增量合并操作的游戏规则(因为在幕后他们只是创建新文件,但分区修剪可以加快速度)
在python,scala,sql中合并示例:https://docs.databricks.com/delta/delta-update.html#merge-examples
https://databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html