【问题标题】:What is the best practice writing massive amount of files to s3 using Spark使用 Spark 将大量文件写入 s3 的最佳实践是什么
【发布时间】:2020-07-18 17:56:30
【问题描述】:

我正在尝试使用 Spark 将大约 30k-60k parquet 文件写入 s3,由于 s3 速率限制,这需要大量时间(40 多分钟)。 我想知道是否有最佳实践来做这样的事情。我听说将数据写入 HDFS,然后使用 s3-dist-cp 进行复制可能会更快。我不明白为什么。由于 s3 速率限制,来自 HDFS 的副本不会花费相同的时间吗?

感谢您的帮助

【问题讨论】:

    标签: apache-spark amazon-s3 hdfs


    【解决方案1】:

    这种方法没有任何问题,并且在大多数用例中都可以正常工作,但由于 S3 文件的编写方式,可能存在一些挑战。

    需要理解的两个重要概念

    1. S3(对象存储)!= POSIX 文件系统:重命名操作:

      基于 POSIX 的文件系统中的文件重命名过程是仅元数据操作。只有指针更改,文件在磁盘上保持原样。例如,我有一个文件 abc.txt,我想将它重命名为 xyz.txt,它是瞬时的和原子的。 xyz.txt 的最后修改时间戳与 abc.txt 的最后修改时间戳相同。 在 AWS S3(对象存储)中,文件重命名是一个副本,然后是一个删除操作。源文件首先被复制到目标,然后源文件被删除。所以“aws s3 mv”改变了目标文件的最后修改时间戳,不像POSIX文件系统。这里的元数据是一个键值存储,其中key是文件路径和value 是文件的内容,没有更改密钥并立即完成此操作的过程。重命名过程取决于文件的大小。如果有一个目录重命名(为简单起见,S3 中没有任何称为目录的内容,我们可以假设一组文件作为目录),那么它取决于目录中的文件数以及每个文件的大小。所以简而言之,与普通文件系统相比,S3 中的重命名操作非常昂贵。

    2. S3 一致性模型

      S3 有 2 种一致性 a.read after write b.最终一致性,在某些情况下会导致文件未找到期望。文件被添加但未列出,文件被删除或未从列表中删除。

    Deep explanation:

    Spark 利用 Hadoop 的“FileOutputCommitter”实现来写入数据。再次写入数据涉及多个步骤,并且在高级别暂存输出文件然后提交它们,即写入最终文件。这里涉及重命名步骤,正如我之前所说的从暂存到最后一步。如您所知,火花作业分为多个阶段和任务集,并且由于分布式计算的性质,任务容易失败,因此由于系统故障或缓慢运行任务的推测执行,还可以重新启动相同的任务,这导致了任务提交和作业的概念提交函数。这里我们有 2 个现成的算法选项,以及如何完成作业和任务提交,并且说这不是一种算法比其他算法更好,而是基于我们提交数据的位置。

    mapreduce.fileoutputcommitter.algorithm.version=1

    • commitTask将task生成的数据从task临时目录重命名为job临时目录。

    • 当所有任务完成后,commitJob 将所有数据从作业临时目录重命名为最终目的地,最后创建 _SUCCESS 文件。

    这里驱动程序在最后完成commitJob的工作,因此像S3这样的对象存储可能需要更长的时间,因为大量任务临时文件排队等待重命名操作(虽然它不是串行的)并且写入性能没有优化。它可能对 HDFS 工作得很好,因为重命名并不昂贵,而且只是元数据更改。对于 AWS S3,在 commitJob 期间,文件的每个重命名操作都会打开对 AWS S3 的大量 API 调用,如果文件很高。也可能不会。我已经在两个不同的时间看到了同一个工作的两个案例。

    mapreduce.fileoutputcommitter.algorithm.version=2

    • commitTask 在任务完成后立即将任务生成的数据从任务临时目录直接移动到最终目的地。

    • commitJob 基本上是写 _SUCCESS 文件,做的不多。

    从高层次上看,这看起来是经过优化的,但它有一个限制,即不能执行推测性任务,而且如果任何任务由于数据损坏而失败,那么我们最终可能会在最终目的地留下残留数据并需要清理.所以这个算法不能提供 100% 的数据正确性,或者不适用于我们需要以追加模式将数据附加到现有文件的用例。即使这样确保优化的结果也有风险。性能良好的原因基本上是因为与算法 1 相比,重命名操作的数量更少(仍然有重命名)。在这里我们可能会遇到文件未找到预期的问题,因为 commitTask 将文件写入临时路径并立即重命名它们,最终一致性问题的可能性很小。

    最佳实践

    以下是我认为我们可以在编写 spark 数据处理应用程序时使用的几个:

    • 如果您有可用的 HDFS 集群,则将数据从 Spark 写入 HDFS 并将其复制到 S3 以持久保存。 s3-dist-cp 可以最佳地用于从 HDFS 到 S3 的数据复制。在这里我们可以避免所有重命名操作。AWS EMR 仅在计算期间运行,然后终止以保持结果,这种方法看起来更可取。

    • 尽量避免写入文件并一次又一次地读取文件,除非文件有消费者,spark 以内存处理而闻名,仔细的数据持久性/内存缓存将有助于优化运行时间应用程序。

    【讨论】:

    • 感谢 Vijay,关于 HDFS,使用 s3-dist-cp 会不会导致重命名操作?数据如何安全复制?如果副本在过程中被中止会发生什么?它是否比使用 HDFS 的 Directory Committer 更好?
    猜你喜欢
    • 2020-09-01
    • 2017-03-30
    • 1970-01-01
    • 2019-11-20
    • 2011-03-17
    • 1970-01-01
    • 2014-03-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多