【问题标题】:Databricks Delta Live Tables - Apply Changes from delta tableDatabricks Delta Live Tables - 应用增量表中的更改
【发布时间】:2022-09-23 22:05:46
【问题描述】:

我正在使用 Databricks Delta Live Tables,但是在上游插入一些表时遇到了一些问题。我知道下面的文字很长,但我试图尽可能清楚地描述我的问题。如果某些部分不清楚,请告诉我。

我有以下表格和流程:

Landing_zone -> 这是一个添加 JSON 文件的文件夹,其中包含插入或更新记录的数据。 Raw_table -> 这是 JSON 文件中的数据,但采用表格格式。此表为增量格式。没有进行任何转换,除了将 JSON 结构转换为表格结构(我做了一个分解,然后从 JSON 键创建列)。 Intermediate_table -> 这是 raw_table,但有一些额外的列(取决于其他列值)。

要从我的着陆区转到原始表,我有以下 Pyspark 代码:

cloudfile = {\"cloudFiles.format\":\"JSON\", 
                       \"cloudFiles.schemaLocation\": sourceschemalocation, 
                       \"cloudFiles.inferColumnTypes\": True}

@dlt.view(\'landing_view\')
def inc_view():
    df = (spark
             .readStream
             .format(\'cloudFiles\')
             .options(**cloudFilesOptions)
             .load(filpath_to_landing)
     <Some transformations to go from JSON to tabular (explode, ...)>
     return df

dlt.create_target_table(\'raw_table\', 
                        table_properties = {\'delta.enableChangeDataFeed\': \'true\'})
  
dlt.apply_changes(target=\'raw_table\',
                  source=\'landing_view\',
                  keys=[\'id\'],
                  sequence_by=\'updated_at\')

此代码按预期工作。我运行它,将 changes.JSON 文件添加到着陆区,重新运行管道,并且 upserts 正确应用于 \'raw_table\'

(但是,每次在 delta 文件夹中创建一个包含所有数据的新 parquet 文件时,我希望只添加一个包含插入和更新行的 parquet 文件?并且有关当前版本的一些信息保存在 delta日志?不确定这是否与我的问题有关。我已经将 \'raw_table\' 的 table_properties 更改为 enableChangeDataFeed = true。\'intermediate_table\' 的 readStream 然后有选项(readChangeFeed,\'true\')) .

然后我有以下代码从我的 \'raw_table\' 到我的 \'intermediate_table\':

@dlt.table(name=\'V_raw_table\', table_properties={delta.enableChangeDataFeed\': \'True\'})
def raw_table():
     df = (spark.readStream
                .format(\'delta\')
                .option(\'readChangeFeed\', \'true\')
                .table(\'LIVE.raw_table\'))
     df = df.withColumn(\'ExtraCol\', <Transformation>)
     return df
 ezeg
dlt.create_target_table(\'intermediate_table\')
dlt.apply_changes(target=\'intermediate_table\',
                  source=\'V_raw_table\',
                  keys=[\'id\'],
                  sequence_by=\'updated_at\')

不幸的是,当我运行它时,我得到了错误: \'在版本 2 的源表中检测到数据更新(例如 part-00000-7127bd29-6820-406c-a5a1-e76fc7126150-c000.snappy.parquet)。目前不支持。如果您想忽略更新,请将选项 \'ignoreChanges\' 设置为 \'true\'。如果您希望反映数据更新,请使用新的检查点目录重新启动此查询。\'

我签入了\'ignoreChanges\',但不要认为这是我想要的。我希望自动加载器能够检测到增量表中的更改并将它们传递给流程。

我知道 readStream 只适用于追加,但这就是为什么我希望在更新 \'raw_table\' 后,一个新的镶木地板文件将被添加到仅包含插入和更新的 delta 文件夹中。然后,自动加载器会检测到这个添加的 parquet 文件,并可用于将更改应用到 \'intermediate_table\'。

我这样做是错误的吗?还是我忽略了什么?提前致谢!

  • 当您运行管道时,您在哪个阶段看到错误?如果您的 LIVE.raw_table 有重大更新,即需要重写现有数据文件,自动加载器可能无法工作。
  • 我们有同样的问题——最好弄清楚这个用例的适当处理。

标签: databricks delta delta-live-tables databricks-autoloader


【解决方案1】:

由于 readStream 仅适用于追加,源文件中的任何更改都会在下游产生问题。对“raw_table”的更新只会插入一个新的 parquet 文件的假设是不正确的。基于“优化写入”之类的设置,甚至没有它,apply_changes 可以添加或删除文件。您可以在“numTargetFilesAdded”和“numTargetFilesRemoved”下的“raw_table/_delta_log/xxx.json”中找到此信息。

基本上,“Databricks 建议您使用 Auto Loader 仅摄取不可变文件”。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-02-09
    • 1970-01-01
    • 2022-08-02
    • 2022-11-03
    • 2022-11-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多