【问题标题】:How to dynamically pass save_args to kedro catalog?如何将 save_args 动态传递给 kedro 目录?
【发布时间】:2021-09-29 15:01:12
【问题描述】:

我正在尝试在 Kedro 中编写增量表。将文件格式更改为 delta 会使写入为 delta 表,模式为覆盖。

之前,原始层 (meta_reload) 中的一个节点会创建一个数据集,该数据集确定每个数据集增量加载的开始日期。每个节点都使用该原始数据集来过滤工作数据集,以应用转换逻辑并逐步写入分区拼花表。

但是现在将 delta 写入模式为覆盖,仅将文件类型更改为 delta 会使当前增量数据覆盖所有过去的数据,而不仅仅是那些分区。所以我需要在目录的 save_args 中使用 replaceWhere 选项。 当我需要读取 meta_reload 原始数据集以确定日期时,如何确定目录中 replaceWhere 的开始日期。 有没有办法从节点内部动态传递 save_args?

my_dataset:
  type: my_project.io.pyspark.SparkDataSet
  filepath: "s3://${bucket_de_pipeline}/${data_environment_project}/${data_environment_intermediate}/my_dataset/"
  file_format: delta
  layer: intermediate
  save_args:
    mode: "overwrite"
    replaceWhere: "DATE_ID > xyz"  ## what I want to implement dynamically
    partitionBy: [ "DATE_ID" ]

【问题讨论】:

    标签: databricks delta-lake delta kedro


    【解决方案1】:

    我已经在 GH discussion 上回答了这个问题。简而言之,您需要子类化并定义自己的SparkDataSet,我们避免在 Kedro 级别更改数据集的底层 API,但我们鼓励您根据自己的目的进行更改和重新组合。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-11-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-07-12
      相关资源
      最近更新 更多