【发布时间】:2016-04-28 09:15:12
【问题描述】:
我正在编写一个 ETL 过程,我需要读取每小时的日志文件、对数据进行分区并保存它。我正在使用 Spark(在 Databricks 中)。 日志文件是 CSV,所以我阅读它们并应用模式,然后执行我的转换。
我的问题是,如何将每小时的数据保存为镶木地板格式,但附加到现有数据集?保存时,我需要按数据框中存在的 4 列进行分区。
这是我的存档:
data
.filter(validPartnerIds($"partnerID"))
.write
.partitionBy("partnerID","year","month","day")
.parquet(saveDestination)
问题是如果目标文件夹存在,则保存会引发错误。 如果目标不存在,那么我不会附加我的文件。
我尝试过使用.mode("append"),但我发现 Spark 有时会在中途失败,因此我最终会丢失已写入的数据量以及仍需要写入的数据量。
我正在使用镶木地板,因为分区大大增加了我将来的查询。同样,我必须将数据以某种文件格式写入磁盘,不能使用 Druid 或 Cassandra 等数据库。
非常感谢任何有关如何对我的数据框进行分区和保存文件(坚持镶木地板或其他格式)的建议。
【问题讨论】:
-
你能分享一下你在使用
.mode(append)时遇到的错误吗? -
我得到的错误是这样的: 引起:java.io.IOException:文件已经存在:/tracking/v4/010316/gif=a/partnerID=111/year=2016/month=1 /day=3/part-r-00147-8f30e760-3706-4e4c-bf56-e3b5515942d1.gz.parquet 我认为这个错误是由于一些写入操作需要很长时间时任务调度不匹配而引发的。跨度>
-
@Saman 你是如何解决这个问题的?我正在尝试在这里处理类似的情况。超过 10000 个分区,实际上甚至更多,因为每小时一个分区。我正在尝试围绕此构建解决方案,有什么建议吗?谢谢。
-
@ds_user,使用最新版本的spark(我使用的是Databricks Runtime Environment,它有很多开源spark之外的特性),你可以在writer中使用“append”模式。在解决此问题之前,您应该重新考虑导致 10k+ 分区的分区列。我不知道您拥有的数据量,但是可以按天而不是按小时进行分区吗?尝试对数据进行重新分区,以确保在写入文件后拥有 200-500MB 的文件。您是要一次写入所有 10k 个分区还是每批都是几个分区?
-
这是我要解决的问题。 stackoverflow.com/questions/50197782/… 。我正在尝试通过列出 id(或)产品 id 进行分区。所以想要存储listingId=612/year=2018/month=3/day=5 如果我为每个listing汇总到每日分区。但是由于列表数量的增加,这仍然是太多的分区。有什么建议吗?
标签: scala apache-spark append parquet