【问题标题】:Spark - Reduce the no of partitions to the no of folders readSpark - 将分区数减少到读取的文件夹数
【发布时间】:2020-02-02 10:52:09
【问题描述】:

使用 spark 流式传输(每 5 分钟一次)我将数据作为 parquet 存储在 HDFS 中。

/data/yyyy-MM-dd/*.parquet

例如:/data/2020-02-02/*parquet

每个 parquet 文件的大小仅以 KB 为单位

每个文件夹最多可包含 288 个 parquet 文件(最多)。

我们通常读取过去 24 小时、过去 7 天、过去一个月等的数据。

使用 spark 读取数据时,我检查了分区数。假设我正在检查最近一个月的数据,每个文件夹中有 288 个文件,它正在创建 288 个分区。

当我尝试将数据重新分区为 30 时,它会减少到只有 180。

有什么方法可以为一个文件夹创建一个分区。

例如:当我读取过去 30 天的数据时。有什么方法可以读取 30 个分区的数据。如果是7天7分区。

【问题讨论】:

    标签: apache-spark spark-streaming partition


    【解决方案1】:

    我首选的解决方案是使用partitionBy 函数:

    import org.apache.spark.sql.functions._
    val df = spark.range(1,100).withColumn("myPartition", col("id")%10)
    // saving with one file per partition
    df.repartition(1, $"myPartition")
      .write
      .mode("append")
      .partitionBy("myPartition")
      .parquet("output/data")
    

    这应该创建

    # hadoop fs -ls output/data
    output/data/myPartition=0
    output/data/myPartition=1
    ...
    output/data/myPartition=9
    

    每个都有一个镶木地板文件。

    【讨论】:

    • @meniluca...在写入数据时,我使用了 coalesce(1)。但我每 5 分钟写入一次数据,并且我使用火花附加模式来执行此操作。所以我每 5 分钟创建一次新文件
    • 您必须首先在文件夹名称中写入分区值,如上例所示。在你的情况下应该是/data/date=2020-02-02/*parquet,那么你必须使用像我上面的例子中的重新分区命令df.repartition(1, $"date"),这会给你想要的结果,每天一个分区。请更正您的反馈。
    • 不适合写作。我有问题中提到的文件夹结构。有什么办法可以用 30 个分区读取。我也没有否决你的回答
    • 好的,知道了。 HDFS 文件是不可变的。不幸的是,没有附加到同一个文件,您必须实现自己的逻辑来读取文件夹并在每次需要写入时重新分区,以便每天有 1 个文件。最好的方法是在一天结束时完成写作后压缩文件。唉,这是一个普遍的问题。你可以 +1 我会很感激 :)
    猜你喜欢
    • 2018-11-27
    • 1970-01-01
    • 2017-03-13
    • 2017-03-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多