【问题标题】:Spark Streaming appends to S3 as Parquet format, too many small partitionsSpark Streaming 以 Parquet 格式附加到 S3,小分区太多
【发布时间】:2017-04-30 13:01:19
【问题描述】:

我正在构建一个应用程序,它使用 Spark Streaming 从 AWS EMR 上的 Kinesis 流接收数据。目标之一是将数据持久保存到 S3 (EMRFS) 中,为此我使用了 2 分钟的非重叠窗口。

我的方法:

Kinesis Stream -> 批处理持续时间约为 60 秒的 Spark Streaming,使用 120 秒的非重叠窗口,将流数据保存到 S3 中:

val rdd1 = kinesisStream.map( rdd => /* decode the data */)
rdd1.window(Seconds(120), Seconds(120).foreachRDD { rdd =>
        val spark = SparkSession...
        import spark.implicits._
        // convert rdd to df
        val df = rdd.toDF(columnNames: _*)
        df.write.parquet("s3://bucket/20161211.parquet")
}

这是 s3://bucket/20161211.parquet 一段时间后的样子:

如您所见,许多碎片化的小分区(这对于读取性能来说是可怕的)......问题是,当我将数据流式传输到这个 S3 parquet 文件中时,有没有办法控制小分区的数量?

谢谢

我想做的就是每天做这样的事情:

val df = spark.read.parquet("s3://bucket/20161211.parquet")
df.coalesce(4).write.parquet("s3://bucket/20161211_4parition.parquet")

我将数据帧重新分区为 4 个分区并将它们保存回来......

有效,我觉得每天都这样做不是优雅的解决方案......

【问题讨论】:

  • 您有平面数据架构吗?或者你如何确保每个 parquet 文件的架构保持不变?
  • @V.Samma Spark DF parquet read 有一个选项“mergeSchema”(默认为 false),可以帮助您管理不断发展的模式吗?
  • 是的,但是当同一列具有不同类型时,它不处理这种情况。例如,一列应该是双精度类型,一个数据行的值为“2.0”,其他数据行的值为“0”。我不知道它如何与 Spark 流配合使用,但是当 Spark 的 read.json 读取此列中的所有值均为“0”的文件时,它会推断其类型只要不加倍,并且在写入镶木地板后,mergeSchema 会抛出一个例外,因为它不知道如何处理这种情况。
  • @V.Samma 实际上,不同类型的相同列名无法合并。我想不出一个好的方法来处理这个问题。我的想法是不要在系统稳定后直接更改类型,而是添加一个具有不同类型的新列。

标签: apache-spark amazon-s3 streaming parquet


【解决方案1】:

这实际上与您想要做的非常接近,每个分区将在 Spark 中作为单独的文件写出。但是coalesce 有点令人困惑,因为它可以(有效地)应用到调用合并的上游。来自 Scala 文档的警告是:

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
this may result in your computation taking place on fewer nodes than
you like (e.g. one node in the case of numPartitions = 1). To avoid this,
you can pass shuffle = true. This will add a shuffle step, but means the
current upstream partitions will be executed in parallel (per whatever
the current partitioning is).

在数据集中,persistcount 更容易进行广泛的评估,因为默认的 coalesce 函数不将 repartition 作为输入标志(尽管您可以构造 @987654327 的实例@手动)。

另一种选择是使用第二个定期批处理作业(甚至是第二个流作业)来清理/合并结果,但这可能有点复杂,因为它引入了第二个移动部分来跟踪。

【讨论】:

  • 感谢霍尔顿的回答!我喜欢有第二个预定批处理作业的选项,这样做感觉更干净。我们有一个调度服务,所以这应该是可以管理的。
猜你喜欢
  • 2018-08-28
  • 2016-03-23
  • 2018-10-16
  • 2018-02-02
  • 1970-01-01
  • 2019-06-07
  • 1970-01-01
  • 2018-09-26
  • 2016-04-28
相关资源
最近更新 更多