【问题标题】:Drop oldest item from python's Queue.Queue (with synchronization)从 python Queue.Queue 中删除最旧的项目(同步)
【发布时间】:2017-03-06 15:09:08
【问题描述】:

我是否可以配置Queue.Queue 使其始终接受新项目,如果队列已满则简单地丢弃最旧的项目?

如果没有,标准库中是否有另一个队列类可以做到这一点?

(我不能使用双端队列,因为我有一个需要同步的生产者/消费者设置。)

【问题讨论】:

  • 您可以使用collections.deque 传递maxlen 参数
  • 啊,但我需要同步它。看起来它不像只使用互斥锁那么简单,因为我想阻塞消费者线程直到项目可用(而不是连续轮询)。
  • 我也这么认为。我重新打开了
  • 平时都是这么干的,为什么不这么简单呢?顺便说一句,为了安全起见,你知道 python 的 GIL 对线程的限制吗?
  • @spectras 我目前在消费者循环中使用queue.get(block=True)。在引擎盖下,我假设这等待一个条件变量。要在此处使用互斥锁,我将不得不忙于等待由互斥锁保护的 queue.get(block=False),这会消耗更多的 CPU。

标签: python queue


【解决方案1】:

使用条件保护资源访问的示例,就像我在 cmets 中所说的那样。

import collections
import threading
import time

queue = collections.deque()
condition = threading.Condition()

def consumer():
    condition.acquire()
    while True:
        while queue:
            item = queue.popleft()
            condition.release()
            # do something with item
            print(item)
            condition.acquire()
        condition.wait()

def push_item(item):
    with condition:
        queue.append(item)
        condition.notify()

# From that point forward, it is just demonstration code to show how to use

def example_producer_thread(*args):
    for arg in args:
        push_item(arg)

consumer_thread = threading.Thread(target=consumer, name='queue consumer')
consumer_thread.daemon = True  # so it does not prevent python from exiting
consumer_thread.start()

for example in [range(0, 10), range(10, 20), range(20, 30)]:
    threading.Thread(target=example_producer_thread, args=example).start()

time.sleep(1) # let the consumer thread some time before the script gets killed

核心在这里:

  • consumer() 是一个消费者线程,它保持空闲(无轮询),直到其他线程将项目放入队列中。唤醒时,它将锁定队列,获取一个项目,解锁队列,处理该项目,直到队列中没有更多项目。然后它释放它并重新进入睡眠状态。
  • push_item() 将单个项目推送到队列中,并通知消费者线程它应该唤醒。

剩下的只是让它成为一个工作示例。 example_producer_thread 只会将其参数推入队列。我们从其中三个开始,每个都对一系列数字进行运算,以便我们查看结果。

只需将maxlen 添加到队列中即可。也许在你使用的时候将功能封装在一个小类中。

【讨论】:

    【解决方案2】:

    更新:请不要使用它。正如@spectras 指出的那样,它实际上并没有正确同步。

    这不是特别优雅,但它似乎适合我与多个作家一起使用。

    class QueueLatest(queue.Queue):
        def put(self, item):
            while True:
                try:
                    super().put(item, block = False)
                    break
                except queue.Full:
                    _ = self.queue.popleft()
    

    【讨论】:

    • 您正在以非同步方式访问底层self.queue 对象。这最终崩溃,具体取决于queue.Queue 的包装方式。..
    • 我并不是说你错了,但是查看 Queue 的 Python 3.7 实现,我看不出 get() 和 _get() 比 popleft() 做更多的同步,当然 _get() 被实现为对 popleft() 的调用。我欢迎进一步解释同步的细节。
    • _get 是一种内部方法,不得从外部使用(正是因为它假定任何同步都已由其调用者处理)。 get 使用 threading.Condition 正确同步。
    • 啊,我之前没有看 not_empty 和 not_full 属性的定义。谢谢!很遗憾,我不允许对自己的答案投反对票。
    • 呵呵,加上评论,它实际上提供了价值。未采取的路径以及为什么不采取它们的详细信息与所采取的路径一样有价值。 :)
    【解决方案3】:
    from queue import Queue, Full
    
    class QueueLatest(Queue):
        ''' customized put'''
        def put(self, *args, **kwargs):
            try:
                super().put(*args, **kwargs)
            except Full:
                self.queue.popleft()
                super().put(*args, **kwargs)
    

    根据@Eric Smith 的回答,它似乎有效。我在 q = QueueLatest(1) 中使用过,似乎工作正常。不太确定它有多强大或是否可能存在任何种族条件等。

    【讨论】:

    • 与埃里克史密斯的回答相同的评论。这使用了没有同步的底层self.queue 对象。
    猜你喜欢
    • 2011-12-24
    • 1970-01-01
    • 2011-01-15
    • 2012-01-02
    • 2020-05-30
    • 1970-01-01
    • 2016-04-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多