【发布时间】:2022-09-23 22:05:46
【问题描述】:
我正在使用 Databricks Delta Live Tables,但是在上游插入一些表时遇到了一些问题。我知道下面的文字很长,但我试图尽可能清楚地描述我的问题。如果某些部分不清楚,请告诉我。
我有以下表格和流程:
Landing_zone -> 这是一个添加 JSON 文件的文件夹,其中包含插入或更新记录的数据。 Raw_table -> 这是 JSON 文件中的数据,但采用表格格式。此表为增量格式。没有进行任何转换,除了将 JSON 结构转换为表格结构(我做了一个分解,然后从 JSON 键创建列)。 Intermediate_table -> 这是 raw_table,但有一些额外的列(取决于其他列值)。
要从我的着陆区转到原始表,我有以下 Pyspark 代码:
cloudfile = {\"cloudFiles.format\":\"JSON\",
\"cloudFiles.schemaLocation\": sourceschemalocation,
\"cloudFiles.inferColumnTypes\": True}
@dlt.view(\'landing_view\')
def inc_view():
df = (spark
.readStream
.format(\'cloudFiles\')
.options(**cloudFilesOptions)
.load(filpath_to_landing)
<Some transformations to go from JSON to tabular (explode, ...)>
return df
dlt.create_target_table(\'raw_table\',
table_properties = {\'delta.enableChangeDataFeed\': \'true\'})
dlt.apply_changes(target=\'raw_table\',
source=\'landing_view\',
keys=[\'id\'],
sequence_by=\'updated_at\')
此代码按预期工作。我运行它,将 changes.JSON 文件添加到着陆区,重新运行管道,并且 upserts 正确应用于 \'raw_table\'
(但是,每次在 delta 文件夹中创建一个包含所有数据的新 parquet 文件时,我希望只添加一个包含插入和更新行的 parquet 文件?并且有关当前版本的一些信息保存在 delta日志?不确定这是否与我的问题有关。我已经将 \'raw_table\' 的 table_properties 更改为 enableChangeDataFeed = true。\'intermediate_table\' 的 readStream 然后有选项(readChangeFeed,\'true\')) .
然后我有以下代码从我的 \'raw_table\' 到我的 \'intermediate_table\':
@dlt.table(name=\'V_raw_table\', table_properties={delta.enableChangeDataFeed\': \'True\'})
def raw_table():
df = (spark.readStream
.format(\'delta\')
.option(\'readChangeFeed\', \'true\')
.table(\'LIVE.raw_table\'))
df = df.withColumn(\'ExtraCol\', <Transformation>)
return df
ezeg
dlt.create_target_table(\'intermediate_table\')
dlt.apply_changes(target=\'intermediate_table\',
source=\'V_raw_table\',
keys=[\'id\'],
sequence_by=\'updated_at\')
不幸的是,当我运行它时,我得到了错误: \'在版本 2 的源表中检测到数据更新(例如 part-00000-7127bd29-6820-406c-a5a1-e76fc7126150-c000.snappy.parquet)。目前不支持。如果您想忽略更新,请将选项 \'ignoreChanges\' 设置为 \'true\'。如果您希望反映数据更新,请使用新的检查点目录重新启动此查询。\'
我签入了\'ignoreChanges\',但不要认为这是我想要的。我希望自动加载器能够检测到增量表中的更改并将它们传递给流程。
我知道 readStream 只适用于追加,但这就是为什么我希望在更新 \'raw_table\' 后,一个新的镶木地板文件将被添加到仅包含插入和更新的 delta 文件夹中。然后,自动加载器会检测到这个添加的 parquet 文件,并可用于将更改应用到 \'intermediate_table\'。
我这样做是错误的吗?还是我忽略了什么?提前致谢!
-
当您运行管道时,您在哪个阶段看到错误?如果您的 LIVE.raw_table 有重大更新,即需要重写现有数据文件,自动加载器可能无法工作。
-
我们有同样的问题——最好弄清楚这个用例的适当处理。
标签: databricks delta delta-live-tables databricks-autoloader