【问题标题】:Can't pass file handle and lock to a process with multiprocessing.Pool?无法通过 multiprocessing.Pool 将文件句柄和锁定传递给进程?
【发布时间】:2021-10-06 04:19:57
【问题描述】:

我正在使用multiprocessing.Pool() 启动一堆进程,其中每个进程都写入同一个文件(使用锁)。

每个进程都被分配了一个“任务”,它只是一个参数元组。

其中一个参数是文件句柄,另一个参数是锁。

但是 Python 不喜欢我既不传递文件句柄也不传递锁。

(当简单地调用multiprocessing.Process时,我可以做到使用multiprocessing.Pool。)

示例。

import multiprocessing as mp
import time
import random

def thr_work00(args):
  arg0 = args[0]
  arg1 = args[1]
  arg2 = args[2]
  arg3 = args[3]
  arg4 = args[4]
  s = random.random()/10
  time.sleep(s)
  print(f'\x1b[92m{arg0} \x1b[32m{s:.3f}\x1b[0m')
  return args

o_file = open('test.txt','w')
o_lock = mp.Lock()

tasks = [
  [0, 0,1, o_file,o_lock],
  [1, 2,3, o_file,o_lock],
  [2, 4,5, o_file,o_lock],
  [3, 6,7, o_file,o_lock],
]

with mp.Pool(2) as pool:
  results = pool.map(thr_work00, tasks)
  for res in results:
    print(res)

传递文件时我得到:TypeError: cannot serialize '_io.TextIOWrapper' object

通过锁时我得到:RuntimeError: Lock objects should only be shared between processes through inheritance

我怎样才能解决这个问题?


编辑。

所以我想知道这是否可以(它似乎有效)。我唯一关心的是每个write 本身都是原子的,但写入完成的顺序并不重要。

import multiprocessing as mp
import time
import random
import os

# ----------------------------------------------------------------
def thr_work00(args):
  arg0 = args[0]
  arg1 = args[1]
  s = random.random()/10
  time.sleep(s)
  txt = 1004*str(arg0)
  with open('test.txt','a') as o_file:
    o_file.write(f'{txt}\n')
  print(f'\x1b[92m{arg0} \x1b[32m{s:.3f}\x1b[0m')
  return args

# ----------------------------------------------------------------
os.remove('test.txt')

tasks = [
  [0, 0xf0],
  [1, 0xf1],
  [2, 0xf2],
  [3, 0xf3],
  [4, 0xf4],
  [5, 0xf5],
  [6, 0xf6],
  [7, 0xf7],
]

with mp.Pool(2) as pool:
  results = pool.map(thr_work00, tasks)
  for res in results:
    print(res)

【问题讨论】:

    标签: python file multiprocessing threadpool python-multiprocessing


    【解决方案1】:

    对于锁和打开文件描述符,您应该通过进程继承来共享它们,而不是尝试将它们作为参数传递。子进程从其父进程继承所有打开的文件描述符,因此您可以这样编写代码:

    import multiprocessing as mp
    import time
    import random
    
    
    def thr_work00(args):
        global o_lock, o_file
    
        s = random.randint(0, 5)
        with o_lock:
            time.sleep(s)
            print(f"\x1b[92m{args[0]} \x1b[32m{s}\x1b[0m")
            o_file.write(f"{args[0]} {s}\n")
            o_file.flush()
        return args
    
    
    with open("test.txt", "w") as o_file:
        o_lock = mp.Lock()
    
        tasks = [
            [0, 0, 1],
            [1, 2, 3],
            [2, 4, 5],
            [3, 6, 7],
        ]
    
        with mp.Pool(2) as pool:
            results = pool.map(thr_work00, tasks)
            for res in results:
                print(res)
    

    或者,而不是写入您的工作人员中的文件,只需 收集结果时在主线程中执行写入。这 无需锁,因为您不再需要担心 关于多个进程写入同一个文件...

    ...或者,如果您需要“实时”写入,而不是最后,请使用 Queue 将它们传送到专用的写入线程。


    这是一个使用队列将结果传递给专用的示例 作者:

    import multiprocessing as mp
    import time
    import random
    
    resultq = mp.Queue()
    
    
    def thr_work00(args):
        global resultq
        s = random.randint(0, 5)
        print(f"\x1b[92m{args[0]} \x1b[32m{s}\x1b[0m")
        time.sleep(s)
        resultq.put((args[0], s))
        return args
    
    
    def thr_writer():
        global resultq
        print('writer start')
        with open('test.txt', 'w') as fd:
            while True:
                item = resultq.get()
                if item is None:
                    break
                fd.write(f'{item[0]}: {item[1]}\n')
        print('writer exit')
    
    
    with open("test.txt", "w") as o_file:
        o_lock = mp.Lock()
    
        writer = mp.Process(target=thr_writer)
        writer.start()
    
        tasks = [
            [0, 0, 1],
            [1, 2, 3],
            [2, 4, 5],
            [3, 6, 7],
        ]
    
        with mp.Pool(2) as pool:
            results = pool.map(thr_work00, tasks)
            for res in results:
                print(res)
    
        resultq.put(None)
        writer.join()
    

    【讨论】:

    • 是的,我确实需要“实时”写入(因为整个事情需要很长时间,并且每个进程都写入文件以保存部分进度的想法)。如果您能详细说明Queue 的想法...
    • 我添加了一个使用 Queue 和专用编写器的示例。
    • 这种global 的使用是否适用于将任何 Python 对象“传递”给进程?还是只针对特定的?
    • 这仅适用于在可以在进程之间继承的原语之上实现的东西,例如文件描述符。它不适用于严格的内存结构,例如大多数 Python 数据结构...但请参阅multiprocessing.Manager 上的文档以创建进程之间可共享的对象。
    • 当使用“fork”而不是“spawn”时,这也仅适用于 linux 或 OSX
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-07-14
    • 2013-01-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-12-23
    相关资源
    最近更新 更多