【Question Title】: multiprocessing spawning processes without pooling
【Posted】: 2022-01-08 19:03:13
【Question】:

I am trying to use the multiprocessing library to spawn new processes without using a pool and without creating zombies.

The multiprocessing documentation warns, under "Joining zombie processes": on Unix, when a process finishes but has not been joined it becomes a zombie. There should never be very many because each time a new process starts (or active_children() is called) all completed processes which have not yet been joined will be joined. Also calling a finished process's Process.is_alive will join the process. Even so it is probably good practice to explicitly join all the processes that you start.

This implementation is a short version of a larger script that ends up creating zombies after a few hours:

from multiprocessing import Process
import time

def target(task):
    print(f"Working for {task*2} seconds ...")
    time.sleep(task*2)

if __name__ == '__main__':
    processes = 4
    list_process = [None] * processes
    targets = [[2] for i in range(10)]

    list_process = [None] * processes
    while targets:
        for i in range(processes):
            p = list_process[i]
            if not (p and p.is_alive()):
                list_process[i] = Process(target=target, args=(targets.pop(0)))
                list_process[i].start()
                if p:
                    p.join()

    for process in list_process:
        if process:
            process.join()

In the larger version, list_process ends up containing only zombies and can no longer process tasks.

Update 1

Thanks to Booboo, I was able to get a better look at what is happening:

from multiprocessing import Process
import time

def target(task):
    print(f"Working for {task*2} seconds ...")
    time.sleep(task*2)

if __name__ == '__main__':
    started_count = 0
    joined_count = 0
    joined_list = []
    processes = 4
    list_process = [None] * processes
    targets = [[2] for i in range(10)]

    list_process = [None] * processes
    while targets:
        for i in range(processes):
            p = list_process[i]
            if not (p and p.is_alive()):
                list_process[i] = Process(target=target, args=(targets.pop(0)))
                list_process[i].start()
                print(list_process[i].pid)
                started_count += 1
                if p:
                    assert(not p.is_alive())
                    p.join()
                    joined_list.append(list_process[i].pid)
                    joined_count += 1

    for process in list_process:
        if process:
            process.join()
            joined_list.append(list_process[i].pid)
            joined_count += 1

    print(f'Final started count: {started_count}, final joined count: {joined_count}')
    print(joined_list)

Output:

20604
24108
1272
23616
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
18492
17348
19992
6216
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
18744
26240
Working for 4 seconds ...
Working for 4 seconds ...
Final started count: 10, final joined count: 10
[18492, 17348, 19992, 6216, 18744, 26240, 6216, 6216, 6216, 6216]

I count 10 joins, but some of the recorded pids are wrong (pid 6216 shows up repeatedly in the joined list while the first pids started never appear in it), which suggests some processes are left unjoined. Why?

【Discussion】:

  • Some processes not being .join()ed causes zombie processes (a simple inference from the documentation quoted). Since the number of concurrently running processes is capped by the list list_process, not all of them get cleaned up.
  • I think that in the while loop, after if p:, joined_list.append(list_process[i].pid) should be joined_list.append(p.pid)? Also, IMHO you may run into trouble: there can be more pops than there are elements in targets.

Tags: python python-3.x multiprocessing python-multiprocessing zombie-process


【Solution 1】:

I have looked at this code before and, as it stands, it appears to be correct. I have modified it to keep track of how many times processes are started and joined, and added an assertion as a "sanity check":

from multiprocessing import Process
import time

def target(task):
    print(f"Working for {task*2} seconds ...")
    time.sleep(task*2)

if __name__ == '__main__':
    started_count = 0
    joined_count = 0
    processes = 4
    list_process = [None] * processes
    targets = [[2] for i in range(10)]

    list_process = [None] * processes
    while targets:
        for i in range(processes):
            p = list_process[i]
            if not (p and p.is_alive()):
                list_process[i] = Process(target=target, args=(targets.pop(0)))
                list_process[i].start()
                started_count += 1
                print('started count:', started_count)
                if p:
                    assert(not p.is_alive())
                    p.join()
                    joined_count += 1
                    print('joined count:', joined_count)

    for process in list_process:
        if process:
            process.join()
            joined_count += 1
            print('joined count:', joined_count)

    print(f'Final started count: {started_count}, final joined count: {joined_count}')

Prints:

started count: 1
started count: 2
started count: 3
started count: 4
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
started count: 5
joined count: 1
started count: 6
joined count: 2
started count: 7
joined count: 3
started count: 8
joined count: 4
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
started count: 9
joined count: 5
started count: 10
joined count: 6
joined count: 7
Working for 4 seconds ...
Working for 4 seconds ...
joined count: 8
joined count: 9
joined count: 10
Final started count: 10, final joined count: 10

Is there something else in your program that you have not posted that is causing the problem?

Implementing a process pool

If I may make a suggestion: your approach to implementing a process pool is rather inefficient. If you have 100 tasks to submit, you will be creating 100 processes. That is not really the definition of a processing pool. Granted, you are controlling the degree of parallelism, but you are failing to reuse processes, which is the central idea of a pool. The following demonstrates how to create a pool of 4 processes that can execute any number of tasks. When all the tasks are done, you only have 4 processes to join. This could go a long way toward solving your zombie problem:

from multiprocessing import Process, Queue
import time

def target(queue):
    while True:
        task = queue.get()
        if task is None: # "end of file" indicator
            break
        print(f"Working for {task*2} seconds ...")
        time.sleep(task*2)

if __name__ == '__main__':
    N_PROCESSES = 4
    processes = []
    queue = Queue()
    for _ in range(N_PROCESSES):
        processes.append(Process(target=target, args=(queue,)))
    for process in processes:
        process.start()
    # Write tasks to the job queue:
    for _ in range(10):
        queue.put(2)
    # And write an "end of file" indicator for each process in the pool:
    for _ in range(N_PROCESSES):
        queue.put(None)
    # Wait for processes to complete:
    for process in processes:
        process.join()

Prints:

Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...
Working for 4 seconds ...

Note that you could also pass a second queue to each process for outputting results. Just be sure to get all the results from that queue before joining the processes.

【Discussion】:

  • Thanks for the detailed suggestion. However, my apologies for not being clear enough: what I want to do is spawn a new process for every task and never reuse the same process. How can I do that without zombies?
  • Well, as I said, the code you posted appears to be correct, as I demonstrated with the limited code you posted. Clearly, your actual program is much larger, and there must be some other interaction that cannot be explained by what you have shown.
  • You could also achieve what you want with multiprocessing.Pool(4, maxtasksperchild=1). That gives you a parallelism of 4 but creates a new process for each submitted task. If you use the map method, be sure to specify chunksize=1.