【问题标题】:Multiprocessing Queue.get() hangs多处理 Queue.get() 挂起
【发布时间】:2016-09-11 16:01:05
【问题描述】:

我正在尝试实现基本的多处理,但遇到了问题。下面附上python脚本。

import time, sys, random, threading
from multiprocessing import Process
from Queue import Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue(10)
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        item = append_queue.get()
        database.append(item)
        print("Appended to database in %.4f seconds" % database.append_time)
        append_queue.task_done()
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    append_queue.join()
    #append_queue_process.join()
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()

AnalyzeFrequency 分析文件中单词的频率,get_results() 返回所述单词和频率的排序列表。列表非常大,可能有 10000 项。

然后将此列表传递给add_to_append_queue 方法,该方法将其添加到队列中。 process_append_queue 一项一项地获取项目并将频率添加到“数据库”。此操作比main() 中的实际分析需要更长的时间,因此我尝试对此方法使用单独的过程。当我尝试使用线程模块执行此操作时,一切正常,没有错误。当我尝试使用 Process 时,脚本挂在 item = append_queue.get()

有人能解释一下这里发生了什么,或许可以指导我解决问题吗?

感谢所有答案!

更新

泡菜错误是我的错,只是一个错字。现在我在多处理中使用 Queue 类,但 append_queue.get() 方法仍然挂起。 新代码

import time, sys, random
from multiprocessing import Process, Queue
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency

append_queue = Queue()
database = FrequencyStore()

def add_to_append_queue(_list):
    append_queue.put(_list)

def process_append_queue():
    while True:
        database.append(append_queue.get())
        print("Appended to database in %.4f seconds" % database.append_time)
    return

def main():
    database.load_db()
    print("Database loaded in %.4f seconds" % database.load_time)
    append_queue_process = Process(target=process_append_queue)
    append_queue_process.daemon = True
    append_queue_process.start()
    #t = threading.Thread(target=process_append_queue)
    #t.daemon = True
    #t.start()

    while True:
        path = raw_input("file: ")
        if path == "exit":
            break
        a = AnalyzeFrequency(path)
        a.analyze()
        print("Analyzed file in %.4f seconds" % a._time)
        add_to_append_queue(a.get_results())

    #append_queue.join()
    #append_queue_process.join()
    print str(append_queue.qsize())
    database.save_db()
    print("Database saved in %.4f seconds" % database.save_time)
    sys.exit(0)

if __name__=="__main__":
    main()

更新 2

这是数据库代码:

class FrequencyStore:

    def __init__(self):
        self.sorter = Sorter()
        self.db = {}
        self.load_time = -1
        self.save_time = -1
        self.append_time = -1
        self.sort_time = -1

    def load_db(self):
        start_time = time.time()

        try:
            file = open("results.txt", 'r')
        except:
            raise IOError

        self.db = {}
        for line in file:
            word, count = line.strip("\n").split("=")
            self.db[word] = int(count)
        file.close()

        self.load_time = time.time() - start_time

    def save_db(self):
        start_time = time.time()

        _db = []
        for key in self.db:
            _db.append([key, self.db[key]])
        _db = self.sort(_db)

        try:
            file = open("results.txt", 'w')
        except:
            raise IOError

        file.truncate(0)
        for x in _db:
            file.write(x[0] + "=" + str(x[1]) + "\n")
        file.close()

        self.save_time = time.time() - start_time

    def create_sorted_db(self):
        _temp_db = []
        for key in self.db:
            _temp_db.append([key, self.db[key]])
        _temp_db = self.sort(_temp_db)
        _temp_db.reverse()
        return _temp_db

    def get_db(self):
        return self.db

    def sort(self, _list):
        start_time = time.time()

        _list = self.sorter.mergesort(_list)
        _list.reverse()

        self.sort_time = time.time() - start_time
        return _list

    def append(self, _list):
        start_time = time.time()

        for x in _list:
            if x[0] not in self.db:
                self.db[x[0]] = x[1]
            else:
                self.db[x[0]] += x[1]

        self.append_time = time.time() - start_time

【问题讨论】:

  • A Queue.Queue 不能跨进程工作。所以第一个改变是改用multiprocessing.Queue

标签: python database queue multiprocessing


【解决方案1】:

评论建议您尝试在 Windows 上运行它。正如我在评论中所说,

如果你在 Windows 上运行它,它就不能工作 - Windows 不能 有fork(),所以每个进程都有自己的队列,他们什么都没有 彼此做。整个模块由“从头开始”导入 Windows 上的每个进程。您需要在main() 中创建队列, 并将其作为参数传递给工作函数。

这里充实了您需要做的事情以使其可移植,尽管我删除了所有数据库内容,因为它与您迄今为止描述的问题无关。我还删除了daemon 摆弄,因为这通常只是一种避免干净关闭事物的懒惰方式,而且通常不会在以后回来咬你:

def process_append_queue(append_queue):
    while True:
        x = append_queue.get()
        if x is None:
            break
        print("processed %d" % x)
    print("worker done")

def main():
    import multiprocessing as mp

    append_queue = mp.Queue(10)
    append_queue_process = mp.Process(target=process_append_queue, args=(append_queue,))
    append_queue_process.start()
    for i in range(100):
        append_queue.put(i)
    append_queue.put(None)  # tell worker we're done
    append_queue_process.join()

if __name__=="__main__":
    main()

输出是“明显”的东西:

processed 0
processed 1
processed 2
processed 3
processed 4
...
processed 96
processed 97
processed 98
processed 99
worker done

注意:因为 Windows 不(不能)fork(),所以工作进程不可能继承 Windows 上的任何 Python 对象。每个进程从一开始就运行整个程序。这就是为什么您的原始程序无法运行的原因:每个进程都创建了自己的Queue,与另一个进程中的Queue 完全无关。在上面显示的方法中,只有主进程创建一个Queue,然后主进程将它(作为参数)传递给工作进程。

【讨论】:

  • 代码有效!还有一件事,如果你看一下我的 FrequencyAnalysis.py,你会看到 FrequencyStorage 类。如果我从另一个进程调用 append 方法,它不会更新该类中的实例变量。我将对象作为参数传递,就像您对队列所做的那样。
  • 那将是一个完全不同的问题,因此使测试用例尽可能小并发布一个不同的问题。一般来说,您不应该期望任何对象的任何突变在进程中都是可见的。内存不共享。 multiprocessing.Queue 之所以有效,是因为它是从头开始实现的,make 突变在整个进程中可见——这不是魔法发生的(它是通过在幕后的进程间管道传递有关突变的信息而发生的进程,由进程间信号量保护以防止同时发生突变)。
【解决方案2】:

queue.Queue 是线程安全的,但不能跨进程工作。不过,这很容易解决。而不是:

from multiprocessing import Process
from Queue import Queue

你想要:

from multiprocessing import Process, Queue

【讨论】:

  • 我是盲人屏幕阅读器用户,无法读取图像中的回溯,抱歉。它也无助于日后在您的回溯中搜索文本的人。最好更新问题,在评论中发布回溯,或创建一个新问题。
  • 您的数据库不是进程安全的。您正在 Python 主进程中创建数据库实例,然后告诉 Python 在完全不同的进程中对其进行处理。所以对database.append 的调用挂起,而不是对Queue.get 的调用。使用队列的全部意义在于避免这个确切的问题。
  • 我刚刚测试了 Queue.get 和 database.append 方法,每个方法后面都有一个 print 语句。 Queue.get 语句永远不会继续,它只是阻塞。我还使用数据库中的代码更新了原始问题。
  • 正如我所怀疑的那样。当我在 linux 上运行它时,它没有任何问题。也许是由于 Windows 中的某些原因?
  • 如果没有可运行的样本,就不可能说。
猜你喜欢
  • 1970-01-01
  • 2015-07-06
  • 2020-06-05
  • 2020-04-11
  • 2012-04-03
  • 1970-01-01
  • 2021-10-14
  • 1970-01-01
相关资源
最近更新 更多