【发布时间】:2021-09-29 15:01:12
【问题描述】:
我正在尝试在 Kedro 中编写增量表。将文件格式更改为 delta 会使写入为 delta 表,模式为覆盖。
之前,原始层 (meta_reload) 中的一个节点会创建一个数据集,该数据集确定每个数据集增量加载的开始日期。每个节点都使用该原始数据集来过滤工作数据集,以应用转换逻辑并逐步写入分区拼花表。
但是现在将 delta 写入模式为覆盖,仅将文件类型更改为 delta 会使当前增量数据覆盖所有过去的数据,而不仅仅是那些分区。所以我需要在目录的 save_args 中使用 replaceWhere 选项。 当我需要读取 meta_reload 原始数据集以确定日期时,如何确定目录中 replaceWhere 的开始日期。 有没有办法从节点内部动态传递 save_args?
my_dataset:
type: my_project.io.pyspark.SparkDataSet
filepath: "s3://${bucket_de_pipeline}/${data_environment_project}/${data_environment_intermediate}/my_dataset/"
file_format: delta
layer: intermediate
save_args:
mode: "overwrite"
replaceWhere: "DATE_ID > xyz" ## what I want to implement dynamically
partitionBy: [ "DATE_ID" ]
【问题讨论】:
标签: databricks delta-lake delta kedro