【问题标题】:Python PriorityQueue items of same priority got in random order具有相同优先级的 Python PriorityQueue 项目以随机顺序获得
【发布时间】:2022-01-16 02:55:17
【问题描述】:

我使用queue.Queue 类将任务从一个线程传递到另一个线程。后来我需要添加优先级,所以我将其更改为PriorityQueue,使用建议的PrioritizedItem(因为任务是dict,无法比较)。然后,在极少数情况下,它开始导致任务混淆。我花了一段时间才意识到/调试 PriorityQueue 中的相同优先级项目不会保持插入顺序,或者从调试的角度来看更糟糕的是,通常它们会这样做。

我猜,在谈到任务队列时,FIFO 是一种默认值。这就是为什么 Queue 不像 FifoQueue 那样被调用的原因,不是吗?因此,PriorityQueue 应该明确声明它不是同等优先级项目的 FIFO。不幸的是,Python doc 没有警告我们这一点,缺乏警告让我很头疼,可能其他人也很头疼。

我还没有找到任何现成的解决方案,但我很确定其他人可能需要一个 PriorityQueue 来保持同等优先级项目的插入顺序。因此这张票...

【问题讨论】:

    标签: python priority-queue


    【解决方案1】:

    除了我希望 Python 文档会在下一个版本中说明警告,让我分享一下我是如何解决这个问题的。

    heapqPriorityQueue使用)建议我们需要在item的compareed部分插入一个序号,这样计算的优先级就很明显了,避免2个item的优先级相同。

    我还添加了threading.Lock,这样我们就可以避免因为一些线程竞争情况发生而使两个项目具有相同的序列号。

    class _ThreadSafeCounter(object):
        def __init__(self, start=0):
            self.countergen = itertools.count(start)
            self.lock = threading.Lock()
        def __call__(self):
            with self.lock:
                return self.countergen.__next__()
    
    #create a function that provides incremental sequence numbers
    _getnextseqnum = _ThreadSafeCounter()
    
    @dataclasses.dataclass(order=True)
    class PriorityQueueItem:
        """Container for priority queue items
        
        The payload of the item is stored in the optional "data" (None by default), and
        can be of any type, even such that cannot be compared, e.g. dict.
        
        The queue priority is defined mainly by the optional "priority" argument (10 by
        default).
        If there are more items with the same "priority", their put-order is preserved,
        because of the automatically increasing sequence number, "_seqnum".
        Usage in the producer:
            pq.put(PriorityQueueItem("Best-effort-task",100))
            pq.put(PriorityQueueItem(dict(b=2))
            pq.put(PriorityQueueItem(priority=0))
            pq.put(PriorityQueueItem(dict(a=1))
        Consumer is to get the tasks with pq.get().getdata(), and will actually receive
            None
            {'b':2}
            {'a':1}
            "Best-effort-task"
        """
        data: typing.Any=dataclasses.field(default=None, compare=False)
        priority: int=10
        _seqnum: int=dataclasses.field(default_factory=_getnextseqnum, init=False)
        
        def getdata(self):
            """Get the payload of the item in the consumer thread"""
            return self.data
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-11-27
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多