【问题标题】:multiprocessing exits before doing any job在做任何工作之前多处理退出
【发布时间】:2020-03-08 08:36:43
【问题描述】:

我继承了一个特定的解析器,它应该解析 10 个文件,每个文件大约 4m 行。

代码是用 Python 2 编写的,我对其进行了更新。

有一个多处理逻辑,我似乎无法开始工作。

from multiprocessing.pool import ThreadPool
import glob

DATADIR = 'home/my_dir/where/all/my/files/are'

def process_file(filepath):
    # read line by line, parse and insert to postgres database. 

def process_directory(dirpath):
    pattern = f'{dirpath}/*dat'  # files have .dat extension. 
    tp = ThreadPool(10)
    for filepath in glob.glob(pattern):
        print(filepath)
        tp.apply_async(process_file, filepath)
    tp.close()
    tp.join()

if __name__ == '__main__':
    process_directory(DATADIR)

我浏览了很多文档和一些类似的问题,但它似乎不起作用。

使用解析器代码,我确实会在控制台上打印出我需要解析的文件的所有路径,但程序仅此而已。

【问题讨论】:

  • 在你的小sn-p中,你什么时候“运行”线程?我看不到那条线,只声明 Pool ,关闭并加入

标签: python python-3.x parallel-processing multiprocessing threadpool


【解决方案1】:

问题在于您调用apply_async 的方式。我为您的问题制作了一个简单的复制器,但稍作调整即可从每次调用中获取结果:

from multiprocessing.pool import ThreadPool

def func(f):
    print("hey " + f)
    return f + "1"

l = ["name", "name2", "name3"]
pool = ThreadPool(3)
out = []
for a in l:
    print(a)
    out.append(pool.apply_async(func, a))

# Check the response from each `apply_async` call
for a in out:
    a.get()

pool.close()
pool.join()

这会返回一个错误:

Traceback (most recent call last):
  File "a.py", line 16, in <module>
    a.get()
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 599, in get
    raise self._value
  File "/usr/lib64/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
TypeError: func() takes 1 positional argument but 4 were given

它认为你传递了四个位置参数,而不是一个。这是因为apply_async 想要在一个元组中传递所有参数,如下所示:

pool.apply_async(func, (a,))

如果你在调用apply_async 时将filepath 放在一个元组中,我想你会得到你期望的行为。

还值得注意的是,您的用例非常适合使用pool.map 而不是apply_async,后者更简洁:

pool.map(process_file, glob.glob(pattern))

【讨论】:

  • 非常感谢!我一直在想,在知道高级开发人员的答案后,我怎么能得到这个答案。对于这种情况,我发现如果我访问了源代码(我没有),我会注意到该函数的定义是这样的 def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): ... - 正如你所说的那样,它需要一个可迭代的 @987654333 @。谢谢!
  • @TytireRecubans 是的,那会奏效的。 official documentation 也有使用 apply_async 的示例,这将揭示正确的调用方式。
猜你喜欢
  • 2015-03-21
  • 2022-11-17
  • 1970-01-01
  • 2011-04-20
  • 1970-01-01
  • 2012-12-11
  • 2011-09-25
相关资源
最近更新 更多