【问题标题】:Spark SQL data storage life cycleSpark SQL 数据存储生命周期
【发布时间】:2021-12-17 22:12:30
【问题描述】:

recently had a issue with with one of my spark jobs,在那里我正在读取一个包含数十亿条记录的配置单元表,由于磁盘利用率高导致作业失败,但是在添加 AWS EBS 卷后,作业运行没有任何问题。虽然它解决了这个问题,但我几乎没有疑问,我尝试进行一些研究但找不到任何明确的答案。所以我的问题是?

当 spark SQL 读取 hive 表时,如果我没有明确指定任何内容,那么数据最初存储在哪里进行处理,就其存储而言,数据的整个生命周期是什么?以及添加 EBS 卷如何解决这个问题?

【问题讨论】:

    标签: apache-spark pyspark hive apache-spark-sql


    【解决方案1】:

    Spark 会读取数据,如果内存放不下,就会溢出到磁盘上。

    需要注意的几点:

    • 内存中的数据被压缩了,从我读到的数据,你获得了大约 20%(例如,一个 100MB 的文件只需要 80MB 的内存)。
    • 只要您read(),摄取就会开始,它不是 DAG 的一部分,您可以在 SQL 查询本身中限制摄取量。读操作由执行者完成。这个例子应该给你一个提示:https://github.com/jgperrin/net.jgp.books.spark.ch08/blob/master/src/main/java/net/jgp/books/spark/ch08/lab300_advanced_queries/MySQLWithWhereClauseToDatasetApp.java
    • 在最新版本的 Spark 中,您可以下推过滤器(例如,如果您在摄取后立即过滤,Spark 会知道并优化摄取),我认为这仅适用于 CSV、Avro 和 Parquet。对于数据库(包括 Hive),我建议使用前面的示例。
    • 必须从执行程序看到/访问存储,因此如果您有 EBS 卷,请确保从运行执行程序/工作程序的集群而不是运行驱动程序的节点看到/访问它们。李>

    【讨论】:

    • 当你说内存和磁盘时,是驱动程序(内存和本地存储)还是执行程序?以及添加 EBS 卷如何解决这些问题?是否相当于 AWS 中的本地存储?
    • Executor - 除非您的代码包含 collect() 或类似函数。 EBS 只是您的服务器/集群可用的磁盘。您可能需要更多“本地”存储空间。
    • 在 EBS 上添加了一个额外的要点,嗨。
    【解决方案2】:

    最初数据位于 HDFS/S3/etc 中的表位置。如果内存容量不足,Spark 会将数据溢出到本地存储。

    阅读Apache Spark FAQ

    Does my data need to fit in memory to use Spark?
    

    没有。如果内存不适合 Spark 的操作员将数据溢出到磁盘, 允许它在任何大小的数据上运行良好。同样,缓存的数据集 不适合内存的要么溢出到磁盘要么重新计算 根据 RDD 的存储级别,在需要时执行此操作。

    【讨论】:

    • 当您说内存和本地存储时,是驱动程序(内存和本地存储)还是执行程序?以及添加 EBS 卷如何解决这些问题?是否相当于 AWS 中的本地存储?
    • 编辑问题以包括 AES EBS 卷 aws.amazon.com/premiumsupport/knowledge-center/…
    • Executors 本地内存和存储。 EBS 卷是附加到节点的存储,尽管 EBS 类似于网络附加驱动器。您可能也可以解决添加实例存储的问题。这里解释的区别:medium.com/awesome-cloud/…
    【解决方案3】:

    每当 spark 从 hive 表中读取数据时,它都会将其存储在 RDD 中。这里我要明确一点,hive 只是一个仓库,所以它就像 HDFS 之上的一层,当 spark 与 hive 交互时,hive 为 spark 提供了 hdfs 位置存在的位置。

    因此,Spark 从 HDFS 读取文件,它为单个输入拆分创建单个分区。输入拆分由 Hadoop 设置(无论用于读取此文件的 InputFormat 是什么。例如:如果您使用 textFile() 它将是 Hadoop 中的 TextInputFormat,这将为您返回单个 HDFS 块的单个分区(注意:拆分分区之间将在行拆分上完成,而不是确切的块拆分),除非您有像 Avro/parquet 这样的压缩文件格式。

    如果您手动添加 rdd.repartition(x),它会将 rdd 中的 N 个分区中的数据洗牌到您想要的 x 个分区,分区将在循环的基础上完成。

    如果您有一个 10GB 的未压缩文本文件存储在 HDFS 上,那么使用默认的 HDFS 块大小设置 (256MB),它将存储在 40 个块中,这意味着您从该文件中读取的 RDD 将具有 40 个分区。当您调用 repartition(1000) 时,您的 RDD 将被标记为重新分区,但实际上只有当您在此 RDD 之上执行操作时,它才会被打乱到 1000 个分区(延迟执行概念)

    现在一切都取决于 Spark 将如何处理数据,因为 Spark 正在进行惰性评估,在进行处理之前,Spark 准备一个 DAG 以进行最佳处理。还有一点 spark 需要配置驱动内存,没有核心,没有执行器等,如果配置不合适,作业将失败。

    一旦准备好 DAG ,它就会开始处理数据。因此,它将您的工作划分为阶段,将阶段划分为任务。每个任务将进一步使用特定的执行器, shuffle ,分区。因此,在您处理数十亿条记录的情况下,您的配置可能不足以进行处理。还有一点,当我们说 spark 在 RDD/Dataframe 中加载数据时,它由 spark 管理,可以选择仅将数据保留在内存/磁盘/内存等参考 -storage_spark

    简单地说,

    Hive-->HDFS--->SPARK>>RDD(存储取决于它的惰性评估)。

    您可以参考以下链接:Spark RDD - is partition(s) always in RAM?

    【讨论】:

      猜你喜欢
      • 2020-04-27
      • 2020-07-30
      • 1970-01-01
      • 1970-01-01
      • 2016-03-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多