【问题标题】:Save Spark dataframe to HDFS partitioned by date将 Spark 数据帧保存到按日期分区的 HDFS
【发布时间】:2019-06-03 12:04:12
【问题描述】:

我需要将 Spark 数据帧中的数据以 Avro 格式写入 HDFS。挑战在于数据应该每天保存,因此目录如下所示:tablename/2019-08-12、tablename/2019-08-13 等等。 我只有一个时间戳字段,我需要从中提取创建目录名称的日期。 我已经建立了一个有两个问题的方法: 1) 从时间戳中提取日期存在困难 3)在大型数据集(以后会更大)上,由于启动了很多任务,性能将非常糟糕。 那么我该如何改变/改进这种方法呢?

这是我使用的代码(dataDF是输入数据):

val uniqueDates = dataDF.select("update_database_time").distinct.
collect.map(elem => elem.getTimestamp(0).getDate)

    uniqueDates.map(date => {
      val resultDF = dataDF.where(to_date(dataDF.col("update_database_time")) <=> date)
      val pathToSave = s"${dataDir}/${tableNameValue}/${date}"
      dataDF.write
            .format("avro")
            .option("avroSchema", SchemaRegistry.getSchema(
                   schemaRegistryConfig.url,
                   schemaRegistryConfig.dataSchemaSubject,
                   schemaRegistryConfig.dataSchemaVersion))
            .save(s"${hdfsURL}${pathToSave}")
      resultDF
    })
      .reduce(_.union(_))

【问题讨论】:

  • 您可以创建一个新的 Hive 表,该表将存储为 Avro 并按日期分区。然后,您可以将数据写入 Hive 并直接从文件中读取(文件将以您描述的方式存储)。

标签: scala apache-spark hdfs


【解决方案1】:

如果你能忍受这样的目录结构

tablename/date=2019-08-12
tablename/date=2019-08-13

相反,DataFrameWriter.partitionBy 可以解决问题。例如

val df =
  Seq((Timestamp.valueOf("2019-06-01 12:00:00"), 1),
      (Timestamp.valueOf("2019-06-01 12:00:01"), 2),
      (Timestamp.valueOf("2019-06-02 12:00:00"), 3)).toDF("time", "foo")

df.withColumn("date", to_date($"time"))
  .write
  .partitionBy("date")
  .format("avro")
  .save("/tmp/foo")

产生以下结构

find /tmp/foo
/tmp/foo
/tmp/foo/._SUCCESS.crc
/tmp/foo/date=2019-06-01
/tmp/foo/date=2019-06-01/.part-00000-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro.crc
/tmp/foo/date=2019-06-01/part-00000-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro
/tmp/foo/date=2019-06-01/.part-00001-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro.crc
/tmp/foo/date=2019-06-01/part-00001-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro
/tmp/foo/_SUCCESS
/tmp/foo/date=2019-06-02
/tmp/foo/date=2019-06-02/part-00002-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro
/tmp/foo/date=2019-06-02/.part-00002-2a7a63f2-7038-4aec-8f76-87077f91a415.c000.avro.crc

【讨论】:

  • 感谢您的建议!我也尝试用 partitionBy 解决这个问题,但是由于我没有用 column 子句定义,因此 update_database_time 列在最终结果中消失了。唯一的问题是如何重命名最终目录,以便有一个没有 partitionColumn 名称的日期?预期结果:/tmp/foo/2019-06-02
  • 我知道的唯一方法是覆盖默认的文件系统实现,所以我会考虑尽可能适应在文件名中包含date=
猜你喜欢
  • 2018-06-22
  • 2020-08-18
  • 1970-01-01
  • 2015-09-29
  • 2015-03-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多