【问题标题】:Delta Lake rollbackDelta Lake 回滚
【发布时间】:2020-12-02 07:43:08
【问题描述】:

需要一种优雅的方式将 Delta Lake 回滚到以前的版本。

我目前的做法如下:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, testFolder)

spark.read.format("delta")
  .option("versionAsOf", 0)
  .load(testFolder)
  .write
  .mode("overwrite")
  .format("delta")
  .save(testFolder)

这很丑陋,因为需要重写整个数据集。似乎一些元更新就足够了,不需要数据 I/O。有人知道更好的方法吗?

【问题讨论】:

  • 我同意这不是一个理想的解决方案,但考虑到用分区覆盖大型数据集可能会很昂贵,这个简单的解决方案可能会有所帮助。

标签: apache-spark rollback databricks delta-lake


【解决方案1】:

从 Delta Lake 0.7.0 开始,您可以使用 RESTORE command 回滚到早期版本的 Delta Lake 表。这是使用时间旅行来回滚表格的一种更简单的方法。

斯卡拉:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

deltaTable.restoreToVersion(0)

Python:

from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/path/to/delta-table")

deltaTable.restoreToVersion(0)

SQL

RESTORE TABLE delta.`/path/to/delta-table` TO VERSION AS OF 0

如果您更愿意这样做,也可以使用restoreToTimestamp 命令。 Read the documentation了解更多详情。

【讨论】:

  • 是否需要将恢复的表保存回来?如何将其加载到 spark 数据框中?
  • 我在 api 文档中编码时找不到此方法。它仅在文档中。
【解决方案2】:

这是一个残酷的解决方案。这并不理想,但考虑到用分区覆盖大型数据集可能会很昂贵,这个简单的解决方案可能会有所帮助。

如果您对所需回滚时间之后的更新不是很敏感,只需删除 _delta_log 中晚于回滚时间的所有版本文件。未引用的文件可以稍后使用真空释放。

另一种保留完整历史记录的解决方案是 1) deltaTable.delete 2) 将所有日志依次复制到回滚(版本号增加)到删除日志文件的末尾。这模仿了在回滚日期之前创建 delta 湖的过程。但它肯定不漂亮。

【讨论】:

  • 今天与一位 Databricks 工程师交谈。他承认目前还没有一个优雅的解决方案来解决这个问题,但是,这个问题在他们的清单上是高度优先的。现在,有一些绕行,但是是的,它们很丑。
  • 你有没有“简单地删除_delta_log中所有晚于回滚时间的版本文件”的例子?
  • 进入你的delta Lake的_delta_log文件夹,你应该可以看到000...0001.json、000...0002.json等版本文件,每个都是对应于一个提交。做任何你想做的事情(保存、追加、覆盖等),你应该会看到版本号不断增加。要回滚较早的版本,例如 3,那么您可以删除所有晚于 3 的版本,例如 000...0004.json、000...0005.json 等。现在读取 delta Lake,您应该只获得以下数据第 3 版。
  • 好的,谢谢。为什么您不在旅行选项中使用 timestampAsOf?就我而言,我需要按小时、天、月或年进行回滚。如何根据时间戳知道哪个是版本?
  • 您不必检查文件中的内容。 .json 文件的时间戳就足够了。您还可以使用三角洲湖的“历史”功能来获得更好的视觉展示。
【解决方案3】:

如果您的目标是修复错误数据并且您对更新不是很敏感,则可以更换一个时间间隔。

 df.write
      .format("delta")
      .mode("overwrite")
      .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'")
      .save("/delta/events")

【讨论】:

    【解决方案4】:

    你应该使用时间旅行功能:https://databricks.com/blog/2019/02/04/introducing-delta-time-travel-for-large-scale-data-lakes.html

    您在时间戳上读取数据:

    val inputPath = "/path/to/my/table@20190101000000000"
    

    然后用“回滚”版本覆盖现有数据。

    关于它的丑陋,我不确定我能帮上什么忙。您可以使用分区来限制数据。或者您可以计算出哪些记录已更改并仅覆盖它们。

    【讨论】:

    • 版本是时间旅行的。它们本质上是一样的。
    • 使用精确的时间戳并不总是很方便;我改用了相应的版本号。
    • 我同意两个 cmets。只是说我认为这是正确的方法(而不是弄乱日志)。
    • 我会说时间旅行 + 覆盖是使用可用 API 的合理方式,但是,它不是优化方式。有了所有可用的数据,额外的 I/O 对我来说似乎是一种浪费。应该有或可能会添加更好的 API。对于熟悉协议的人来说,添加指向正确数据的新版本文件应该是直截了当的。
    【解决方案5】:

    我在使用 Delta 时遇到过类似的问题,我在 1 个事务中调用了多个 dml 操作。例如我需要调用合并,然后在 1 个单一事务中删除。因此,在这种情况下,要么他们要么一起成功,要么在其中任何一个失败时回滚状态。

    为了解决这个问题,我在事务开始之前备份了_delta_log(我们称之为稳定状态)目录。如果事务中的两个 DML 操作都成功,则丢弃先前的稳定状态并使用 _delta_log 中提交的新状态,以防任何 dml 操作失败,则只需将 _delta_log 目录替换为您之前进行备份的稳定状态开始交易。替换为稳定状态后,只需运行 Vacuum 即可删除可能在事务期间创建的陈旧文件。

    【讨论】:

      猜你喜欢
      • 2021-01-27
      • 2022-10-13
      • 2019-10-06
      • 2021-02-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-07-14
      • 2021-09-20
      相关资源
      最近更新 更多