【问题标题】:Python: Queue.get() from multiple threads (or signal)Python:来自多个线程(或信号)的 Queue.get()
【发布时间】:2014-12-19 07:41:09
【问题描述】:


如何在 Python 中从多个线程中使用 Queue.get()

我想做:一个线程用Queue.put(xxx)发送一个数据,有些线程得到同样的数据。 这个想法就像“信号”。我想在没有 PyQt 的情况下做到这一点。

例如:

#!/usr/bin/python
import threading
import Queue

queue= Queue.Queue()

def Func1():
  while True:
    data= queue.get()
    print 'Func1:got',data
    if data=='q':  break

def Func2():
  while True:
    data= queue.get()
    print 'Func2:got',data
    if data=='q':  break

def MainThread():
  while True:
    data= raw_input('q to quit > ')
    queue.put(data)
    if data=='q':  break

t1= threading.Thread(name='func1', target=Func1)
t2= threading.Thread(name='func2', target=Func2)
tm= threading.Thread(name='main', target=MainThread)
t1.start()
t2.start()
tm.start()

t1.join()
t2.join()
tm.join()

这里我希望 Func1 和 Func2 从 MainThread 中获取相同的数据,但 Func1 和 Func2 中只有一个可以获取数据。

如果你有一个好主意,请告诉我。

非常感谢!


编辑于 2014-12-19 12:51 EST

基于 Reut Sharabani 的想法,我实现了一个信号类。

#!/usr/bin/python
import threading
import Queue

class TSignal:
  def __init__(self):
    self.queues= {}  #Map from index to queue
    self.counter= 0
    self.locker= threading.Lock()
  def NewQueue(self):
    with self.locker:
      idx= self.counter
      self.counter+= 1
      self.queues[idx]= Queue.Queue()
    queue= self.TQueue(self,idx,self.queues[idx])
    return queue
  def DeleteQueue(self,idx):
    with self.locker:
      del self.queues[idx]
  def put(self,item,block=True,timeout=None):
    for idx,queue in self.queues.iteritems():
      queue.put(item,block,timeout)
  class TQueue:
    def __init__(self,parent,idx,queue):
      self.parent= parent
      self.idx= idx
      self.queue= queue
    def __enter__(self):
      return self
    def __exit__(self,e_type,e_value,e_traceback):
      self.parent.DeleteQueue(self.idx)
    def get(self,block=True,timeout=None):
      return self.queue.get(block,timeout)

signal= TSignal()

def Func1():
  with signal.NewQueue() as queue:
    while True:
      data= queue.get()
      print '\nFunc1:got[%r]\n'%data
      if data=='q':  break

def Func2():
  with signal.NewQueue() as queue:
    while True:
      data= queue.get()
      print '\nFunc2:got[%r]\n'%data
      if data=='q':  break

def MainThread():
  while True:
    data= raw_input('q to quit > ')
    signal.put(data)
    if data=='q':  break

t1= threading.Thread(name='func1', target=Func1)
t2= threading.Thread(name='func2', target=Func2)
tm= threading.Thread(name='main', target=MainThread)
t1.start()
t2.start()
tm.start()

t1.join()
t2.join()
tm.join()

TSignal 的使用非常简单。在 getter 函数中,插入一条 with 语句,如:

with signal.NewQueue() as queue:

然后以与Queue.get()相同的方式使用队列:

data= queue.get()

在 putter 函数中,只需像 Queue.put() 一样使用 put:

signal.put(data)

问题是如果线程数为N,TSignal需要维护N个队列,而TSignal.put实际上调用了Queue.put N次。所以还是想知道有没有更好的办法。

您对此有什么意见吗?

【问题讨论】:

  • 只是为了清楚。您希望两个线程都获得相同的数据吗?所以当我给“somestring”作为输入时,输出应该是“'Func1:got somestring”和“'Func2:got somestring”?
  • 是的,完全正确。我希望两个(或更多)线程都能获得由单个发送者发送的相同数据。
  • 添加了另一个解决方案,我认为更好。
  • Hey Reut,这是一个很好的解决方法,我想知道是否有任何方法可以利用这里的异步来加快调度到队列的速度?可能是一个不错的改进!代码也可以使用 Python 3.6+ 的更新

