【发布时间】:2018-03-19 23:36:16
【问题描述】:
我在 S3 中存在大约 15000 个文件 (ORC),其中每个文件包含几分钟的数据,每个文件的大小在 300-700MB 之间变化。
由于递归循环遍历以 YYYY/MM/DD/HH24/MIN 格式存在的目录很昂贵,因此我正在创建一个文件,其中包含给定日期 (objects_list.txt) 的所有 S3 文件的列表并将此文件传递为火花读取 API 的输入
val file_list = scala.io.Source.fromInputStream(getClass.getResourceAsStream("/objects_list.txt"))
val paths: mutable.Set[String] = mutable.Set[String]()
for (line <- file_list.getLines()) {
if(line.length > 0 && line.contains("part"))
paths.add(line.trim)
}
val eventsDF = spark.read.format("orc").option("spark.sql.orc.filterPushdown","true").load(paths.toSeq: _*)
eventsDF.createOrReplaceTempView("events")
集群的大小是 10 个 r3.4xlarge 机器(workers)(每个节点:120GB RAM 和 16 个核心),master 是 m3.2xlarge 配置(
面临的问题是,spark read 无休止地运行,我看到只有驱动程序在工作,所有节点都没有做任何事情,我不确定为什么驱动程序要打开每个 S3 文件进行读取,因为 AFAIK spark 工作得很慢所以直到一个动作被称为阅读不应该发生,我认为它列出了每个文件并收集一些与之相关的元数据。
但是为什么只有 Driver 在工作,而所有节点都没有做任何事情,我怎样才能让这个操作在所有工作节点上并行运行?
我遇到过这些文章 https://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219 和 https://gist.github.com/snowindy/d438cb5256f9331f5eec,但这里整个文件内容被读取为 RDD,但我的用例取决于所引用的列,只有那些数据块/列应该是从 S3 获取(给定 ORC 的列访问是我的存储)。 S3 中的文件有大约 130 列,但只有 20 个字段正在使用数据框 API 进行引用和处理
Sample Log Messages:
17/10/08 18:31:15 INFO S3NativeFileSystem: Opening 's3://xxxx/flattenedDataOrc/data=eventsTable/y=2017/m=09/d=20/h=09/min=00/part-r-00199-e4ba7eee-fb98-4d4f-aecc-3f5685ff64a8.zlib.orc' for reading
17/10/08 18:31:15 INFO S3NativeFileSystem: Opening 's3://xxxx/flattenedDataOrc/data=eventsTable/y=2017/m=09/d=20/h=19/min=00/part-r-00023-5e53e661-82ec-4ff1-8f4c-8e9419b2aadc.zlib.orc' for reading
您可以在下面看到只有一个执行器在其中一个任务节点(集群模式)上运行驱动程序,而其他节点(即工人)的 CPU 为 0%,即使经过 3-4 小时处理,鉴于必须处理大量文件,情况相同
关于如何避免此问题的任何指示,即加快加载和处理速度?
【问题讨论】:
标签: hadoop apache-spark amazon-s3 emr amazon-emr