【问题标题】:How would I go about converting a .csv to an .arrow file without loading it all into memory?我将如何将 .csv 转换为 .arrow 文件而不将其全部加载到内存中?
【发布时间】:2023-03-21 13:48:01
【问题描述】:

我在这里发现了一个类似的问题:Read CSV with PyArrow

在这个答案中,它引用了 sys.stdin.buffer 和 sys.stdout.buffer,但我不确定如何使用它来编写 .arrow 文件或命名它。 我似乎无法在 pyarrow 的文档中找到我正在寻找的确切信息。我的文件不会有任何 nans,但它会有一个带时间戳的索引。该文件约为 100 GB,因此根本无法将其加载到内存中。我尝试更改代码,但正如我假设的那样,代码最终会在每个循环中覆盖之前的文件。

***这是我的第一篇文章。我要感谢所有贡献者,他们在我提出问题之前就回答了我 99.9% 的其他问题。

import sys

import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1     ### used one line chunks for a small test

def main():
    writer = None
    for split in pd.read_csv(sys.stdin.buffer, chunksize=SPLIT_ROWS):

        table = pa.Table.from_pandas(split)
        # Write out to file
        with pa.OSFile('test.arrow', 'wb') as sink:     ### no append mode yet
            with pa.RecordBatchFileWriter(sink, table.schema) as writer:
                writer.write_table(table)
    writer.close()

if __name__ == "__main__":
    main()

下面是我在命令行中使用的代码

>cat data.csv | python test.py

【问题讨论】:

  • 你能看看stackoverflow.com/questions/68555085/…,看看有没有帮助?它展示了如何使用 parquet 文件执行此操作。使用 .arrow 文件执行此操作应该是几乎相同的过程。打开你的作家一次,然后你可以多次调用 write_table (每个块一次),然后你可以在最后关闭它。
  • 你的例子也很接近。我认为您想要更改的是在 for 循环之外创建 RecordBatchFileWriter 。然后,您可以在同一实例上多次调用write_table。然后你可以在 for 循环之后关闭它,就像你已经在做的那样。

标签: python pandas csv pyarrow apache-arrow


【解决方案1】:

正如@Pace 所建议的,您应该考虑将输出文件的创建移到读取循环之外。像这样的:

import sys

import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1     ### used one line chunks for a small test

def main():
    # Write out to file
    with pa.OSFile('test.arrow', 'wb') as sink:     ### no append mode yet
        with pa.RecordBatchFileWriter(sink, table.schema) as writer:
            for split in pd.read_csv('data.csv', chunksize=SPLIT_ROWS):
                table = pa.Table.from_pandas(split)
                writer.write_table(table)

if __name__ == "__main__":
    main()        

如果您希望指定特定的输入和输出文件,也不必使用sys.stdin.buffer。然后您可以将脚本运行为:

python test.py

通过使用with 语句,writersink 将在之后自动关闭(在这种情况下当main() 返回时)。这意味着不必包含显式的close() 调用。

【讨论】:

  • 我选择了这里显示的解决方案。在编写器写入文件之前需要定义模式,所以我阅读了前两行并在循环之前定义了它。它确实假设了一致的数据类型,但就我的目的而言,这样就可以了。
【解决方案2】:

改编自@Martin-Evans 代码的解决方案:

@Pace 建议的 for 循环后关闭文件

import sys

import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1000000

def main():
    schema = pa.Table.from_pandas(pd.read_csv('Data.csv',nrows=2)).schema 
    ### reads first two lines to define schema 

    with pa.OSFile('test.arrow', 'wb') as sink:
        with pa.RecordBatchFileWriter(sink, schema) as writer:            
            for split in pd.read_csv('Data.csv',chunksize=SPLIT_ROWS):
                table = pa.Table.from_pandas(split)
                writer.write_table(table)

            writer.close()

if __name__ == "__main__":
    main()   

【讨论】:

  • 注意:这里使用with语句是为了确保sinkwriter之后自动关闭,不需要显式的close()
猜你喜欢
  • 1970-01-01
  • 2015-09-11
  • 2013-01-08
  • 2015-05-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-05-20
  • 2018-04-03
相关资源
最近更新 更多