【问题标题】:Python multiprocessing processes terminating?Python多处理进程终止?
【发布时间】:2018-07-19 13:46:55
【问题描述】:

我在 * 上阅读了许多关于 Python 多处理的答案,我认为这对我的目的最有用:python multiprocessing queue implementation

这是我想做的事情:轮询数据库以获取新工作,将其放入队列中并让 4 个进程连续完成工作。我不清楚的是当队列中的项目完成处理时会发生什么。在上面的问题中,当队列为空时,进程终止。但是,就我而言,我只想一直等到队列中有数据。那么我只是睡觉并定期检查队列吗?所以我的工作进程永远不会死?这是好的做法吗?

def mp_worker(queue):
    while True:
        if (queue.qsize() == 0):
            time.sleep(20)
        else:
            db_record = queue.get()
            process_file(db_record)

def mp_handler():
    num_workers = 4
    processes = [Process(target=mp_worker, args=(queue,)) for _ in range(num_workers)]

    for process in processes:
        process.start()

    for process in processes:
        process.join()    

if __name__ == '__main__':
    db_conn = db.create_postgre_connection(DB_CONFIG)
    while True:
        db_records = db.retrieve_received_files(DB_CONN)
        if (len(db_records) > 0):
            for db_record in db_records:
                queue.put(db_record)
                mp_handler()
        else:
            time.sleep(20)


    db_conn.close()

有意义吗?

谢谢。

【问题讨论】:

    标签: python python-multiprocessing


    【解决方案1】:

    想通了。工人必须死,否则他们永远不会回来。但是无论如何,当有数据时,我都会启动一组新的工人,所以这不是问题。更新代码:

    def mp_worker(queue):
        while queue.qsize() > 0 :
            db_record = queue.get()
            process_file(db_record)
    
    def mp_handler():
        num_workers = 4
        if (queue.qsize() < num_workers):
            num_workers = queue.qsize()
        processes = [Process(target=mp_worker, args=(queue,)) for _ in range(num_workers)]
        for process in processes:
            process.start()
        for process in processes:
            process.join()    
    
    if __name__ == '__main__':
        while True:
            db_records = db.retrieve_received_files(DB_CONN)
            print(db_records)
            if (len(db_records) > 0):
                for db_record in db_records:
                    queue.put(db_record)
                    mp_handler()
            else:
                time.sleep(20)
    
        DB_CONN.close()
    

    【讨论】: