【Question Title】: Use GCSFileSystem with MultiProcessing
【Posted】: 2021-05-22 19:15:41
【Question】:

I am trying to run a program that uses gcsfs.GCSFileSystem to access Google Cloud Storage, all inside Python's concurrent.futures.ProcessPoolExecutor.

The code I actually need to run is quite complex, but I managed to reduce it to this minimal non-working example:

from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem


def f(path):
    print(f"Creating {path}...")
    print("Created. Getting glob...")
    print(main_fs.glob(path))
    print("Done!")


if __name__ == "__main__":

    main_fs = GCSFileSystem()
    print(main_fs.glob("code_tests_sand"))

    with ProcessPoolExecutor(max_workers=10) as pool:
        l_ = []
        for sub_rules_list in (pool.map(f, ["code_tests_sand"])):
            l_.append(0)

I expect:

['code_tests_sand']
Creating code_tests_sand...
Created. Getting glob...
['code_tests_sand']
Done!

What I get instead:

['code_tests_sand']
Creating code_tests_sand...
Created. Getting glob...

The program hangs at this point and never finishes.

I found that I get the expected output if I pass the GCSFileSystem object to the function explicitly:

from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem


def f(path, ff):
    print(f"Creating {path}...")
    print("Created. Getting glob...")
    print(ff.glob(path))
    print("Done!")


if __name__ == "__main__":

    main_fs = GCSFileSystem()
    print(main_fs.glob("code_tests_sand"))

    with ProcessPoolExecutor(max_workers=10) as pool:
        l_ = []
        for sub_rules_list in (pool.map(f, ["code_tests_sand"], [main_fs])):
            l_.append(0)

However, this is not a good solution for me, because I cannot do this in my real code. Any ideas why this happens and how to fix it?

FYI, I am running Ubuntu 18 with Python 3.8. Here is my pip freeze output:

aiohttp==3.7.3
async-timeout==3.0.1
attrs==20.3.0
cachetools==4.2.1
certifi==2020.12.5
chardet==3.0.4
decorator==4.4.2
fsspec==0.8.5
gcsfs==0.7.2
google-auth==1.27.0
google-auth-oauthlib==0.4.2
idna==2.10
multidict==5.1.0
oauthlib==3.1.0
pyasn1==0.4.8
pyasn1-modules==0.2.8
requests==2.25.1
requests-oauthlib==1.3.0
rsa==4.7.1
six==1.15.0
typing-extensions==3.7.4.3
urllib3==1.26.3
yarl==1.6.3

【Question Discussion】:

Tags: python-3.x google-cloud-platform google-cloud-storage


【Solution 1】:

I eventually found a workaround in the form of a class that wraps GCSFileSystem:

from concurrent.futures import ProcessPoolExecutor
from gcsfs import GCSFileSystem
from copy import copy
import sys


class Dummy:
    fs = None

    @classmethod
    def set_fs(cls, fs):
        cls.fs = fs

    def __init__(self, path):
        self.fs = copy(Dummy.fs)
        self.path = path

    def glob(self):
        return self.fs.glob(self.path)


def f(path):
    print(f"Creating {path}...")
    p = Dummy(path)
    print("Created. Getting glob...")
    print(p.glob())
    print(sys.getsizeof(p.fs))
    print("Done!")


if __name__ == "__main__":

    main_fs = GCSFileSystem()
    print(main_fs.glob("code_tests_sand"))

    Dummy.set_fs(main_fs)

    with ProcessPoolExecutor(max_workers=10) as pool:
        l_ = []
        for sub_rules_list in (pool.map(f, ["code_tests_sand"])):
            l_.append(0)

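Note that copying the GCSFileSystem object every time the class is instantiated is required. Without the copy, the class works fine as long as it is not used with multiprocessing, but inside the process pool it shows the same hanging behaviour. sys.getsizeof reports only around 50 bytes for the GCSFileSystem object, so copying it should not cost much memory. Keep in mind, though, that sys.getsizeof is a shallow measure: it does not count the objects the instance refers to, so the real footprint is larger than that figure.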
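A plausible reason the copy() matters, stated as an assumption rather than something verified against gcsfs 0.7.2: copy.copy falls back to the object's `__reduce_ex__`, and fsspec filesystems (as far as we can tell) define a `__reduce__` that rebuilds the object by calling its class again with the original constructor arguments. The same path is taken when `main_fs` is passed as a `pool.map` argument, because arguments are pickled before being sent to the worker. The hypothetical `StandInFS` class below (not part of gcsfs) shows that mechanism in plain Python: both copying and a pickle round trip run the constructor again instead of reusing the original object's internal state.

```python
import copy
import pickle


class StandInFS:
    """Hypothetical stand-in for a filesystem object; not gcsfs itself."""

    created = 0  # counts how many times __init__ has run

    def __init__(self, **storage_options):
        StandInFS.created += 1
        self.storage_options = storage_options

    def __reduce__(self):
        # Rebuild by calling the class again with the original options,
        # instead of duplicating the instance's internal state.
        return (_rebuild, (type(self), self.storage_options))


def _rebuild(cls, storage_options):
    return cls(**storage_options)


original = StandInFS(project="my-project")
copied = copy.copy(original)                      # goes through __reduce__
unpickled = pickle.loads(pickle.dumps(original))  # same path as a pool.map argument

print(StandInFS.created)       # 3: the constructor ran once per object
print(copied is original)      # False
print(copied.storage_options)  # {'project': 'my-project'}
```

If fsspec's real `__reduce__` behaves like this in your version, it would also explain why passing `main_fs` explicitly in the question works while reading the inherited global does not.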

【Discussion】:
