【发布时间】:2020-08-04 16:09:38
【问题描述】:
我必须更新历史数据。通过更新,我的意思是向 S3 上的现有分区添加新行,有时还添加新列。
当前分区按日期实现:created_year={}/created_month={}/created_day={}。为了避免每个分区的对象过多,我执行以下操作来维护单个对象/分区:
def save_repartitioned_dataframe(bucket_name, df):
dest_path = form_path_string(bucket_name, repartitioned_data=True)
print('Trying to save repartitioned data at: {}'.format(dest_path))
df.repartition(1, "created_year", "created_month", "created_day").write.partitionBy(
"created_year", "created_month", "created_day").parquet(dest_path)
print('Data repartitioning complete with at the following location: ')
print(dest_path)
_, count, distinct_count, num_partitions = read_dataframe_from_bucket(bucket_name, repartitioned_data=True)
return count, distinct_count, num_partitions
存在一种情况,我必须添加具有这些列值的某些行:
created_year | created_month | created_day
2019 |10 |27
这意味着该路径上的文件(S3 对象):created_year=2019/created_month=10/created_day=27/some_random_name.parquet 将附加新行。
如果架构发生变化,那么所有对象都必须实现该变化。
我尝试研究它的一般工作原理,因此,有两种感兴趣的模式:覆盖、附加。
第一个只会添加当前数据并删除其余数据。我不想要那种情况。第二个将附加但may end up creating more objects。我也不想要那种情况。我还读到数据帧在 Spark 中是不可变的。
那么,我如何实现将新数据附加到现有分区并每天维护一个对象?
【问题讨论】:
标签: apache-spark amazon-s3 pyspark