【问题标题】:Efficiently write large pandas data to different files高效地将大熊猫数据写入不同的文件
【发布时间】:2020-05-18 20:12:34
【问题描述】:

我有一个大约 200 万行(每行 80 列)的 pandas 数据框。

我想将数据框输出到 csv 以及 parquet 文件。

假设数据框存在于df 变量中

初步方法

print('Creating csv and parquet files')
st = time.time()
df.to_csv('output_file.csv')
df.to_parquet('output_file.parquet')
print(f'Created csv and parquet files in {time.time() - st} seconds')

使用这种方法写入文件花费的时间太长。 我假设由于这两个是独立的操作,我可以利用多个进程。

新方法

def build_csv(dataframe, output_filename):
    print(f'Building csv: {output_filename}')
    dataframe.to_csv(output_filename)


def build_parquet(dataframe, output_filename):
    print(f'Building parquet: {output_filename}')
    dataframe.to_parquet(output_filename)


with ProcessPoolExecutor(max_workers=3) as executor:
    executor.submit(build_csv, (df, 'output_file.csv'))
    executor.submit(build_parquet, (df, 'output_file.parquet'))

较新的方法运行成功,但我没有看到正在创建任何文件。我不确定为什么会这样。

有没有更好(更快)的方法将 pandas 数据帧写入不同的文件?

【问题讨论】:

  • 猜测:子进程可能有不同的当前工作目录。使用要写入的文件名的绝对路径。
  • 这似乎是使用Dask 的问题。

标签: python pandas python-multiprocessing parquet


【解决方案1】:

编辑:我保留了下面的线程解决方案供您参考。但是,此解决方案应该解决 Python GIL 问题。我测试了一下,可以看到文件已经写入成功了:

from multiprocessing import Pool
import pandas as pd

# original data:
data = pd.DataFrame([
    [ 1, 2, 3, 4,], 
    [ 1, 2, 3, 4,], 
    [ 1, 2, 3, 4,], 
    [ 1, 2, 3, 4,], 
    [ 1, 2, 3, 4,],
])    


def SaveDataToCsv(data):
    print('Started export to .csv')
    data.to_csv('data.csv')
    print('Finished export to .csv')


def SaveDataToParquet(data):
    print('Started export to .parquet')
    data.to_parquet('data.parquet')
    print('Finished export to .parquet')


# multiprocessing method:
pool = Pool(processes=2)
process1 = pool.apply_async(SaveDataToCsv, [data])
process2 = pool.apply_async(SaveDataToParquet, [data])

测试了threading 库,它似乎工作正常:

import pandas as pd
import threading

# original data:
data = pd.DataFrame([
    [ 1, 2, 3, 4,],
    [ 1, 2, 3, 4,],
    [ 1, 2, 3, 4,],
    [ 1, 2, 3, 4,],
    [ 1, 2, 3, 4,],
])


def SaveDataToCsv(data):        
    data.to_csv('data.csv')


def SaveDataToParquet(data):
    data.to_parquet('data.parquet')    

thread1 = threading.Thread(target=SaveDataToCsv, args=(data,))
thread2 = threading.Thread(target=SaveDataToParquet, args=(data,))

thread1.start()
thread2.start()

【讨论】:

  • 谢谢。像魅力一样工作,减少了相当多的时间。但是 Python 的 GIL 怎么没有在这里造成问题呢?这里面的 Afaik 线程应该和顺序运行一样好,但需要额外的切换线程开销,对吧?
  • 在做了一些阅读/测试之后,我的回答似乎不应该减少总时间(实际上它可能会增加它)。要回答您关于 Python GIL 的问题,请查看这个有用的链接:realpython.com/python-gil - 否则我会尝试调查您的原始错误,看看我们是否无法让它工作。根据附加的链接,多处理库是 Python GIL 的解决方法。
  • 在一小部分数据(10 万行)上测试了几次: 顺序:Created csv and parquet files in 15.59054684638977 seconds 你的方法(线程):Created csv and parquet files in 0.006857156753540039 seconds 我得到了我的解决方案,但现在我更困惑了事情是如何运作的。 Here 是一个关于此的讨论主题:
  • 您需要在线程完成执行后(thread.isAlive() == False 时)对线程方法的结果进行计时。否则,您只是在测量启动线程所需的时间。更多关于线程:docs.python.org/3/library/threading.html
  • 介绍了这些方法。线程会增加时间。此外,使用Pool 的编辑方法不会创建文件。我在这里做错了什么?
【解决方案2】:

由于您写入同一个磁盘,磁盘是瓶颈,您的多处理不会加快操作。

【讨论】:

    猜你喜欢
    • 2016-12-25
    • 1970-01-01
    • 2017-10-31
    • 2018-11-11
    • 2021-02-14
    • 1970-01-01
    • 2018-07-08
    • 2022-07-20
    • 1970-01-01
    相关资源
    最近更新 更多