【Question Title】: Can't map a function to tarfile members in parallel
【Posted】: 2021-07-06 21:27:06
【Question Description】:

I have a tarfile containing bz2-compressed files. I want to apply the function clean_file to each of the bz2 files and collate the results. In series, this is easy with a loop:

import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import Pool

def clean_file(member):
    # Only process the bz2-compressed members of the archive
    if '.bz2' in str(member):

        f = tr.extractfile(member)  # 'tr' is the global tarfile handle opened below

        with bz2.open(f, "rt") as bzinput:
            dicts = []
            for i, line in enumerate(bzinput):
                # patch malformed JSON before parsing
                line = line.replace('"name"}', '"name":" "}')
                dat = json.loads(line)
                dicts.append(dat)

        f.close()

        # keep only the first record from each file
        processed = dicts[0]
        return processed

    else:
        pass


# Open tar file and get contents (members)
tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)


# Apply the clean_file function in series
processed_files = []
for i, m in enumerate(members, start=1):
    processed_files.append(clean_file(m))
    print('done ' + str(i) + '/' + str(num_files))
    

However, I need to be able to do this in parallel. The method I'm trying uses Pool like so:

# Apply the clean_file function in parallel
if __name__ == '__main__':
   with Pool(2) as p:
      processed_files = list(p.map(clean_file, members))

But this returns an OSError:

Traceback (most recent call last):
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "parse_data.py", line 19, in clean_file
    for i, line in enumerate(bzinput):
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/bz2.py", line 195, in read1
    return self._buffer.read1(size)
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 68, in readinto
    data = self.read(len(byte_view))
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/_compression.py", line 103, in read
    data = self._decompressor.decompress(rawblock, size)
OSError: Invalid data stream
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "parse_data.py", line 53, in <module>
    processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/site-packages/tqdm/std.py", line 1167, in __iter__
    for obj in iterable:
  File "/Users/johnfoley/opt/anaconda3/envs/racing_env/lib/python3.6/multiprocessing/pool.py", line 735, in next
    raise value
OSError: Invalid data stream

So I'm guessing this way of doing it can't properly access the files from within data.tar or something. How can I apply the function in parallel?

I'm guessing this would happen with any tar archive containing bz2 files, but here is my data to reproduce the error: https://github.com/johnf1004/reproduce_tar_error

【Question Discussion】:

  • Please add the whole traceback to your question (not just the OSError). When asking a question here you should provide a minimal reproducible example, which means stripping out extraneous things such as the use of tqdm in the example code.

Tags: python parallel-processing python-multiprocessing tar tarfile


【Solution 1】:

It seems some kind of race condition was happening. Opening the tar file separately in each child process solves the problem:

import json
import bz2
import tarfile
import logging
from multiprocessing import Pool


def clean_file(member):
    if '.bz2' not in str(member):
        return
    try:
        with tarfile.open('data.tar') as tr:
            with tr.extractfile(member) as bz2_file:
                with bz2.open(bz2_file, "rt") as bzinput:
                    dicts = []
                    for i, line in enumerate(bzinput):
                        line = line.replace('"name"}', '"name":" "}')
                        dat = json.loads(line)
                        dicts.append(dat)
                    return dicts[0]
    except Exception:
        logging.exception(f"Error while processing {member}")


def process_serial():
    tr = tarfile.open('data.tar')
    members = tr.getmembers()
    processed_files = []
    for i, member in enumerate(members):
        processed_files.append(clean_file(member))
        print(f'done {i}/{len(members)}')
    return processed_files


def process_parallel():
    tr = tarfile.open('data.tar')
    members = tr.getmembers()
    with Pool() as pool:
        processed_files = pool.map(clean_file, members)
        print(processed_files)


def main():
    process_parallel()


if __name__ == '__main__':
    main()

Edit:

Note that another way to tackle this issue is to just use the spawn start method:

multiprocessing.set_start_method('spawn')

By doing this, we instruct Python to "deep-copy" the file handles in the child processes. Under the default "fork" start method, the file handles of parent and child share the same offsets.
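
For instance, here is a minimal, self-contained sketch (not the asker's exact script; clean_file is shortened) of how this could slot in. set_start_method must be called once, in the main module, before the pool is created:

import bz2
import json
import multiprocessing
import tarfile
from multiprocessing import Pool

# Module level: each spawned worker re-imports this module, so every
# process ends up with its own tarfile handle and an independent offset.
tr = tarfile.open('data.tar')
members = tr.getmembers()

def clean_file(member):
    # same logic as in the question, shortened for the sketch
    if '.bz2' not in str(member):
        return None
    with bz2.open(tr.extractfile(member), "rt") as bzinput:
        for line in bzinput:
            line = line.replace('"name"}', '"name":" "}')
            return json.loads(line)  # first record only

if __name__ == '__main__':
    # must run before any Pool/Process is created
    multiprocessing.set_start_method('spawn')
    with Pool(2) as p:
        processed_files = p.map(clean_file, members)
        print(len(processed_files))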

【Discussion】:

    【Solution 2】:

    You did not specify what platform you are running on, but I suspect it is Windows because you have ...

    if __name__ == '__main__':
        main()
    

    ... which is required for code that creates processes on platforms that create new processes with the OS spawn function. But that also means that when a new process is created (for example, all the processes in the pool you are creating), each one begins by re-executing the source program from the very top. That means each pool process is executing the following code:

    tr = tarfile.open('data.tar')
    members = tr.getmembers()
    num_files = len(members)
    

    That said, I don't see why that in itself would cause the error, though I can't be sure. The problem may be that this code is executed after your worker function clean_file has been invoked, so tr has not yet been set. If this code came before clean_file it might work, but that is just a guess. In any case, extracting the members with members = tr.getmembers() in every pool process is wasteful. Each process needs to open the tar file, ideally just once.
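
    To see the re-execution concretely, here is a small self-contained sketch (a hypothetical demo script, not from the question): under the spawn start method, the module's top level runs once in the parent and once more in every worker process:

    import os
    from multiprocessing import get_context

    # runs in the parent and again in every spawned worker, because
    # spawn re-imports the main module in each child process
    print(f'module top level executed in pid {os.getpid()}')

    def work(x):
        return x * x

    if __name__ == '__main__':
        ctx = get_context('spawn')
        with ctx.Pool(2) as pool:
            print(pool.map(work, range(4)))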

    But what is clear is that the stack trace you posted does not match your code. You show:

    Traceback (most recent call last):
      File "parse_data.py", line 53, in <module>
        processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))
    

    Yet your code has no reference to tqdm nor any use of method imap. It becomes more difficult to analyze your actual problem when the code you post does not quite match the code that produced the exception.

    If you are running on a Mac, fork may be being used to create new processes, which can be problematic if the main process has created multiple threads (which you wouldn't necessarily see; perhaps the tarfile module does so) before you create a new process, so I have specified code that ensures spawn is used to create new processes. Either way, the following code should work. It also introduces a couple of optimizations. If it does not, please post a new stack trace.

    import json
    import bz2
    import tarfile
    from multiprocessing import get_context
    
    def open_tar():
        # open once for each process in the pool
        global tr
        tr = tarfile.open('data.tar')
    
    def clean_file(member):
        f = tr.extractfile(member)
    
        with bz2.open(f, "rt") as bzinput:
            for line in bzinput:
                line = line.replace('"name"}', '"name":" "}')
                dat = json.loads(line)
                # since you are returning just the first occurrence:
                return dat
    
    def main():
        with tarfile.open('data.tar') as tr:
            members = tr.getmembers()
        # just pick members where '.bz2' is in member:
        filtered_members = filter(lambda member: '.bz2' in str(member), members)
        ctx = get_context('spawn')
        # open tar file just once for each process in the pool:
        with ctx.Pool(initializer=open_tar) as pool:
            processed_files = pool.map(clean_file, filtered_members)
            print(processed_files)
    
    # required for when processes are created using spawn:
    if __name__ == '__main__':
        main()
    

    【Discussion】:
