【问题标题】:How to handle small file problem in spark structured streaming?如何处理 Spark 结构化流中的小文件问题?
【发布时间】:2019-10-24 17:52:37
【问题描述】:

我的项目中有一个场景,我正在使用 spark-sql-2.4.1 版本阅读 kafka 主题消息。我能够使用结构化流处理这一天。收到数据并处理后,我需要将数据保存到 hdfs 存储中的相应 parquet 文件中。

我能够存储和读取 parquet 文件,我将触发时间保持在 15 秒到 1 分钟之间。这些文件的大小非常小,因此会产生很多文件。

这些 parquet 文件需要稍后由 hive 查询读取。

所以 1)这个策略在生产环境中有效吗?还是会导致以后出现小文件问题?

2) 处理/设计这种场景(即行业标准)的最佳做法是什么?

3) 这些事情在生产环境中一般是如何处理的?

谢谢。

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-streaming parquet


    【解决方案1】:

    我们也遇到了类似的问题。经过大量的谷歌搜索,似乎普遍接受的方式是编写另一个作业,该作业经常聚合许多小文件,并将它们写入更大的合并文件中的其他地方。这就是我们现在所做的。

    顺便说一句:无论如何,您在这里可以做的事情是有限制的,因为您拥有的并行性越多,文件的数量就越多,因为每个执行程序线程都写入自己的文件。他们从不写入共享文件。这似乎是并行处理的野兽的本质。

    【讨论】:

    • 感谢您的快速回复,但是如何将这些聚合成更大的文件,有什么策略吗?你能分享一些关于如何聚合这些文件的代码 sn-p 吗?
    • 只需将它们读入并将它们写出到一个新目录中——就这么简单。请注意,书面文件还将反映我在回答中提到的并行性。
    • 再读一遍后如何partionBy?有更好的生产策略吗?我们不能按任何列,因为数据仍在流式传输中。
    • 为什么需要再次partitionBy?您读入数据以及写入数据的方式反映了它在 Spark 中的内存状态,而不是它最初在 HDFS 中的状态。
    • 我的架构是这样的 ...source --> Kafka topic-> 1. write parquet & 2. process as structure stream ---> 1. write proccessed data into parquet & 2. cassandra db ..... 在这个流程中,支持团队想要使用 Hive 即 parquet 查询数据.... .. 如果它有很多小文件,如何处理? Hive 按分区读取分区对吗?
    【解决方案2】:

    我知道这个问题太老了。我遇到了类似的问题,我使用 spark 结构化流式查询侦听器来解决这个问题。

    我的用例是从 kafka 获取数据并以年、月、日和小时分区存储在 hdfs 中。

    以下代码将获取前一小时的分区数据,应用重新分区并覆盖现有分区中的数据。

    val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
    session.streams.addListener(AppListener(config,session))
    
    class AppListener(config: Config,spark: SparkSession) extends StreamingQueryListener {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        this.synchronized {AppListener.mergeFiles(event.progress.timestamp,spark,config)}
      }
      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
    }
    
    object AppListener {
    
      def mergeFiles(currentTs: String,spark: SparkSession,config:Config):Unit = {
        val configs = config.kafka(config.key.get)
        if(currentTs.datetime.isAfter(Processed.ts.plusMinutes(5))) {
    
          println(
            s"""
               |Current Timestamp     :     ${currentTs}
               |Merge Files           :     ${Processed.ts.minusHours(1)}
               |
               |""".stripMargin)
    
          val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
          val ts = Processed.ts.minusHours(1)
          val hdfsPath = s"${configs.hdfsLocation}/year=${ts.getYear}/month=${ts.getMonthOfYear}/day=${ts.getDayOfMonth}/hour=${ts.getHourOfDay}"
          val path = new Path(hdfsPath)
    
          if(fs.exists(path)) {
    
          val hdfsFiles = fs.listLocatedStatus(path)
            .filter(lfs => lfs.isFile && !lfs.getPath.getName.contains("_SUCCESS"))
            .map(_.getPath).toList
    
          println(
            s"""
               |Total files in HDFS location  : ${hdfsFiles.length}
               | ${hdfsFiles.length > 1}
               |""".stripMargin)
    
          if(hdfsFiles.length > 1) {
    
              println(
                s"""
                   |Merge Small Files
                   |==============================================
                   |HDFS Path             : ${hdfsPath}
                   |Total Available files : ${hdfsFiles.length}
                   |Status                : Running
                   |
                   |""".stripMargin)
    
              val df = spark.read.format(configs.writeFormat).load(hdfsPath).cache()
              df.repartition(1)
                .write
                .format(configs.writeFormat)
                .mode("overwrite")
                .save(s"/tmp${hdfsPath}")
    
              df.cache().unpersist()
    
            spark
              .read
              .format(configs.writeFormat)
              .load(s"/tmp${hdfsPath}")
              .write
              .format(configs.writeFormat)
              .mode("overwrite")
              .save(hdfsPath)
    
              Processed.ts = Processed.ts.plusHours(1).toDateTime("yyyy-MM-dd'T'HH:00:00")
              println(
                s"""
                   |Merge Small Files
                   |==============================================
                   |HDFS Path             : ${hdfsPath}
                   |Total files           : ${hdfsFiles.length}
                   |Status                : Completed
                   |
                   |""".stripMargin)
            }
          }
        }
      }
      def apply(config: Config,spark: SparkSession): AppListener = new AppListener(config,spark)
    }
    
    object Processed {
      var ts: DateTime = DateTime.now(DateTimeZone.forID("UTC")).toDateTime("yyyy-MM-dd'T'HH:00:00")
    }
    
    

    有时数据很大,我使用以下逻辑将数据分成多个文件。文件大小约为 160 MB

    val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
        val dataSize = bytes.toLong
        val numPartitions = (bytes.toLong./(1024.0)./(1024.0)./(10240)).ceil.toInt
    
        df.repartition(if(numPartitions == 0) 1 else numPartitions)
          .[...]
    
    

    编辑-1

    使用这个 - spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes 我们可以在加载到内存后获取实际数据帧的大小,例如,您可以检查下面的代码。

    scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
    df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]
    
    scala> import org.apache.commons.io.FileUtils
    import org.apache.commons.io.FileUtils
    
    scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
    bytes: BigInt = 763275709
    
    scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
    res5: String = 727 MB
    
    scala> import sys.process._
    import sys.process._
    
    scala> "hdfs dfs -ls -h /tmp/srinivas/".!
    Found 2 items
    -rw-r-----   3 svcmxns hdfs          0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
    -rw-r-----   3 svcmxns hdfs    727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
    res6: Int = 0
    
    

    【讨论】:

    • 我们可以用它来估计Dataframe中数据的大小。请注意,有时这会给您比实际更大的尺寸。我已经更新了这个答案。请检查。
    • 我想在 pyspark 中复制以下行:- spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes。有什么建议吗?
    • 你用的是哪个版本的spark??
    • 我使用的是 spark 2.3 版本,python 2.7
    • 对于 spark 2.3 - val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats.sizeInBytes 更多详细信息请查看我的其他帖子 - *.com/questions/61338374/…
    【解决方案3】:

    这是火花流的常见问题,没有任何固定答案。 我采用了一种基于附加思想的非常规方法。 当您使用 spark 2.4.1 时,此解决方案会有所帮助。

    因此,如果在 parquet 或 orc 等列文件格式中支持追加,它会更容易,因为新数据可以追加到同一个文件中,并且在每个微批处理之后文件大小会变得越来越大。 但是,由于它不受支持,因此我采用了版本控制方法来实现这一点。在每个微批处理之后,都会生成带有版本分区的数据。 例如

    /prod/mobility/cdr_data/date=01–01–2010/version=12345/file1.parquet
    /prod/mobility/cdr_data/date=01–01–2010/version=23456/file1.parquet
    

    我们可以做的是,在每个微批次中,读取旧版本数据,将其与新的流数据合并,然后在与新版本相同的路径上再次写入。然后,删除旧版本。这样每次微批处理后,每个分区都会有一个版本和一个文件。每个分区中的文件大小将不断增长并变得更大。

    由于流数据集和静态数据集不允许合并,我们可以使用 forEachBatch sink(在 spark >=2.4.0 中可用)将流数据集转换为静态数据集。

    我已经在链接中描述了如何以最佳方式实现这一目标。你可能想看看。 https://medium.com/@kumar.rahul.nitk/solving-small-file-problem-in-spark-structured-streaming-a-versioning-approach-73a0153a0a

    【讨论】: