【问题标题】:How do I update parquet file using Spark Streaming?如何使用 Spark Streaming 更新 parquet 文件?
【发布时间】:2020-02-29 01:22:09
【问题描述】:

我正在使用火花流来制作实时数据管道。 我从 Kafka 获取实时数据并使用 Spark 处理这些数据。

但是当我使用来自 S3 的新传入数据更新 S3 上现有的 parquet 文件时,它的性能并不好。因为我必须从 S3 获取现有的分区镶木地板文件,并用来自 Kafka 的新记录替换旧记录,然后覆盖 S3 上的完整分区镶木地板文件。

因此需要很长时间,因为此表会经常更新。

您能否建议我在火花流中执行更新操作的更好方法?

提前致谢。

【问题讨论】:

    标签: apache-spark spark-streaming parquet


    【解决方案1】:

    当我们说“parquet 文件”时,我们真正的意思是一个包含多个文件的目录结构。这些文件的组织方式及其代表的内容取决于分区选项等。

    要理解的重要一点是,信息更新的单位是一个文件(来自上面描述的这个目录结构)。所以,如果你在这个目录结构中有一个 10Gb 的文件,并且你想从一个只有 4 个字节长的记录中更新一个字段......对不起......但是你必须覆盖整个10Gb 文件。

    Parquet 的设计初衷不是用作数据库,而是像类固醇上的 CSV,如果您允许我使用这种比喻的话。

    为了更新存储为 parquet 的内容,您首先需要了解您的数据,了解您的工作流程,以方便的方式对数据进行分区,然后采用某些技术。

    我选择了一篇文章,它更详细地解释了问题并解释了如何规避困难。简而言之,您将使用一个叫做 Delta Lake 的东西,它基本上是一个基于类固醇的 parquet 目录结构,它提供了许多好处。

    https://mungingdata.com/delta-lake/merge-update-upserts/

    https://delta.io

    【讨论】:

      【解决方案2】:

      Parquet 格式不允许追加,在任何情况下,如果您想在 parquet 文件中添加某些内容,您必须完全覆盖它。在您的情况下,您想更新某些字段,但最糟糕的是因为它不是数据库。

      解决方法是仅附加信息,如果需要更新记录,只需添加一行新信息并在处理过程中获取最新信息。

      【讨论】:

      • "在处理过程中取最新的"如果我要从现有镶木地板的记录中取一个,那么这将花费太多时间。如果我得到 1000 条记录,那么对于每条记录,我必须从 s3 获取一条已经存在的记录,并且需要用新记录替换。
      • 我并不是要挑选您的记录,而是在处理之前过滤它们。您可以有一个 id 或唯一的东西来标识一条记录,并且只能通过查看持久时间戳来获取每个记录的最新信息。或者如果你真的想使用更新,你应该看看像 HBase 这样的其他存储,它与 spark 流无关,而是与存储目标有关
      猜你喜欢
      • 2018-02-02
      • 2016-03-23
      • 2016-05-21
      • 2017-07-10
      • 2015-11-27
      • 2017-04-05
      • 1970-01-01
      • 2019-02-24
      • 1970-01-01
      相关资源
      最近更新 更多