标签: python multithreading queue signals


【解决方案1】:

你可以为每个线程使用一个队列吗?如果是这样,您可以使用它自己的队列简单地发布到每个线程:

#!/usr/bin/python
import threading
import Queue

queue1 = Queue.Queue()
queue2 = Queue.Queue()


def func1():
    while True:
        data = queue1.get()
        print 'Func1:got', data
        if data == 'q':
            break


def func2():
    while True:
        data = queue2.get()
        print 'Func2:got', data
        if data == 'q':
            break


def main():
    while True:
        data = raw_input('q to quit > ')
        queue1.put(data)
        queue2.put(data)
        if data == 'q':
            break


t1 = threading.Thread(name='func1', target=func1)
t2 = threading.Thread(name='func2', target=func2)
tm = threading.Thread(name='main', target=main)
t1.start()
t2.start()
tm.start()

t1.join()
t2.join()
tm.join()

编辑:

对于您在 cmets 中的后续问题,这是一种具有固定数量的同步原语的机制。这个想法是使用函数和消息创建任务并将它们提交到线程池以执行。 (注意: python 3 有Barriers,如果您选择其他实现,这里可能会很方便):

#!/usr/bin/python
import threading
import Queue
from multiprocessing.pool import ThreadPool

MAX_THREADS = 10

publish_queue = Queue.Queue()
print_lock = threading.Lock()


def sync_print(msg):
    print_lock.acquire()
    print msg
    print_lock.release()

# the manager actually holds a pool of threads
# he gives tasks to. The tasks are the functions you mean
# to execute zipped with the message.
def manager(functions):
    pool = ThreadPool(min(len(functions), MAX_THREADS))
    while True:
        sync_print("Manager waiting for message")
        message = publish_queue.get()
        sync_print("Manager got message %s" % message)
        if message == 'q':
            pool.close()
            pool.terminate()
            break;
        else:
            # create tasks of form: (function, message)
            tasks = zip(functions, [message] * len(functions))
            pool.map(lambda x: x[0](x[1]), tasks)


def func1(data):
    sync_print('%s:got %s' % (threading.current_thread().name, data))


def func2(data):
    sync_print('%s:got %s' % (threading.current_thread().name, data))


def main():
    while True:
        data = raw_input('q to quit > ')
        # wait for all threads to consume
        publish_queue.put(data)
        if data == 'q':
            break

# the functions you want to execute on each message - these were your threads
functions = [
    func1,
    func2
]

main = threading.Thread(name='main', target=main)
manager = threading.Thread(name='manager', target=manager, args=(functions, ))
manager.start()
main.start()

main.join()

希望这适合您的情况,因为它可能会占用大量处理时间。

【讨论】:

  • 谢谢,这将是一个简单的解决方案。但是如果线程数为N,我们需要维护N个队列,主线程需要执行queue.put N次。所以如果有更好的主意,我想知道。
  • @Akihiko - 通过为每个“线程”(实际上是函数)创建任务,使用 ThreadPool 添加了另一个解决方案。这是通过压缩函数列表和消息中的每个函数并将元组视为池中的任务来完成的。
  • 谢谢@Reut。据我所知,您对每线程队列的想法是最好的。正如您所指出的,每线程队列的计算成本并不高。我的实际问题是在一些并行执行的状态机之间进行通信(他们想将状态更改事件告诉其他状态机)。我只是想知道这个问题的通用解决方案,因为它似乎是一个广泛适用的问题。无论如何,我根据您的想法编辑了我的问题,如果没有更好的解决方案,我会使用它。
  • @Akihiko 我做了两个编辑,第一个不如当前发布的好(但也有效)。我认为您可以在此实现中使用队列来传达消息,但我真的没有您的代码要验证:)
  • 我不知道线程池;我会查一下。谢谢你的第二个解决方案。我有这个想法。对于我的问题解决方案 1 更好,但它会是这类问题的替代解决方案。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2016-05-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-10-15
  • 2017-12-17
  • 1970-01-01
相关资源
最近更新 更多