【问题标题】:python threading with a shared queue and thread classespython线程与共享队列和线程类
【发布时间】:2020-05-18 03:32:45
【问题描述】:

我一直在尝试找到一个看起来像我的实现,但我似乎找不到。

细节:我检索了一些数据库记录,并希望在最多 5 个线程中处理所有这些记录。但我希望这些线程报告任何潜在的错误,然后关闭各个线程(或记录它们)。所以我想将所有记录推送到队列中,并让线程从队列中获取。

到目前为止,我有这个。

class DatabaseRecordImporterThread(threading.Thread):
    def __init__(self, record_queue):
        super(DatabaseRecordImporterThread, self).__init__()
        self.record_queue = record_queue

    def run(self):
        try:
            record = self.record_queue.get()
            force_key_error(record)
        except Exception as e:
            print("Thread failed: ", e)  # I want this to print to the main thread stdout
            logger.log(e)  # I want this to log to a shared log file (with appending)



MAX_THREAD_COUNT = 5
jobs = queue.Queue()
workers = []
database_records_retrieved = database.get_records(query)  # unimportant

# this is where i put all records on a queue
for record in database_records_retrieved:
    jobs.put(record)

for _ in range(MAX_THREAD_COUNT):
    worker = DatabaseRecordImporterThread(jobs)
    worker.start()
    workers.append(worker)

print('*** Main thread waiting')
jobs.join()
print('*** Done')

所以想法是每个线程都获取jobs 队列,并且他们正在从中检索记录并打印。由于未预先指定要处理的数量(定义为一次处理 k 条记录或其他内容),因此每个线程将尝试仅处理队列中的任何内容。但是,当我强制出错时,输出看起来像这样。

Thread failed:  'KeyError'
Thread failed:  'KeyError'
Thread failed:  'KeyError'
Thread failed:  'KeyError'
Thread failed:  'KeyError'
*** Main thread waiting

当没有错误报告时,线程每个只读取一条记录:

(record)
(record)
(record)
(record)
(record)
*** Main thread waiting

在正常的线程设置中,我知道您可以通过执行类似操作来设置队列

Thread(target=function, args=(parameters, queue)

但是当你使用一个继承了 Thread 对象的类时,你如何正确设置它呢?我似乎无法弄清楚。我的一个假设是队列对象并不浅,所以创建的每个新对象实际上都指向内存中的同一个队列——这是真的吗?

这些线程显然是挂起的,因为它们不是(?)守护线程。不仅如此,似乎线程每个只读取一条记录,然后做同样的事情。有些事情我想做但不知道怎么做。

  1. 如果所有线程都失败了,主线程应该继续前进并说“***完成”。
  2. 线程应该继续处理队列直到它为空

为了执行 (2),我可能需要在主线程中添加一些东西,例如 while !queue.empty,但是我如何确保将线程限制为最多 5 个?

【问题讨论】:

  • 看起来我想出了一个部分,那就是在每个线程中,我只需要确保循环使用类似 while not self.queue.empty() 的内容,但我不确定这是否是最佳做法。它不能解决问题 (1)

标签: python multithreading queue


【解决方案1】:

我想出了问题的答案。在做了很多研究和一些代码阅读之后,需要发生的事情如下

  1. 不应检查队列是否为空,因为它存在竞争条件。相反,工作人员应该在无限循环下继续并尝试继续从队列中检索

  2. 每当队列任务完成时,需要调用queue.task_done() 方法来提醒MainThread join() 方法。发生的情况是task_done 调用的数量将与入队调用的数量同步,一旦队列为空,线程将正式加入。

  3. 对固定数据大小的任务使用队列有点不太理想。与其创建每个线程读取的队列,不如简单地将数据划分为大小相等的块并让线程只运行处理列表子集。这样我们就不会被queue.get() 阻止等待添加新元素。比如while True: if not queue.empty(): do_something()

  4. 如果我们想继续过去,异常处理仍应调用task_done()。根据是否捕获异常来决定整个线程是否应该失败是一种设计选择,但如果是这种情况,则该元素仍应标记为已处理。

【讨论】: