【问题标题】:Python multiprocessing hangs at joinPython 多处理在连接时挂起
【发布时间】:2018-11-29 06:33:39
【问题描述】:

我正在读取一个视频文件,每 20 帧我将第一帧存储在输入队列中。一旦我在输入队列中获得所有需要的帧,然后我运行多个进程对这些帧执行一些操作并将结果存储在输出队列中。 但是代码总是卡在join处,我尝试了针对此类问题提出的不同解决方案,但似乎都不起作用。

import numpy as np
import cv2
import timeit
import face_recognition
from multiprocessing import Process, Queue, Pool
import multiprocessing
import os

s = timeit.default_timer()

def alternative_process_target_func(input_queue, output_queue):

    while not output_queue.full():
        frame_no, small_frame, face_loc = input_queue.get()
        print('Frame_no: ', frame_no, 'Process ID: ', os.getpid(), '----', multiprocessing.current_process())
        #canny_frame(frame_no, small_frame, face_loc)

        #I am just storing frame no for now but will perform something else later
        output_queue.put((frame_no, frame_no)) 

        if output_queue.full():
            print('Its Full ---------------------------------------------------------------------------------------')
        else:
            print('Not Full')

    print(timeit.default_timer() - s, ' seconds.')
    print('I m not reading anymore. . .', os.getpid())


def alternative_process(file_name):
    start = timeit.default_timer()
    cap = cv2.VideoCapture(file_name)
    frame_no = 1
    fps = cap.get(cv2.CAP_PROP_FPS)
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    print('Frames Per Second: ', fps)
    print('Total Number of frames: ', length)
    print('Duration of file: ', int(length / fps))
    processed_frames = 1
    not_processed = 1
    frames = []
    process_this_frame = True
    frame_no = 1
    Input_Queue = Queue()
    while (cap.isOpened()):
        ret, frame = cap.read()
        if not ret:
            print('Size of input Queue: ', Input_Queue.qsize())
            print('Total no of frames read: ', frame_no)
            end1 = timeit.default_timer()
            print('Time taken to fetch useful frames: ', end1 - start)
            threadn = cv2.getNumberOfCPUs()
            Output_Queue = Queue(maxsize=Input_Queue.qsize())
            process_list = []
            #quit = multiprocessing.Event()
            #foundit = multiprocessing.Event()

            for x in range((threadn - 1)):
                # print('Process No : ', x)
                p = Process(target=alternative_process_target_func, args=(Input_Queue, Output_Queue))#, quit, foundit
                #p.daemon = True
                p.start()
                process_list.append(p)
                #p.join()

            # for proc in process_list:
            #     print('---------------------------------------------------------------', proc.p)

            i = 1
            for proc in process_list:
                print('I am hanged here')
                proc.join()
                print('I am done')
                i += 1

            end = timeit.default_timer()
            print('Time taken by face verification: ', end - start)

            break

        if process_this_frame:
            print(frame_no)
            small_frame = cv2.resize(frame, (0, 0), fx=0.25, fy=0.25)
            rgb_small_frame = small_frame[:, :, ::-1]
            face_locations = face_recognition.face_locations(rgb_small_frame)
            # frames.append((rgb_small_frame, face_locations))
            Input_Queue.put((frame_no, rgb_small_frame, face_locations))
            frame_no += 1

        if processed_frames < 5:
            processed_frames += 1
            not_processed = 1

        else:
            if not_processed < 15:
                process_this_frame = False
                not_processed += 1
            else:

                processed_frames = 1
                process_this_frame = True
                print('-----------------------------------------------------------------------------------------------')

    cap.release()
    cv2.destroyAllWindows()

alternative_process('user_verification_2.avi')

【问题讨论】:

    标签: python process parallel-processing multiprocessing


    【解决方案1】:

    正如Process.join() 上的文档所说,hanging(或 “阻塞”)正是预期会发生的情况:

    阻塞调用线程,直到其join()方法为 调用终止或直到发生可选超时。

    join() 停止当前线程,直到目标进程完成。目标进程正在调用alternative_process_target_func,所以问题显然出在那个函数上。它永远不会结束。原因可能不止一个。

    问题 1

    alternative_process_target_func 一直运行到output_queue.full()。如果它永远不会满怎么办?它永远不会结束?以其他方式确定结尾确实更好,例如一直运行到输入队列为空。

    问题 2

    如果输入队列为空,input_queue.get() 将阻塞。正如documentation 所说:

    从队列中删除并返回一个项目。如果可选 args block 为 true 并且 timeout 为 None(默认值),则在必要时阻止,直到项目可用为止。

    您正在运行多个进程,因此不要因为刚才output_queue.full() 为 False 并且输入大小与输出大小相同,就期望输入中有内容。在此期间可能发生了很多事情。

    你想做的是:

    try:
        input_queue.get(False)  # or input_queue.get_nowait()
    except Empty:
        break  # stop when there is nothing more to read from the input
    

    问题 3

    如果输出中没有空间存储数据,output_queue.put((frame_no, frame_no)) 将阻塞。

    再次,您假设输出中有空间,只是因为您刚才检查了output_queue.full(),并且输入大小等于输出大小。永远不要依赖这些东西。

    你想做和输入一样的事情:

    try:
        output_queue.put((frame_no, frame_no), False)
        # or output_queue.put_nowait((frame_no, frame_no))
    except Empty:
        # deal with this somehow, e.g.
        raise Exception("There is no room in the output queue to write to.")
    

    【讨论】:

    • 感谢您的回复。我在这里不明白的是,如果我使用 try except 从输入队列中读取并在那里添加一个打印语句,它不会打印出输入队列中的所有值,在从输入队列中获取一些值后程序停止但实际上它应该打印该 try 块中的所有值。
    • 如果我使用 input_queue.get(timeout=1),它将获取输入队列中的所有值并成功终止程序,但没有超时,即使用 input_queue.get(False),之后获得一些初始值它会终止所有进程。我无法理解这种行为。
    • @Muhammadhassan 好吧,您正在将项目添加到输入队列之后从中读取的进程已启动。因此,如果从队列中读取比写入它们更快,那么它们当然会耗尽它们并停止。这意味着您需要一种不同的方式来确定何时停止,而不是我给您的。超时可能足够好,具体取决于您的期望。
    • 不,我不会在进程开始后将项目添加到输入队列中。 “if not ret”条件确保了这一点。但是,如果我使用 Manager.Queue 而不是队列,则代码会按预期运行。但是不知道为什么会这样?
    • 嗨,如果我设置队列的最大大小怎么办?既然是先进先出,那么在放物品的时​​候,队列满了,会不会释放先入的物品?
    猜你喜欢
    • 2014-07-25
    • 2013-02-25
    • 2012-04-03
    • 1970-01-01
    • 1970-01-01
    • 2016-09-11
    • 2020-06-05
    • 2020-04-11
    相关资源
    最近更新 更多