【问题标题】:How to efficiently read/parse loads of .gz files in a s3 folder with spark on EMR如何在 EMR 上使用 spark 有效地读取/解析 s3 文件夹中的大量 .gz 文件
【发布时间】:2019-09-05 15:03:11
【问题描述】:

我正在尝试通过在 EMR 上执行的 spark 应用读取 s3 目录中的所有文件。

数据以典型格式存储,例如“s3a://Some/path/yyyy/mm/dd/hh/blah.gz”

如果我使用深度嵌套的通配符(例如“s3a://SomeBucket/SomeFolder/////*.gz”),性能很差,需要大约需要 40 分钟来读取几万个 gzip 压缩的 json 小文件。 它可以工作,但是浪费 40 分钟来测试一些代码真的很糟糕。

我的研究表明,我还有另外两种方法更有效。

使用 hadoop.fs 库 (2.8.5) 我尝试读取我提供的每个文件路径。

private def getEventDataHadoop(
    eventsFilePaths: RDD[String]
  )(implicit sqlContext: SQLContext): Try[RDD[String]] =
    Try(
      {
        val conf = sqlContext.sparkContext.hadoopConfiguration

        eventsFilePaths.map(eventsFilePath => {
          val p                            = new Path(eventsFilePath)
          val fs                           = p.getFileSystem(conf)
          val eventData: FSDataInputStream = fs.open(p)
          IOUtils.toString(eventData)
        })
      }
    )

这些文件路径由以下代码生成:

private[disneystreaming] def generateInputBucketPaths(
    s3Protocol: String,
    bucketName: String,
    service: String,
    region: String,
    yearsMonths: Map[String, Set[String]]
  ): Try[Set[String]] =
    Try(
      {
        val days                         = 1 to 31
        val hours                        = 0 to 23
        val dateFormatter: Int => String = buildDateFormat("00")

        yearsMonths.flatMap { yearMonth: (String, Set[String]) =>
          for {
            month: String <- yearMonth._2
            day: Int      <- days
            hour: Int     <- hours
          } yield
            s"$s3Protocol$bucketName/$service/$region/${dateFormatter(yearMonth._1.toInt)}/${dateFormatter(month.toInt)}/" +
              s"${dateFormatter(day)}/${dateFormatter(hour)}/*.gz"
        }.toSet
      }
    )

hadoop.fs 代码失败,因为 Path 类不可序列化。我想不出我该如何解决这个问题。

所以这导致我使用 AmazonS3Client 的另一种方法,我只是要求客户端给我一个文件夹(或前缀)中的所有文件路径,然后将文件解析为一个字符串,这可能会因为它们而失败压缩:

 private def getEventDataS3(bucketName: String, prefix: String)(
    implicit sqlContext: SQLContext
  ): Try[RDD[String]] =
    Try(
      {
        import com.amazonaws.services.s3._, model._
        import scala.collection.JavaConverters._

        val request = new ListObjectsRequest()
        request.setBucketName(bucketName)
        request.setPrefix(prefix)
        request.setMaxKeys(Integer.MAX_VALUE)
        val s3 = new AmazonS3Client(new ProfileCredentialsProvider("default"))

        val objs: ObjectListing = s3.listObjects(request) // Note that this method returns truncated data if longer than the "pageLength" above. You might need to deal with that.
        sqlContext.sparkContext
          .parallelize(objs.getObjectSummaries.asScala.map(_.getKey).toList)
          .flatMap { key =>
            Source
              .fromInputStream(s3.getObject(bucketName, key).getObjectContent: InputStream)
              .getLines()
          }
      }
    )

此代码产生空异常,因为配置文件不能为空(“java.lang.IllegalArgumentException:配置文件不能为空”)。 请记住,此代码在 AWS 内的 EMR 上运行,那么如何提供所需的凭证?其他人如何使用此客户端在 EMR 上运行 spark 作业?

非常感谢任何有关使这些方法发挥作用的帮助。

【问题讨论】:

  • 尝试AmazonS3ClientBuilder.defaultClient()初始化S3连接。

标签: scala apache-spark hadoop amazon-s3 amazon-emr


【解决方案1】:

Path 在以后的 Hadoop 版本中是可序列化的,因为能够在 Spark RDD 中使用它很有用。在那之前,将路径转换为 ​​URI,编组它,并在闭包中从该 URI 创建一个新路径。

【讨论】:

    猜你喜欢
    • 2019-10-14
    • 1970-01-01
    • 2018-03-19
    • 2020-09-29
    • 2014-08-02
    • 2017-03-16
    • 2016-09-16
    • 2016-05-05
    • 1970-01-01
    相关资源
    最近更新 更多