【问题标题】:Python multiprocess Process never terminatesPython多进程进程永远不会终止
【发布时间】:2013-09-09 17:25:53
【问题描述】:

我下面的例程获取 urllib2.Requests 列表,并为每个请求生成一个新进程并将它们关闭。目的是为了异步速度,所以这一切都是一劳永逸的(不需要响应)。问题是下面代码中产生的进程永远不会终止。因此,在其中一些之后,盒子就会OOM。上下文:Django 网络应用程序。有什么帮助吗?

MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
MPQ = multiprocessing.JoinableQueue(MP_CONCURRENT)



def request_manager(req_list):
    try:
            # put request list in the queue
            for req in req_list:
                    MPQ.put(req)

                    # call processes on queue
                    worker = multiprocessing.Process(target=process_request, args=(MPQ,))
                    worker.daemon = True
                    worker.start()

            # move on after queue is empty
            MPQ.join()

    except Exception, e:
            logging.error(traceback.print_exc())


# prcoess requests in queue
def process_request(MPQ):
    try:
            while True:
                    req = MPQ.get()
                    dr = urllib2.urlopen(req)
                    MPQ.task_done()

    except Exception, e:
            logging.error(traceback.print_exc())

【问题讨论】:

  • 当真时:- 有终止?
  • 我尝试了几种不同的方法来解决这个问题,包括终止()、重新定义全局变量、休眠和终止、加入()和不加入()。到目前为止,唯一有效的方法是执行 time.sleep(1) 和 worker.terminate() ,但这会中断进程并且在数千个潜在请求中休眠是行不通的:(
  • 并回答您的问题:当队列清空时满足 True (我知道,这不是很直观)。
  • except Queue.Empty,e: logging.info('task done'); except Exception, e: logging.error(traceback.print_exc())
  • 你的解决方案不好,我建议使用大共享池和map_async而不是队列。

标签: python django process multiprocess


【解决方案1】:

也许我说的不对,但是

MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
MPQ = multiprocessing.JoinableQueue(MP_CONCURRENT)



def request_manager(req_list):
    try:
            # put request list in the queue
            pool=[]
            for req in req_list:
                    MPQ.put(req)

                    # call processes on queue
                    worker = multiprocessing.Process(target=process_request, args=(MPQ,))
                    worker.daemon = True
                    worker.start()
                    pool.append(worker)

            # move on after queue is empty
            MPQ.join()
            # Close not needed processes
            for p in pool: p.terminate()

    except Exception, e:
            logging.error(traceback.print_exc())


# prcoess requests in queue
def process_request(MPQ):
    try:
            while True:
                    req = MPQ.get()
                    dr = urllib2.urlopen(req)
                    MPQ.task_done()

    except Exception, e:
            logging.error(traceback.print_exc())

【讨论】:

  • 这消除了大约 20% 的额外进程,但我认为这是由于在某些情况下在进程处于非活动状态之前终止命中。使用这种方法,我在日志中遇到了一堆中断的系统调用错误,这也是我在尝试的解决方案中看到的。
【解决方案2】:
MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
MPQ = multiprocessing.JoinableQueue(MP_CONCURRENT)
CHUNK_SIZE = 20 #number of requests sended to one process.
pool = multiprocessing.Pool(MP_CONCURRENT)

def request_manager(req_list):
    try:
            # put request list in the queue
            responce=pool.map(process_request,req_list,CHUNK_SIZE) # function exits after all requests called and pool work ended
    # OR
            responce=pool.map_async(process_request,req_list,CHUNK_SIZE) #function request_manager exits after all requests passed to pool

    except Exception, e:
            logging.error(traceback.print_exc())


# prcoess requests in queue
def process_request(req):
    dr = urllib2.urlopen(req)

这比你的代码快 5-10 倍

【讨论】:

  • 感谢您的快速回复,但没有奏效。它使用了两倍的内存和几乎两倍的进程(267 vs. 150 我运行相同的例程)。进程仍然是孤立的(完全有内存泄漏)。
  • pool = multiprocessing.Pool(MP_CONCURRENT) 必须在模块中,而不是在函数中
【解决方案3】:

将端“broker”集成到 django(例如 rabbitmq 或类似的东西)。

【讨论】:

  • 这就是我通常会运行(以及像 Celery 之类的东西)以卸载重复性任务的方式,除了在这种情况下,多个请求是循环回此 Web 应用程序的 Web api 命中(所以无论如何我都必须处理处理)。
【解决方案4】:

经过一番摆弄(以及一夜好眠)之后,我相信我已经找到了问题所在(感谢 Eri,你是我需要的灵感)。僵尸进程的主要问题是我没有发回信号表明该进程已完成(并杀死它)我(天真地)认为这两者都是多进程自动发生的。

有效的代码:

# function that will be run through the pool
def process_request(req):
    try:
            dr = urllib2.urlopen(req, timeout=30)

    except Exception, e:
            logging.error(traceback.print_exc())

# process killer
def sig_end(r):
    sys.exit()

# globals
MP_CONCURRENT = int(multiprocessing.cpu_count()) * 2
if MP_CONCURRENT < 2: MP_CONCURRENT = 2
CHUNK_SIZE = 20
POOL = multiprocessing.Pool(MP_CONCURRENT)    

# pool initiator
def request_manager(req_list):
    try:
            resp = POOL.map_async(process_request, req_list, CHUNK_SIZE, callback=sig_end)

    except Exception, e:
            logging.error(traceback.print_exc())

几点说明:

1) 将被“map_async”(本例中为“process_request”)命中的函数必须首先定义(并且在全局声明之前)。

2) 退出进程可能有更优雅的方式(欢迎提出建议)。

3) 在这个例子中使用 pool 确实是最好的(再次感谢 Eri),因为“回调”功能允许我立即发出信号。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-10-25
    • 1970-01-01
    • 2016-04-16
    • 2019-08-07
    • 1970-01-01
    • 1970-01-01
    • 2021-07-17
    • 1970-01-01
    相关资源
    最近更新 更多