【问题标题】:Append new data to partitioned parquet files将新数据附加到分区 parquet 文件
【发布时间】:2016-04-28 09:15:12
【问题描述】:

我正在编写一个 ETL 过程,我需要读取每小时的日志文件、对数据进行分区并保存它。我正在使用 Spark(在 Databricks 中)。 日志文件是 CSV,所以我阅读它们并应用模式,然后执行我的转换。

我的问题是,如何将每小时的数据保存为镶木地板格式,但附加到现有数据集?保存时,我需要按数据框中存在的 4 列进行分区。

这是我的存档:

data
    .filter(validPartnerIds($"partnerID"))
    .write
    .partitionBy("partnerID","year","month","day")
    .parquet(saveDestination)

问题是如果目标文件夹存在,则保存会引发错误。 如果目标不存在,那么我不会附加我的文件。

我尝试过使用.mode("append"),但我发现 Spark 有时会在中途失败,因此我最终会丢失已写入的数据量以及仍需要写入的数据量。

我正在使用镶木地板,因为分区大大增加了我将来的查询。同样,我必须将数据以某种文件格式写入磁盘,不能使用 Druid 或 Cassandra 等数据库。

非常感谢任何有关如何对我的数据框进行分区和保存文件(坚持镶木地板或其他格式)的建议。

【问题讨论】:

  • 你能分享一下你在使用.mode(append)时遇到的错误吗?
  • 我得到的错误是这样的: 引起:java.io.IOException:文件已经存在:/tracking/v4/010316/gif=a/partnerID=111/year=2016/month=1 /day=3/part-r-00147-8f30e760-3706-4e4c-bf56-e3b5515942d1.gz.parquet 我认为这个错误是由于一些写入操作需要很长时间时任务调度不匹配而引发的。跨度>
  • @Saman 你是如何解决这个问题的?我正在尝试在这里处理类似的情况。超过 10000 个分区,实际上甚至更多,因为每小时一个分区。我正在尝试围绕此构建解决方案,有什么建议吗?谢谢。
  • @ds_user,使用最新版本的spark(我使用的是Databricks Runtime Environment,它有很多开源spark之外的特性),你可以在writer中使用“append”模式。在解决此问题之前,您应该重新考虑导致 10k+ 分区的分区列。我不知道您拥有的数据量,但是可以按天而不是按小时进行分区吗?尝试对数据进行重新分区,以确保在写入文件后拥有 200-500MB 的文件。您是要一次写入所有 10k 个分区还是每批都是几个分区?
  • 这是我要解决的问题。 stackoverflow.com/questions/50197782/… 。我正在尝试通过列出 id(或)产品 id 进行分区。所以想要存储listingId=612/year=2018/month=3/day=5 如果我为每个listing汇总到每日分区。但是由于列表数量的增加,这仍然是太多的分区。有什么建议吗?

标签: scala apache-spark append parquet


【解决方案1】:

如果你需要追加文件,你肯定要使用追加模式。我不知道您希望它生成多少个分区,但我发现如果您有 许多 个分区,partitionBy 会导致许多问题(内存问题和 IO 问题)。

如果您认为您的问题是由于写入操作耗时过长,我建议您尝试以下两件事:

1) 通过添加到配置来使用 snappy:

conf.set("spark.sql.parquet.compression.codec", "snappy")

2) 在SparkContext 上禁用hadoopConfiguration 中的元数据文件的生成,如下所示:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

元数据文件的生成会有些耗时(请参阅this blog post),但根据this,它们实际上并不重要。就个人而言,我总是禁用它们并且没有问题。

如果您生成许多分区 (> 500),恐怕我能做的最好的就是建议您使用附加模式查看解决方案 - 我根本没有设法让partitionBy 处理那么多分区。

【讨论】:

  • 谢谢格伦尼。由于该博客文章,我总是禁用生成元数据文件:D。我肯定会创建超过 500 个分区。我相信我的大部分问题都源于 parquet 格式不打算用作可更新格式,我将其视为数据库表。您对另一种保存我的日常数据的方法有什么建议吗?
  • 我有类似的问题,我正在根据当前时间戳进行分区,每个新分区附加它创建的总任务等于到目前为止的分区。即如果有 1000 个分区和 1 个要添加的新分区,它将运行 1001 个任务并增加整体作业时间。我在这里做错了吗?
【解决方案2】:

如果您使用未排序的分区,您的数据将被拆分到所有分区中。这意味着每个任务都会生成数据并将数据写入每个输出文件。

考虑在写入之前根据分区列对数据重新分区,以使每个输出文件的所有数据都位于相同的分区上:

data
 .filter(validPartnerIds($"partnerID"))
 .repartition([optional integer,] "partnerID","year","month","day")
 .write
 .partitionBy("partnerID","year","month","day")
 .parquet(saveDestination)

见:DataFrame.repartition

【讨论】:

    猜你喜欢
    • 2023-03-07
    • 1970-01-01
    • 1970-01-01
    • 2014-07-27
    • 2017-04-30
    • 2015-04-09
    • 2018-05-14
    • 2018-08-27
    相关资源
    最近更新 更多