【问题标题】:Reading partition columns without partition column names读取没有分区列名的分区列
【发布时间】:2021-04-28 08:45:00
【问题描述】:

我们将存储在 s3 中的数据按以下结构分区:

bucket/directory/table/aaaa/bb/cc/dd/

其中aaaa 是年份,bb 是月份,cc 是日期,dd 是小时。

如您所见,路径中没有分区键(year=aaaamonth=bbday=cchour=dd)

结果,当我将表格读入 Spark 时,没有 yearmonthdayhour 列。

无论如何我可以将表读入 Spark 并包含分区列 without

  • 更改 s3 中的路径名
  • 在循环中迭代每个分区值并将每个分区一个一个地读取到 Spark 中(这是一个巨大的表,这需要很长时间并且显然不是最佳的)。

【问题讨论】:

    标签: apache-spark amazon-s3 pyspark parquet partition


    【解决方案1】:

    Spark 不能 discover partitions 在路径中未编码为 partition_name=value,因此您必须创建它们。

    在将路径 bucket/directory/table/aaaa/bb/cc/dd/ 加载到 DataFrame 中后,您可以从使用 input_file_name() 获得的源文件名中提取这些分区。

    首先,使用 / 分隔符分割文件名路径,然后从最后 4 个元素创建列:

    from pyspark.sql import functions as F
    
    df1 = df.withColumn("date_partitions", F.slice(F.split(F.input_file_name(), "/"), -5, 4)) \
        .withColumn("year", F.col("date_partitions").getItem(0)) \
        .withColumn("month", F.col("date_partitions").getItem(1)) \
        .withColumn("day", F.col("date_partitions").getItem(2)) \
        .withColumn("hour", F.col("date_partitions").getItem(3)) \
        .drop("data_partitions")
    

    例子:

    data = [
        (1, 2, "bucket/directory/table/2021/01/10/14/"),
        (3, 4, "bucket/directory/table/2021/01/11/18/")
    ]
    
    df = spark.createDataFrame(data, ["a", "b", "input_file_name"])
    

    给予:

    #+---+---+-------------------------------------+----+-----+---+----+
    #|a  |b  |input_file_name                      |year|month|day|hour|
    #+---+---+-------------------------------------+----+-----+---+----+
    #|1  |2  |bucket/directory/table/2021/01/10/14/|2021|01   |10 |14  |
    #|3  |4  |bucket/directory/table/2021/01/11/18/|2021|01   |11 |18  |
    #+---+---+-------------------------------------+----+-----+---+----+
    

    【讨论】:

    • 所以我将整个表(所有分区)读入一个df,然后创建这些列?
    • 此外,我如何从这些值创建单个时间戳列,而不是创建单独的年、月、日、小时列?
    • 我试过这个:.withColumn("timestamp",datetime.datetime(year=int(F.col("data_partitions").getItem(0)), month=int(F.col("data_partitions").getItem(1)), day=int(F.col("data_partitions").getItem(2)), hour=int(F.col("data_partitions").getItem(3))))
    • @KOB 使用:.withColumn("date", F.to_timestamp(F.array_join("date_partitions", "-"), "yyyy-MM-dd-HH")) 从分区部分创建日期列。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2022-11-13
    • 1970-01-01
    • 2016-04-03
    • 2022-01-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多