【问题标题】:How to process with thread Queue in python如何在python中使用线程队列进行处理
【发布时间】:2020-04-17 07:35:30
【问题描述】:
            from queue import Queue
            import threading
            import time

            queue1 = Queue()


            # this function should finish its work and restart with fresh queue1
            def association_helper():
                # this program should get the whole data from the queue, add into the list and print it. again it starts with
                # remaining items in queue (the items which was inserting when this function printings the value)
                lock = threading.Lock()
                lock.acquire()
                items = []
                print("Start..")
                while True:
                    if queue1.qsize()>0:
                        print("Line no 13:", queue1.qsize())
                        SizeofQueue1 = queue1.qsize()
                        for i in range(SizeofQueue1):
                            items.append(queue1.get())
                        queue1.task_done()
                        print("Line no 19:", len(items))
                        print(items)
                        print("Line no 25: done")
                        time.sleep(0.1)
                lock.release()


            i = 0


            def main():
                global i
                # continuous data coming and adding in queue
                while True:
                    queue1.put([i])
                    i += 1


            if __name__ == '__main__':
                # main thread will always run (adding the items in the queue)
                f_thread = threading.Thread(target=association_helper)
                f_thread.daemon = True
                f_thread.start()
                main()


    output:

    Start... 
    Line no 13: 1415 
    Line no 19: 3794 
    Line no 25: done
    Line no 13: 40591 
    Line no 19: 41856 
    Line no 25: done 
    Line no 13: 78526


as per expectations, the line no 13 and line no 19 should be same. also, after, line no 25, it should print Start..(because association_helps should finish its execution and run again)

为什么association_helper 函数会运行一次?为什么它没有完成工作并在队列中使用新的剩余项目重新启动?

动机:

  1. queue1 将始终在主线程中添加新项目。
  2. 当 sizeof(queue1)>0 时,association_helper 应该从 queue1 中提取全部数据并处理数据。
  3. 但是应该继续在队列中添加项目
  4. association_helper 完成执行后,它应该从队列中的新项目重新开始。

【问题讨论】:

    标签: python-3.x multithreading multiprocessing python-multiprocessing python-multithreading


    【解决方案1】:

    让我们从头开始:

    按照预期,第 13 行和第 19 行应该是一样的。

    由于您 get 在一个线程中从队列中插入 (put) 到另一个线程而不使用任何 Lock不应该期望在两行之间(在线程函数中)不会将任何内容添加到队列中。 这就是您所看到的,在第 13 行打印大小并在第 14 行获取大小,从而产生不同的值。

    另外,在第 25 行之后,它应该打印 Start..(因为 association_helps 应该完成它的执行并再次运行)

    您在进入while True 循环之前print("Start..")。除非您再次调用此函数,否则您将不会再看到该打印。


    以下是如何解决put/get线程队列中的races的解释和示例:

    将锁声明为全局变量。

    lock1 = threading.Lock()
    

    现在使用这个锁让我们确保队列的大小和预期的 len(items) 将产生相同的值。

    with lock1:
        print("Line no 13:", queue1.qsize())
        SizeofQueue1 = queue1.qsize()
    # and
    with lock1:
        queue1.put([i])
    

    这将导致 - 与预期大小相同。:

    Line no 13: 9
    Line no 19: 9
    [[1], [2], [3], [4], [5], [6], [7], [8], [9]]
    Line no 25: done
    

    关于print("Start.."),您可以将其插入到while循环中,以便在迭代之间打印。

    while True:
            print("Start..")
            if queue1.qsize()>0:
                # The rest of the code
    

    最后,如果您希望items 列表仅包含当前迭代中的项目,您需要clear 它。 如果您不清除两次迭代之间的list,那么差异只会越来越大。

    list.clear() 从列表中删除所有项目。相当于del a[:]。

    你会得到:

    while True:
            print("Start..")
            items.clear()
            if queue1.qsize()>0:
                # The rest of the code
    

    【讨论】:

    • 嗨@Amiram:谢谢,这很有帮助。我有两个问题要问: 1. 相同的“lock1=Threading.Lock()”如何同时在两个线程中表现? 2. 如果我们不是故意给任何时间间隔来锁定/解锁,如何/何时在一个线程中释放锁并在另一个线程中释放?谢谢。
    • 好问题,在 cmets 窗口中没有空间详细说明,但简而言之: 1. 由于这是 一个 进程,因此两个线程都可以访问进程内存,所以他们都可以使用它,这就是为什么首先创建Lock 以在不同线程之间同步。 2.当一个线程是acquireLock时,第二个线程无法获得被该锁“保护”的代码,直到第一个线程release它。在puti+=1 之间的示例中,线程1 没有Lock,它使线程2 有机会获得它。
    • 在实时应用中,一个锁在两个不同的线程中是很困难的,有没有办法在没有锁/一个锁的情况下获得相同的输出?有时一个线程没有释放锁,我们无法看到预期的输出。我们可以使用某种计时器和事件概念,我们可以期待相同的输出。?谢谢。
    • 参考上面的示例,您只需在当前循环迭代中保留并使用它即可获得队列的大小。在这种情况下,您将不需要锁、事件或计时器。例如:current_q_size = queue1.qsize() 并在您希望在循环当前迭代中使用大小的任何其他地方使用它,
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-10-23
    • 2018-06-27
    • 1970-01-01
    • 1970-01-01
    • 2015-09-19
    相关资源
    最近更新 更多