【问题标题】:How to add rows to an existing partition in Spark?如何将行添加到 Spark 中的现有分区?
【发布时间】:2020-08-04 16:09:38
【问题描述】:

我必须更新历史数据。通过更新,我的意思是向 S3 上的现有分区添加新行,有时还添加新列。

当前分区按日期实现:created_year={}/created_month={}/created_day={}。为了避免每个分区的对象过多,我执行以下操作来维护单个对象/分区:

def save_repartitioned_dataframe(bucket_name, df):
    dest_path = form_path_string(bucket_name, repartitioned_data=True)
    print('Trying to save repartitioned data at: {}'.format(dest_path))
    df.repartition(1, "created_year", "created_month", "created_day").write.partitionBy(
        "created_year", "created_month", "created_day").parquet(dest_path)
    print('Data repartitioning complete with at the following location: ')
    print(dest_path)
    _, count, distinct_count, num_partitions = read_dataframe_from_bucket(bucket_name, repartitioned_data=True)
    return count, distinct_count, num_partitions

存在一种情况,我必须添加具有这些列值的某些行:

created_year | created_month | created_day
2019         |10             |27   

这意味着该路径上的文件(S3 对象):created_year=2019/created_month=10/created_day=27/some_random_name.parquet 将附加新行。

如果架构发生变化,那么所有对象都必须实现该变化。

我尝试研究它的一般工作原理,因此,有两种感兴趣的模式:覆盖、附加。

第一个只会添加当前数据并删除其余数据。我不想要那种情况。第二个将附加但may end up creating more objects。我也不想要那种情况。我还读到数据帧在 Spark 中是不可变的。

那么,我如何实现将新数据附加到现有分区并每天维护一个对象?

【问题讨论】:

    标签: apache-spark amazon-s3 pyspark


    【解决方案1】:

    根据您的问题,我了解您需要在现有数据中添加新行,同时不增加镶木地板文件的数量。这可以通过对特定分区文件夹进行操作来实现。执行此操作时可能会出现三种情况。

    1) 新分区

    这意味着传入数据在分区列中有一个新值。在你的情况下,这可能是:

    现有数据

    | year | month | day |
    | ---- | ----- | --- |
    | 2020 |   1   |  1  |
    

    新数据

    | year | month | day |
    | ---- | ----- | --- |
    | 2020 |   1   |  2  |
    

    因此,在这种情况下,您只需为传入的数据创建一个新的分区文件夹并照常保存即可。

    partition_path = "/path/to/data/year=2020/month=1/day=2"
    new_data.repartition(1, "year", "month", "day").write.parquet(partition_path)
    

    2) 现有分区,新数据

    这是您想要将新行追加到现有数据的位置。可能是这样的:

    现有数据

    | year | month | day | key | value |
    | ---- | ----- | --- | --- | ----- |
    | 2020 |   1   |  1  |  a  |   1   |
    

    新数据

    | year | month | day | key | value |
    | ---- | ----- | --- | --- | ----- |
    | 2020 |   1   |  1  |  b  |   1   |
    

    这里我们有一个相同分区的新记录。您可以使用“附加模式”,但您希望每个分区文件夹中有一个镶木地板文件。这就是为什么您应该先读取现有分区,将其与新数据合并,然后再将其写回。

    partition_path = "/path/to/data/year=2020/month=1/day=1"
    old_data = spark.read.parquet(partition_path)
    write_data = old_data.unionByName(new_data)
    write_data.repartition(1, "year", "month", "day").write.parquet(partition_path)
    

    3) 现有分区,现有数据

    如果传入的数据是UPDATE,而不是INSERT,该怎么办?在这种情况下,您应该更新一行而不是插入一个新行。想象一下:

    现有数据

    | year | month | day | key | value |
    | ---- | ----- | --- | --- | ----- |
    | 2020 |   1   |  1  |  a  |   1   |
    

    新数据

    | year | month | day | key | value |
    | ---- | ----- | --- | --- | ----- |
    | 2020 |   1   |  1  |  a  |   2   |
    

    "a" 之前的值为 1,现在我们希望它为 2。因此,在这种情况下,您应该读取现有数据并更新现有记录。这可以通过以下方式实现。

    partition_path = "/path/to/data/year=2020/month=1/day=1"
    old_data = spark.read.parquet(partition_path)
    write_data = old_data.join(new_data, ["year", "month", "day", "key"], "outer")
    write_data = write_data.select(
        "year", "month", "day", "key",
        F.coalesce(new_data["value"], old_data["value"]).alias("value")
    )
    write_data.repartition(1, "year", "month", "day").write.parquet(partition_path)
    

    当我们将旧数据与新数据外连接时,可能有四件事,

    • 两个数据具有相同的值,不管取哪一个
    • 两个数据有不同的值,取新的值
    • 旧数据没有价值,新数据有,取新
    • 新数据没有价值,旧数据有,拿旧的

    为了实现我们在这里的愿望,来自pyspark.sql.functionscoalesce 将完成这项工作。

    请注意,此解决方案也适用于第二种情况。

    关于架构更改

    Spark 支持 parquet 文件格式的模式合并。这意味着您可以在数据中添加或删除列。当您添加或删除列时,您将意识到在从顶层读取数据时某些列不存在。这是因为 Spark 默认禁用模式合并。来自documentation

    与 Protocol Buffer、Avro 和 Thrift 一样,Parquet 也支持模式演化。用户可以从一个简单的模式开始,然后根据需要逐渐向模式中添加更多列。这样,用户最终可能会得到多个 Parquet 文件,这些文件具有不同但相互兼容的模式。 Parquet 数据源现在能够自动检测这种情况并合并所有这些文件的模式。

    为了能够读取所有列,您需要将mergeSchema 选项设置为true

    df = spark.read.option("mergeSchema", "true").parquet(path)
    

    【讨论】:

      猜你喜欢
      • 2019-09-06
      • 2014-10-19
      • 2021-04-04
      • 1970-01-01
      • 2017-03-27
      • 1970-01-01
      • 2017-08-07
      • 1970-01-01
      • 2019-04-16
      相关资源
      最近更新 更多