【问题标题】:spark creates partition inside partition on S3spark 在 S3 上的分区内创建分区
【发布时间】:2020-09-29 10:24:44
【问题描述】:

我有以下制表符分隔的示例数据集:

col1  period  col3  col4  col5  col6  col7  col8  col9  col10 col11 col12 col13 col14 col15 col16 col17 col18 col19 col20 col21 col22
ASSDF 202001  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202002  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202003  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
ASSDF 202004  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99
...
...
ASSDF 202312  A B BFGF  SSDAA WDSF  SDSDSD  SDSDSSS SDSDSD  E F FS  E CURR1 CURR2 -99 CURR3 -99 -99 -99 -99

我正在对这些数据进行一些转换,最终数据在 spark 数据集 "DS1" 中。之后,我将该数据集写入带有“周期”分区的 s3。由于我也需要 s3 文件中的句点,因此我正在从句点列创建另一列“datasetPeriod”。

我的 scala 函数来保存 TSV 数据集。

def saveTsvDataset(dataframe: DataFrame, outputFullPath: String, numPartitions: Integer, partitionCols: String*): Unit = {
    dataframe
      .repartition(numPartitions)
      .write
      .partitionBy(partitionCols:_*)
      .mode(SaveMode.Overwrite)
      .option("sep", "\t")
      .csv(outputFullPath)
  }

将数据集保存在 s3 上的 Scala 代码。为 s3 上的分区添加新列 datasetPeriod。

 saveTsvDataset(
      DS1.withColumn("datasetPeriod",$"period")
      , "s3://s3_path"
      , 100
      , "period"
    )

现在,我的问题是我的时间段从 202001 到 202312,当我在 s3 上使用“datasetPeriod”上的分区写入时,有时它会在分区内为任何随机时间段创建分区。所以这在任何时期都是随机发生的。我从来没有看到这种情况发生了多个时期。它创建像"s3://s3_path/datasetPeriod=202008/datasetPeriod=202008" 这样的路径。

【问题讨论】:

    标签: scala apache-spark amazon-s3 apache-spark-sql apache-spark-dataset


    【解决方案1】:

    您的 DataFrame 中已经有一个 period 列。因此无需再创建一个新的重复 datasetPeriod 列。

    当您使用 .partitionBy("period") 将 DataFrame 写入 s3://../parentFolder 时,它会创建如下文件夹:

    df.write.partitionBy("period").csv("s3://../parentFolder/")
    s3://.../parentFolder/period=202001/
    s3://.../parentFolder/period=202002/
    s3://.../parentFolder/period=202003/
    ...
    s3://.../parentFolder/period=202312/
    

    在读回数据时,只需提及直到parentFolder 的路径,它将自动读取period 作为列之一。

    val df = spark.read.csv("s3://../parentFolder/")
    //df.schema will give you `period` as one of the column
    df.printSchema
    root
     |-- col1: string (nullable = true)
     |-- .... //other columns go here
     |-- period: string (nullable = true)
    

    话虽如此,无论您在分区列中获得多个分区,都只是由于您在使用 partitionBy 写入数据时使用了错误的路径。

    【讨论】:

      猜你喜欢
      • 2019-08-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-10-05
      • 1970-01-01
      • 1970-01-01
      • 2018-03-22
      • 1970-01-01
      相关资源
      最近更新 更多