【发布时间】:2020-06-19 01:32:09
【问题描述】:
我正在通过 Spark 结构化流向 delta Lake 写入流。每个流式批处理都包含键值(还包含时间戳作为一列)。 delta Lake 不支持在源(蒸汽批处理)处使用多个相同的键进行更新,所以我想只用最新时间戳的记录来更新 delta Lake。我该怎么做?
这是我正在尝试的代码 sn-p:
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
println(s"Executing batch $batchId ...")
microBatchOutputDF.show()
deltaTable.as("t")
.merge(
microBatchOutputDF.as("s"),
"s.key = t.key")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
}
提前致谢。
【问题讨论】:
标签: apache-spark spark-streaming delta-lake