【问题标题】:Incremental batch processing in pysparkpyspark中的增量批处理
【发布时间】:2020-09-04 19:59:22
【问题描述】:

在我们的 spark 应用程序中,我们每天都在运行多个批处理。这些批处理的来源不同,例如 Oracle、mongoDB、Files。我们根据源存储不同的值用于增量处理,例如某些 oracle 表的最新时间戳、某些 oracle 表的 ID、某些文件系统的列表,并将这些值用于下一次增量运行。

目前这些偏移值的计算是依赖于源类型的,我们需要在每次添加新的源类型时自定义代码来存储这个值。 是否有任何通用方法可以解决此问题,例如流式传输中的检查点。

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    我总是喜欢查看最后写入分区的目标,或者获取一些 max(primary_key),然后根据该值从源数据库中选择数据以在当前运行期间写入。

    无需存储任何内容,您只需向批处理算法提供表名、源类型和主键/时间戳列。然后该算法将找到您已经拥有的最新值。

    这真的取决于您的负载理念以及您的存储空间是如何划分的;如果您有原始/源/准备层。以原始格式加载数据是一个好主意,该原始格式可以轻松地与原始源进行比较,以便执行我上面描述的操作。

    替代方案包括:

    • 编写一个包含该主列和最新值的文件,您的批处理作业将读取此文件以确定接下来要读取的内容。
    • 使用与最新值对应的参数更新作业执行配置,以便在下次运行时将最新值传递给您的算法。

    【讨论】:

      猜你喜欢
      • 2019-09-22
      • 2019-08-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2010-12-17
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多