【问题标题】:Spark EMR S3 Processing Large No of FilesSpark EMR S3 处理大量文件
【发布时间】:2018-03-19 23:36:16
【问题描述】:

我在 S3 中存在大约 15000 个文件 (ORC),其中每个文件包含几分钟的数据,每个文件的大小在 300-700MB 之间变化。

由于递归循环遍历以 YYYY/MM/DD/HH24/MIN 格式存在的目录很昂贵,因此我正在创建一个文件,其中包含给定日期 (objects_list.txt) 的所有 S3 文件的列表并将此文件传递为火花读取 API 的输入

val file_list = scala.io.Source.fromInputStream(getClass.getResourceAsStream("/objects_list.txt"))
val paths: mutable.Set[String] = mutable.Set[String]()
    for (line <- file_list.getLines()) {
      if(line.length > 0 && line.contains("part"))
        paths.add(line.trim)
    }

val eventsDF = spark.read.format("orc").option("spark.sql.orc.filterPushdown","true").load(paths.toSeq: _*)
eventsDF.createOrReplaceTempView("events")

集群的大小是 10 个 r3.4xlarge 机器(workers)(每个节点:120GB RAM 和 16 个核心),master 是 m3.2xlarge 配置(

面临的问题是,spark read 无休止地运行,我看到只有驱动程序在工作,所有节点都没有做任何事情,我不确定为什么驱动程序要打开每个 S3 文件进行读取,因为 AFAIK spark 工作得很慢所以直到一个动作被称为阅读不应该发生,我认为它列出了每个文件并收集一些与之相关的元数据。

但是为什么只有 Driver 在工作,而所有节点都没有做任何事情,我怎样才能让这个操作在所有工作节点上并行运行?

我遇到过这些文章 https://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219https://gist.github.com/snowindy/d438cb5256f9331f5eec,但这里整个文件内容被读取为 RDD,但我的用例取决于所引用的列,只有那些数据块/列应该是从 S3 获取(给定 ORC 的列访问是我的存储)。 S3 中的文件有大约 130 列,但只有 20 个字段正在使用数据框 API 进行引用和处理

Sample Log Messages:
17/10/08 18:31:15 INFO S3NativeFileSystem: Opening 's3://xxxx/flattenedDataOrc/data=eventsTable/y=2017/m=09/d=20/h=09/min=00/part-r-00199-e4ba7eee-fb98-4d4f-aecc-3f5685ff64a8.zlib.orc' for reading
17/10/08 18:31:15 INFO S3NativeFileSystem: Opening 's3://xxxx/flattenedDataOrc/data=eventsTable/y=2017/m=09/d=20/h=19/min=00/part-r-00023-5e53e661-82ec-4ff1-8f4c-8e9419b2aadc.zlib.orc' for reading

您可以在下面看到只有一个执行器在其中一个任务节点(集群模式)上运行驱动程序,而其他节点(即工人)的 CPU 为 0%,即使经过 3-4 小时处理,鉴于必须处理大量文件,情况相同

关于如何避免此问题的任何指示,即加快加载和处理速度?

【问题讨论】:

    标签: hadoop apache-spark amazon-s3 emr amazon-emr


    【解决方案1】:

    有一个基于 AWS Glue 的解决方案可以帮助您。

    您的 S3 中有很多文件已分区。但是你有基于时间戳的分区。因此,使用胶水,您可以在 S3 中使用您的对象,例如 EMR 中的“hive 表”。

    首先您需要创建一个 5.8+ 版本的 EMR,您将能够看到:

    您可以设置此检查两个选项。这将允许访问 AWS Glue 数据目录。

    之后,您需要将根文件夹添加到 AWS Glue 目录。快速的方法是使用 Glue Crawler。此工具将抓取您的数据并根据需要创建目录。

    我建议你看看here

    爬虫运行后,您可以在AWS Athena 看到目录中的表的元数据。

    在 Athena 中,您可以检查爬虫是否正确识别了您的数据。

    此解决方案将使您的 spark 接近于真正的 HDFS。由于元数据将正确地在数据目录中。而且您的应用查找“索引”所花费的时间将允许更快地运行作业。

    在这里使用它我能够改进查询,并且使用胶水处理分区要好得多。所以,试一试这可能有助于提高性能。

    【讨论】:

    • 感谢您的详细回答。但是除了 AWS Glue 之外还有其他通用解决方案吗?如果有人在 GCE 或 Azure 上运行他们的应用程序怎么办? ,我认为这是一个非常普遍的问题,人们可能正在做一些事情来摆脱这个瓶颈,有兴趣了解这个解决方案
    • 另一方面,是否有任何关于如何从 EMR 引用/连接到 Glue 目录表的参考,他们的文档没有任何示例/示例
    • 关于这个不多。如果您创建一个 EMR 集群,请检查上述两件事。您可以像使用 spark 指向 Hive 表一样指向该表。像这样使用:val myDf = spark.table("database.table"),你就有了你的数据框。
    • 刚刚检查过,我的输入数据所在的新加坡还没有 Glue,还有其他方法吗?将尝试创建 Hive 元数据,但还有其他方法吗?
    • 这很难说 :( 您可以尝试通过 Hive 创建一个指向您的 S3 存储桶的外部表。这也可以帮助您。hortonworks.github.io/hdp-aws/s3-hive/index.html
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-17
    • 2015-11-24
    • 1970-01-01
    • 2019-10-14
    • 2017-04-24
    相关资源
    最近更新 更多