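【Question title】: multiprocessing pool: determine process names (unable to terminate its processes)
【Posted】: 2020-06-29 23:38:58
【Question】:

I have some code that tries to create 4 processes in a Pool.

As soon as I hit any exception (for example, the database it is trying to connect to is down), I want to kill the pool, sleep for 10 seconds, and then create a new pool with 4 processes.

However, the pool never seems to be killed, because the process names keep incrementing on every iteration. Does the pool keep some kind of cache that holds the name count?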

import multiprocessing
import time
from multiprocessing import Pool

def connect_db():
    pass


while True:
    p = Pool(4)
    for process in multiprocessing.active_children():
        print(process.name)  # why is the name incremented by 1 each time the while loop iterates?
    try:
        r = p.map(connect_db, ())
    except Exception as e:
        p.close()
        p.join()
        time.sleep(10)

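The first four processes are SpawnPoolWorker-1 to 4, and the next four are SpawnPoolWorker-5 to 8. How does it know that I have already created 4 processes before? Am I creating a new instance of Pool each time, or am I doing something wrong?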

【Comments】:

  • You should really use a context manager to handle the pool. Also, using except Exception like this is bad practice; see *.com/questions/54948548/…
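
The advice in the comment above can be sketched as follows. This is only a minimal sketch, not the asker's code: the helper names run_once and retry, the retry count, and the choice of ConnectionError as the one exception worth retrying are all assumptions made for illustration. Note that leaving the with block calls terminate() on the pool.

```python
import multiprocessing
import time

def run_once(worker, items, processes=4):
    # Leaving the with block calls pool.terminate(), so no worker outlives it.
    with multiprocessing.Pool(processes) as pool:
        return pool.map(worker, items)

def retry(action, retries=3, delay=10.0):
    # Catch only the failure we know how to recover from (assumed here to be
    # ConnectionError); any other exception propagates immediately.
    for attempt in range(1, retries + 1):
        try:
            return action()
        except ConnectionError as exc:
            print(f"attempt {attempt} failed: {exc}")
            if attempt == retries:
                raise
            time.sleep(delay)

if __name__ == "__main__":
    # abs stands in for the real worker function.
    print(retry(lambda: run_once(abs, [-1, 2, -3]), retries=2, delay=0.1))
```

Because each attempt builds its own pool inside run_once, a failed attempt cannot leave stale workers behind for the next one.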

Tags: python python-3.x python-multiprocessing


【Solution 1】:

The main reason you are not seeing the expected result is the following line of code:

r = p.map(connect_db, ())

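You are calling Pool.map with an empty iterable, so connect_db is never called. As a result no exception is raised, the except branch is never reached, and the pool is never closed.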
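
A small sketch of that behaviour, assuming a worker that always fails: with an empty iterable, map returns an empty list and the except branch is never entered, whereas a non-empty iterable surfaces the worker's exception in the parent. The helper name run and the pool size are made up for this illustration.

```python
import multiprocessing

def connect_db(i):
    # Always fail, to stand in for a database that is down.
    raise RuntimeError("Failed to connect")

def run(items):
    with multiprocessing.Pool(2) as pool:
        try:
            return pool.map(connect_db, items)
        except RuntimeError as exc:
            return f"caught: {exc}"

if __name__ == "__main__":
    print(run(()))        # [] : connect_db was never called
    print(run(range(2)))  # caught: Failed to connect
```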

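Here is a working skeleton with a number of print statements for debugging. The output is attached below; as you can see, there are exactly four child processes in each round.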

import multiprocessing
import random
import time

def connect_db(i):
    print(f"Trying to connect {i}")
    time.sleep(random.random() * 2)
    # Always fail, to simulate a database that is down.
    raise Exception("Failed to connect")

# The guard is required when workers are started with "spawn" (Windows, macOS).
if __name__ == "__main__":
    while True:
        p = multiprocessing.Pool(4)
        print("active children are:")
        for idx, process in enumerate(multiprocessing.active_children()):
            print(f"Child number {idx} is {process.name}")
        try:
            print("About to create a pool")
            # map() re-raises a worker's exception in the parent process.
            r = p.map(connect_db, range(4))
            print("Created a pool")
        except Exception as e:
            print(e)
            print("terminating threads")
            # Stop the worker processes immediately.
            p.terminate()
        # close() has no effect after terminate(); join() waits for the workers to exit.
        p.close()
        p.join()
        time.sleep(5)

Output:

active children are:
Child number 0 is ForkPoolWorker-2
Child number 1 is ForkPoolWorker-1
Child number 2 is ForkPoolWorker-4
Child number 3 is ForkPoolWorker-3
About to create a pool
Trying to connect 0
Trying to connect 1
Trying to connect 2
Trying to connect 3
Failed to connect
terminating threads
active children are:
Child number 0 is ForkPoolWorker-5
Child number 1 is ForkPoolWorker-6
Child number 2 is ForkPoolWorker-8
Child number 3 is ForkPoolWorker-7
About to create a pool
Trying to connect 0
Trying to connect 1
...
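
The numbering in the output above continues from 5 even though the first pool is gone. As far as I can tell from CPython's implementation, the default name is built from a counter kept by the parent process rather than by the pool, and Pool then replaces "Process" with "PoolWorker" in that name. The sketch below relies on that implementation detail and only constructs Process objects without starting them; noop and default_names are names made up for the illustration.

```python
import multiprocessing

def noop():
    pass

def default_names(count):
    # A Process receives its default name when the object is constructed;
    # nothing is started here.
    return [multiprocessing.Process(target=noop).name for _ in range(count)]

first_batch = default_names(4)
second_batch = default_names(4)
print(first_batch)
print(second_batch)
```

Run in a fresh interpreter, the second batch picks up where the first one stopped, which is the same effect as a second pool getting workers 5 to 8.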

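One last point: if the use case really is database connections, there are ready-made connection pools, and you should probably use one of them. Also, I am not sure whether a database connection can be shared across processes.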
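
One common way to sidestep the sharing question is to let every worker open its own connection through the pool's initializer argument. The following is only a sketch under assumptions: sqlite3 stands in for the real database driver, and init_worker, query, and the ":memory:" path are hypothetical names and values chosen for the example.

```python
import multiprocessing
import sqlite3

_conn = None  # one connection per worker process, set by init_worker

def init_worker(db_path):
    # Runs once in each worker process when the pool starts it.
    global _conn
    _conn = sqlite3.connect(db_path)

def query(i):
    # Uses the connection that belongs to the current process only.
    return _conn.execute("SELECT ? * 2", (i,)).fetchone()[0]

if __name__ == "__main__":
    with multiprocessing.Pool(4, initializer=init_worker,
                              initargs=(":memory:",)) as pool:
        print(pool.map(query, range(4)))  # [0, 2, 4, 6]
```

No connection object ever crosses a process boundary here; only the integer arguments and results are pickled.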

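Controlling the process names in the pool

If for some reason you want to control the process names in the pool, you can do so by creating your own pool context: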

import multiprocessing
from multiprocessing import context
import random
import time

# Counter used for naming; reset by the main loop before a new pool is built.
process_counter = 0

# ForkProcess and ForkContext only exist on platforms that support fork.
class MyForkProcess(multiprocessing.context.ForkProcess):
    def __init__(self, *args, **kwargs):
        global process_counter
        # Number the processes ourselves instead of using the built-in counter.
        # Pool later replaces "Process" with "PoolWorker" in this name.
        name = f"MyForkProcess-{process_counter}"
        process_counter += 1
        super().__init__(*args, name=name, **kwargs)

class MyContext(multiprocessing.context.ForkContext):
    _name = 'MyForkContext'
    Process = MyForkProcess

def connect_db(i):
    print(f"Trying to connect {i}")
    cp = multiprocessing.current_process()
    print(f"The name of the child process is {cp.name}")
    time.sleep(random.random() * 2)
    # Always fail, to simulate a database that is down.
    raise Exception("Failed to connect")

context = MyContext()
while True:
    p = context.Pool(4)
    print("active children are:")
    for idx, process in enumerate(multiprocessing.active_children()):
        print(f"Child number {idx} is {process.name}")
    try:
        print("About to create a pool")
        r = p.map(connect_db, range(4))
        print("Created a pool")
    except Exception as e:
        print(e)
        print("terminating threads")
        p.terminate()
        # Start numbering from 0 again for the next pool.
        process_counter = 0

    p.close()
    p.join()
    time.sleep(5)

The output is now:

active children are:
Child number 0 is MyForkPoolWorker-2
Child number 1 is MyForkPoolWorker-0
Child number 2 is MyForkPoolWorker-3
Child number 3 is MyForkPoolWorker-1
About to create a pool
Trying to connect 0
The name of the child process is MyForkPoolWorker-0
Trying to connect 1
The name of the child process is MyForkPoolWorker-1
Trying to connect 2
The name of the child process is MyForkPoolWorker-2
Trying to connect 3
The name of the child process is MyForkPoolWorker-3
Failed to connect
terminating threads
active children are:
Child number 0 is MyForkPoolWorker-2
Child number 1 is MyForkPoolWorker-0
Child number 2 is MyForkPoolWorker-1
Child number 3 is MyForkPoolWorker-3
About to create a pool
...

【Comments】:

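  • My question is different. My snippet is a simplified version of the problem. Why are the workers in your pool named starting from ForkPoolWorker-5 the second time? How do you get them to be ForkPoolWorker-1 to 4 every time?
  • I believe the names are generated automatically. Why do the names matter?
  • In any case, you can change a process's name with process.name = "my_new_name".
  • I use these names to route work. Each process used to get a number from 1 to 4, and both before and inside the database connection code I extract the numeric part and send the message to the corresponding database. Now that the processes get 5-8, this breaks. How does it come up with these names? I can set the names, but how do I make sure that every pool gets a fresh set of processes named 1-4?
  • In my code sample I map range(4) onto four processes. That means each connect_db call receives its own identifier (the argument "i"). You can use that number as the process identifier.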
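
The suggestion in the last comment can be sketched like this: route on the argument passed to connect_db instead of parsing the worker's name, so the result does not depend on how many pools were created earlier. The DATABASES mapping and its values are hypothetical placeholders, not anything from the question.

```python
import multiprocessing

# Hypothetical mapping from shard number to database name.
DATABASES = {1: "db_one", 2: "db_two", 3: "db_three", 4: "db_four"}

def connect_db(shard):
    # Route by the task argument, not by parsing current_process().name.
    target = DATABASES[shard]
    return f"shard {shard} -> {target}"

if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        for line in pool.map(connect_db, range(1, 5)):
            print(line)
```

Whichever worker happens to pick up a task, the shard number travels with the task itself, so names like ForkPoolWorker-5 no longer matter.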