【问题标题】:Jython threading with thread -> queue -> thread使用线程 -> 队列 -> 线程的 Jython 线程
【发布时间】:2014-07-04 04:54:16
【问题描述】:

我在带有 OpenJDK 64 位 1.7.0_55 JVM 的 Ubuntu 12.04 上运行 Jython 2.5.3。

我正在尝试创建一个简单的线程应用程序来优化数据处理和加载。我有填充线程从数据库中读取记录并在将它们放入队列之前对其进行一些处理。队列由将数据存储在不同数据库中的消费者线程读取。这是我的代码大纲:

import sys
import time
import threading
import Queue

class PopulatorThread(threading.Thread):
    def __init__(self, mod, mods, queue):
        super(PopulatorThread, self).__init__()
        self.mod = mod
        self.mods = mods
        self.queue = queue

    def run(self):
        # Create db connection
        # ...
        try:
            # Select one segment of records using 'id % mods = mod'
            # Process these records & slap them onto the queue
            # ...
        except:
            con.rollback()
            raise
        finally:
            print "Made it to 'finally' in populator %d" % self.mod
            con.close()


class ConsumerThread(threading.Thread):
    def __init__(self, mod, queue):
        super(ConsumerThread, self).__init__()
        self.mod = mod
        self.queue = queue

    def run(self):
        # Create db connection
        # ...
        try:
            while True:
                item = queue.get()
                if not item: break
                # Put records from the queue into
                # a different database
                # ...
                queue.task_done()
        except:
            con.rollback()
            raise
        finally:
            print "Made it to 'finally' in consumer %d" % self.mod
            con.close()


def main(argv):
    tread1Count = 3
    tread2Count = 4
    # This is the notefactsselector data queue
    nfsQueue = Queue.Queue()

    # Start consumer/writer threads
    j = 0
    treads2 = []
    while j < tread2Count:
        treads2.append(ConsumerThread(j, nfsQueue))
        treads2[-1].start()
        j += 1

    # Start reader/populator threads
    i = 0
    treads1 = []
    while i < tread1Count:
        treads1.append(PopulatorThread(i, tread1Count, nfsQueue))
        treads1[-1].start()
        i += 1

    # Wait for reader/populator threads
    print "Waiting to join %d populator threads" % len(treads1)
    i = 0
    for tread in treads1:
        print "Waiting to join a populator thread %d" % i
        tread.join()
        i += 1

    #Add one sentinel value to queue for each write thread
    print "Adding sentinel values to end of queue"
    for tread in treads2:
        nfsQueue.put(None)

    # Wait for consumer/writer threads
    print "Waiting to join consumer/writer threads"
    for tread in treads2:
        print "Waiting on a consumer/writer"
        tread.join()

    # Wait for Queue
    print "Waiting to join queue with %d items" % nfsQueue.qsize()
    nfsQueue.join()
    print "Queue has been joined"


if __name__ == '__main__':
    main(sys.argv)

我已经稍微简化了数据库实现以节省空间。

  1. 当我运行代码时,populator 和 consumer 线程似乎 到达终点,因为我收到“Made it to finally in ...”消息。
  2. 我收到“等待加入 n 个填充线程”消息,最终 “等待加入填充线程 n”消息。
  3. 我收到了“等待加入消费者/编写者线程”消息以及我期望的每个“等待消费者/编写者”消息。
  4. 我收到预期的“等待加入 0 个项目的队列”消息,但没有收到“队列已加入”消息;显然程序在等待队列时阻塞了,它永远不会终止。

我怀疑我的线程初始化或线程连接顺序错误,但我对并发编程的经验很少,所以我对如何做事的直觉没有很好地发展。我发现了很多 Python/Jython 的队列示例,这些队列由 while 循环填充并由线程读取,但到目前为止还没有关于由一组线程填充并由另一组线程读取的队列。

populator 和 consumer 线程似乎已结束。

程序似乎最终阻塞等待队列对象终止。

感谢任何对我有建议和教训的人!

【问题讨论】:

  • 我刚刚编辑了我的示例,以明确说明我在每次使用队列中的记录的迭代中调用 queue.task_done(),以及在 ""Waiting to join a填充线程 %d" % i"
  • 我使用 Jython 2.7b2 运行脚本,但结果相同:它在加入队列时阻塞。 nfsQueue.qsize() 在我调用 .join() 之前返回 0,但它仍然阻塞。

标签: multithreading queue jython


【解决方案1】:

当您处理完队列中的每个项目后,您是否对它调用 task_done()?如果你没有明确告诉队列每个任务都完成了,它永远不会从 join() 返回。

PS:您没有看到“Waiting to join a populator thread %d”,因为您忘记了它前面的打印内容 :)

【讨论】:

  • 糟糕,我的代码过于简单了 :) 是的,我每次都在队列中调用 .task_done()。我应该编辑我的示例以反映这一点。
  • 很好地捕捉到丢失的“打印”。添加它只会让我遇到程序阻塞队列的问题。
  • 基本上,你让我走上了正轨。在我的消费者线程中的循环逻辑中(上面没有复制),我没有在获取哨兵值 None 的 queue.get() 之后调用 queue.task_done();我不认为 N​​one 是一项任务,所以我没有将其标记为完成。这就是堵塞的原因。当我测试“item”时,我需要在“break”之前调用“queue.task_done()”。
猜你喜欢
  • 1970-01-01
  • 2012-05-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-09-27
  • 1970-01-01
  • 2023-03-04
相关资源
最近更新 更多