【问题标题】:Multiprocess pool initialization with sequential initializer argument使用顺序初始化器参数进行多进程池初始化
【发布时间】:2020-12-13 06:29:11
【问题描述】:

我有如下代码:

import multiprocessing as mp

connection: module.Connection

def client_id():
    for i in range(mp.cpu_count*2):
        yield i

def initproc(host: str, port: int, client_id: int):
    global connection
    connection.connect(host, port, client_id)

def main():
    host = "something"
    port = 12345
    mp.get_context("spawn").Pool(processes=mp.cpu_count()*2,
                                 initializer=initproc,
                                 initargs=(host, port, client_id())) as p:
        res = p.starmap(processing_function, arg_list)
    

就问题而言, processing_function 和 arg_list 不相关。

问题是我收到了一个错误:

    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'generator' object

有没有什么方法可以在池中创建一个初始化进程,使得初始化它的参数中的一个是序列中的下一个数字?

附:在编写的代码中,可以在初始化函数之外初始化所有连接对象,但在我的特定实例中它不是。我需要将连接参数传递给初始化程序。

【问题讨论】:

  • 有几件事情你应该调查,因为他们可能不会做你认为他们做的事。 globalmultiprocessing 不要混用 - 每个进程中都会有一个 new connectioninitproc 说它期望 client_id: int,但您传递的是 client_id: Iterable[int](因为 client_id() 是生成器)。最后,你打算尾随的as p 做什么?
  • as p 允许我将池对象寻址为变量pglobal 是必需的,因为变量 connection 否则将在函数内是本地的,并且无法从数据处理函数中访问。 Iterable 与否我什至没有达到initproc 正在获得处理价值的地步

标签: python python-3.x multiprocessing python-multiprocessing process-pool


【解决方案1】:

对于您的情况,一个简单的解决方案是使用 Process.name 中包含的子进程的序列号。你可以用...提取它。

mp.current_process().name.split('-')[1]

如果您需要更多地控制序列的开始位置,您可以使用 multiprocessing.Value 作为计数器,工作人员可以从中获取其唯一编号。

import multiprocessing as mp
import time


def init_p(client_id):
    with client_id.get_lock():
        globals()['client_id'] = client_id.value
        print(f"{mp.current_process().name},"
              f" {mp.current_process().name.split('-')[1]},"  # alternative
              f" client_id:{globals()['client_id']}")
        client_id.value += 1


if __name__ == "__main__":

    ctx = mp.get_context("spawn")
    client_ids = ctx.Value('i', 0)

    with ctx.Pool(
            processes=4,
            initializer=init_p,
            initargs=(client_ids,)
    ) as pool:

        time.sleep(3)

输出:

SpawnPoolWorker-2, 2, client_id:0
SpawnPoolWorker-3, 3, client_id:1
SpawnPoolWorker-1, 1, client_id:2
SpawnPoolWorker-4, 4, client_id:3

Process finished with exit code 0

【讨论】:

    猜你喜欢
    • 2016-01-06
    • 2012-04-24
    • 1970-01-01
    • 2022-07-06
    • 1970-01-01
    • 1970-01-01
    • 2012-02-16
    • 2017-11-02
    • 1970-01-01
    相关资源
    最近更新 更多