【问题标题】:Write large dask dataframe into a single S3 CSV file将大型 dask 数据帧写入单个 S3 CSV 文件
【发布时间】:2018-03-01 21:11:42
【问题描述】:

我正在使用一个 dask.distributed 集群,我想将一个大数据帧保存到一个 CSV 文件到 S3,如果可能的话保持分区的顺序(默认情况下 to_csv() 将数据帧写入多个文件,每个分区一个)。此外,这个文件存储操作也应该作为惰性/延迟任务执行(它应该在集群工作人员上执行)。

我能想到的第一个解决方案是将分区存储在临时 S3 位置,然后在延迟函数中使用 S3 分段上传将这些文件合并/上传在一起。临时存储在这里是一个缺点。

可以使用 dataframe.to_delayed() 来完成吗?上传需要由单个延迟函数执行(S3没有追加操作),同时dataframe可能大于worker的内存,所以该函数不能简单地依赖所有dataframe分区。

附言。 CSV 格式是此用例的要求。

【问题讨论】:

    标签: dask dask-distributed


    【解决方案1】:

    您当然可以使用df.map_partitions 写入多个文件,然后使用s3fsmerge 方法创建一个最终文件(请记住,除了第一部分之外的所有文件都不应写入标题行)。

    您也可以按照您的建议使用to_delayed 做类似的事情。您需要 create 多部分上传,将其与每个延迟对象一起传递给您的上传者,并在延迟的 finalize 步骤中收集这些片段 - 链接指向 s3fs 中执行类似操作的代码。

    【讨论】:

    • 好的,我有类似的想法,但我不确定“传递”部分,我必须在单独的延迟函数调用中上传每个分区,每个分区都可能是可能在不同的工作人员上执行。我将检查该代码并尝试这些方面的内容。
    • 通过“pass long”,我的意思是您的上传功能将类似于f(mpu, data),其中data 是延迟部分之一(每次调用都不同),mpu 是句柄您以与 s3fs 中相同的方式创建(所有调用都相同)。
    • 跟进,对,这也是我的想法,我最终使用了一系列延迟函数来传递分区,直到它们合并成足够大的片段以添加到分段上传。事实证明,保存文件并在之后合并它们并不简单,因为 s3fs API 也需要 min 块。 5MB,所以一般情况下需要某种分区聚合。
    猜你喜欢
    • 2019-01-16
    • 2023-03-25
    • 2018-06-24
    • 1970-01-01
    • 1970-01-01
    • 2020-08-21
    • 2018-03-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多