【问题标题】:Spark s3 csv files read orderSpark s3 csv文件读取顺序
【发布时间】:2022-01-13 23:40:30
【问题描述】:

假设 s3 文件夹中的三个文件,通过 spark.read.csv(s3:bucketname/folder1/*.csv) 读取是否按顺序读取文件? 如果没有,有没有办法在读取整个文件夹时对文件进行排序,同时在内部不同时间收到多个文件。

File name s3 file uploaded/Last modified time
s3:bucketname/folder1/file1.csv 01:00:00
s3:bucketname/folder1/file2.csv 01:10:00
s3:bucketname/folder1/file3.csv 01:20:00

【问题讨论】:

  • 接收时间如何指定?它是您数据中的一列,还是类似于文件中的时间戳?
  • 这是s3文件上传时间/最后修改时间。

标签: apache-spark amazon-s3 pyspark apache-spark-sql


【解决方案1】:

您可以使用以下方法实现此目的

  1. 遍历存储桶中的所有文件并通过添加新列 last_modified 加载该 csv。保留将在dfs_list 中加载的所有 dfs 的列表。由于 pyspark 进行惰性评估,它不会立即加载数据。
import boto3

s3 = boto3.resource('s3')
my_bucket = s3.Bucket('bucketname')

dfs_list = []

for file_object in my_bucket.objects.filter(Prefix="folder1/"):
    df = spark.read.parquet('s3a://' + file_object.name).withColumn("modified_date", file_object.last_modified)
    dfs_list.append(df)
  1. 现在使用 pyspark unionAll 函数对所有 dfs 进行联合,然后根据 modified_date 对数据进行排序。
from functools import reduce
from pyspark.sql import DataFrame

df_combined = reduce(DataFrame.unionAll, dfs_list)

df_combined = df_combined.orderBy('modified_date')

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-12-04
    • 2017-06-21
    • 1970-01-01
    • 2021-03-11
    • 1970-01-01
    • 1970-01-01
    • 2021-10-11
    • 2019-02-08
    相关资源
    最近更新 更多