【发布时间】:2014-07-04 04:54:16
【问题描述】:
我在带有 OpenJDK 64 位 1.7.0_55 JVM 的 Ubuntu 12.04 上运行 Jython 2.5.3。
我正在尝试创建一个简单的线程应用程序来优化数据处理和加载。我有填充线程从数据库中读取记录并在将它们放入队列之前对其进行一些处理。队列由将数据存储在不同数据库中的消费者线程读取。这是我的代码大纲:
import sys
import time
import threading
import Queue
class PopulatorThread(threading.Thread):
def __init__(self, mod, mods, queue):
super(PopulatorThread, self).__init__()
self.mod = mod
self.mods = mods
self.queue = queue
def run(self):
# Create db connection
# ...
try:
# Select one segment of records using 'id % mods = mod'
# Process these records & slap them onto the queue
# ...
except:
con.rollback()
raise
finally:
print "Made it to 'finally' in populator %d" % self.mod
con.close()
class ConsumerThread(threading.Thread):
def __init__(self, mod, queue):
super(ConsumerThread, self).__init__()
self.mod = mod
self.queue = queue
def run(self):
# Create db connection
# ...
try:
while True:
item = queue.get()
if not item: break
# Put records from the queue into
# a different database
# ...
queue.task_done()
except:
con.rollback()
raise
finally:
print "Made it to 'finally' in consumer %d" % self.mod
con.close()
def main(argv):
tread1Count = 3
tread2Count = 4
# This is the notefactsselector data queue
nfsQueue = Queue.Queue()
# Start consumer/writer threads
j = 0
treads2 = []
while j < tread2Count:
treads2.append(ConsumerThread(j, nfsQueue))
treads2[-1].start()
j += 1
# Start reader/populator threads
i = 0
treads1 = []
while i < tread1Count:
treads1.append(PopulatorThread(i, tread1Count, nfsQueue))
treads1[-1].start()
i += 1
# Wait for reader/populator threads
print "Waiting to join %d populator threads" % len(treads1)
i = 0
for tread in treads1:
print "Waiting to join a populator thread %d" % i
tread.join()
i += 1
#Add one sentinel value to queue for each write thread
print "Adding sentinel values to end of queue"
for tread in treads2:
nfsQueue.put(None)
# Wait for consumer/writer threads
print "Waiting to join consumer/writer threads"
for tread in treads2:
print "Waiting on a consumer/writer"
tread.join()
# Wait for Queue
print "Waiting to join queue with %d items" % nfsQueue.qsize()
nfsQueue.join()
print "Queue has been joined"
if __name__ == '__main__':
main(sys.argv)
我已经稍微简化了数据库实现以节省空间。
- 当我运行代码时,populator 和 consumer 线程似乎 到达终点,因为我收到“Made it to finally in ...”消息。
- 我收到“等待加入 n 个填充线程”消息,最终 “等待加入填充线程 n”消息。
- 我收到了“等待加入消费者/编写者线程”消息以及我期望的每个“等待消费者/编写者”消息。
- 我收到预期的“等待加入 0 个项目的队列”消息,但没有收到“队列已加入”消息;显然程序在等待队列时阻塞了,它永远不会终止。
我怀疑我的线程初始化或线程连接顺序错误,但我对并发编程的经验很少,所以我对如何做事的直觉没有很好地发展。我发现了很多 Python/Jython 的队列示例,这些队列由 while 循环填充并由线程读取,但到目前为止还没有关于由一组线程填充并由另一组线程读取的队列。
populator 和 consumer 线程似乎已结束。
程序似乎最终阻塞等待队列对象终止。
感谢任何对我有建议和教训的人!
【问题讨论】:
-
我刚刚编辑了我的示例,以明确说明我在每次使用队列中的记录的迭代中调用 queue.task_done(),以及在 ""Waiting to join a填充线程 %d" % i"
-
我使用 Jython 2.7b2 运行脚本,但结果相同:它在加入队列时阻塞。 nfsQueue.qsize() 在我调用 .join() 之前返回 0,但它仍然阻塞。
标签: multithreading queue jython