【发布时间】:2020-05-18 03:32:45
【问题描述】:
我一直在尝试找到一个看起来像我的实现,但我似乎找不到。
细节:我检索了一些数据库记录,并希望在最多 5 个线程中处理所有这些记录。但我希望这些线程报告任何潜在的错误,然后关闭各个线程(或记录它们)。所以我想将所有记录推送到队列中,并让线程从队列中获取。
到目前为止,我有这个。
class DatabaseRecordImporterThread(threading.Thread):
def __init__(self, record_queue):
super(DatabaseRecordImporterThread, self).__init__()
self.record_queue = record_queue
def run(self):
try:
record = self.record_queue.get()
force_key_error(record)
except Exception as e:
print("Thread failed: ", e) # I want this to print to the main thread stdout
logger.log(e) # I want this to log to a shared log file (with appending)
MAX_THREAD_COUNT = 5
jobs = queue.Queue()
workers = []
database_records_retrieved = database.get_records(query) # unimportant
# this is where i put all records on a queue
for record in database_records_retrieved:
jobs.put(record)
for _ in range(MAX_THREAD_COUNT):
worker = DatabaseRecordImporterThread(jobs)
worker.start()
workers.append(worker)
print('*** Main thread waiting')
jobs.join()
print('*** Done')
所以想法是每个线程都获取jobs 队列,并且他们正在从中检索记录并打印。由于未预先指定要处理的数量(定义为一次处理 k 条记录或其他内容),因此每个线程将尝试仅处理队列中的任何内容。但是,当我强制出错时,输出看起来像这样。
Thread failed: 'KeyError'
Thread failed: 'KeyError'
Thread failed: 'KeyError'
Thread failed: 'KeyError'
Thread failed: 'KeyError'
*** Main thread waiting
当没有错误报告时,线程每个只读取一条记录:
(record)
(record)
(record)
(record)
(record)
*** Main thread waiting
在正常的线程设置中,我知道您可以通过执行类似操作来设置队列
Thread(target=function, args=(parameters, queue)
但是当你使用一个继承了 Thread 对象的类时,你如何正确设置它呢?我似乎无法弄清楚。我的一个假设是队列对象并不浅,所以创建的每个新对象实际上都指向内存中的同一个队列——这是真的吗?
这些线程显然是挂起的,因为它们不是(?)守护线程。不仅如此,似乎线程每个只读取一条记录,然后做同样的事情。有些事情我想做但不知道怎么做。
- 如果所有线程都失败了,主线程应该继续前进并说“***完成”。
- 线程应该继续处理队列直到它为空
为了执行 (2),我可能需要在主线程中添加一些东西,例如 while !queue.empty,但是我如何确保将线程限制为最多 5 个?
【问题讨论】:
-
看起来我想出了一个部分,那就是在每个线程中,我只需要确保循环使用类似
while not self.queue.empty()的内容,但我不确定这是否是最佳做法。它不能解决问题 (1)
标签: python multithreading queue