【问题标题】:Python multithreading with shared variable具有共享变量的 Python 多线程
【发布时间】:2018-06-19 07:33:08
【问题描述】:

我正在尝试并行化我的工作,但我是多线程新手,所以对具体的实现感到困惑。

我有一个套接字侦听器,它将数据保存到缓冲区。当缓冲区达到他的容量时,我需要将其数据保存到数据库中。 在一个线程上我想启动套接字侦听器,而在并行任务上我想检查缓冲区状态。

BufferQueue 只是 python list 的扩展,具有允许检查列表是否达到指定大小的方法。

SocketManager 是我正在收听的STREAM_URL 的流数据提供者。它使用回调函数来处理消息

但是当我使用回调来检索数据时,我不确定使用共享变量是否是正确且最佳的决定

buffer = BufferQueue(buffer_size=10000)

def start_listening_to_sokcet(client):
    s = SocketManager(client)
    s.start_socket(cb_new)
    s.start()

def cb_new(message):
    print("New message")
    global buffer
    for m in message:
        #save data to buffer

def is_buffer_ready(buffer):
    global buffer
    print("Buffer state")
    if buffer.ready():
         #save buffer data to db

如果您能帮我解决这个问题,我将不胜感激

【问题讨论】:

  • 您可以使用该共享缓冲区,但您需要一些方法来控制对它的访问,以便一次只有一个线程可以修改它。例如,您可以使用Lock
  • 不知道这些BufferQueueSocketManager 等类型来自哪里,或者至少它们做了什么,很难提供任何不是很模糊的东西。但是我会警惕任何使用is_buffer_ready 函数的API,调用者必须定期检查(或者更糟糕的是,在自旋循环中);通常你会想要一些你可以阻止的东西。
  • 如果您可以给我们minimal reproducible example,我们可能会提出比“您可能需要在此处锁定”和“通常您希望通过某种方式阻止那里”更具体的想法……
  • @abarnert 感谢您的建议。我编辑了我的问题

标签: python multithreading sockets websocket multiprocessing


【解决方案1】:

我认为您正在寻找的只是queue 模块。

queue.Queue 是一个自同步队列,专为在线程之间传递对象而设计。

默认情况下,在队列上调用get 将阻塞,直到对象可用,这是您通常想要做的——在网络应用程序中使用线程进行并发的关键在于您的线程看起来都像正常同步代码,但是当他们无事可做时,他们大部分时间都在等待套接字、文件、队列或其他任何东西。但是您可以使用block=False 进行无阻塞检查,或者将timeout 置于等待状态。

您还可以在构造队列时指定maxsize。然后,默认情况下,put 将阻塞,直到队列不太满而无法接受新对象。但是,同样,您可以使用 blocktimeout 来尝试,如果它太满,则失败。

所有同步都在getput 内部处理,因此您不需要Lock 来保证线程安全,也不需要Condition 来向服务员发出信号。

队列甚至可以为您处理关机。生产者只需 put 一个特殊值,告诉消费者在 get 上看到它时退出。

对于生产者需要等待消费者完成的优雅关闭,您可以在消费者处理完每个排队对象后使用可选的task_done 方法,并让生产者阻塞join 方法。但是如果你不需要这个——或者有其他方式等待关闭,例如,加入消费者线程——你可以跳过这部分。

【讨论】:

    【解决方案2】:

    多线程为您提供资源(变量)的共享状态。无需使用全局变量,只需将缓冲区作为参数传递给您的方法,然后对其进行读/写。

    您仍然需要控制对缓冲区资源的访问,因此两个线程不会同时读取/写入。您可以使用 threading 模块中的 Lock 来实现:

    lock = threading.Lock()
    
    def cb_new(buffer_, lock_, message):
        print("New message")
        with lock_():
            for m in message:
                #save data to buffer
                buffer.add(m)
    
    def is_buffer_ready(buffer_, lock_):
        print("Buffer state")
        with lock_():
            if buffer_.ready():
                 #save buffer data to db
    

    请注意,如果您使用 multiprocessing 而不是线程,则此解决方案将不起作用。

    顺便说一句,正如@abarnert 评论的那样,有更好的机制来检查缓冲区是否准备好(有数据要读取/有空闲空间要写入)然后调用一个检查它的函数。查看select.select(),它会阻止您直到缓冲区真正准备好


    使用 select 时,将调用放入 while True 循环中,然后检查缓冲区是否已准备好读取。您可以在线程中启动此函数,传递标志变量和缓冲区。如果要停止线程,请将传递的标志更改为 False。对于缓冲区对象,使用Queue.Queue() 或类似的数据结构。

    def read_select(flag, buff):
        flag = 1
        while flag:
            r, _, _ = select.select([buff], [], [])
            if r:
                data = s.read(BUFFSIZE)
                # process data
    

    P.S - select 也适用于套接字。您可以传递一个套接字对象而不是缓冲区,它会检查套接字上的缓冲区是否已准备好进行读取。

    【讨论】:

    • 我猜你是对的。 select.select() 在这种情况下看起来非常合理。你能给我一个提示如何开始吗
    • 我添加了一个示例,说明您将如何使用它。
    • 我认为这不是一个好建议。你不能在队列中select,你也不需要;您只需阻止 get 呼叫。而且,虽然您可以在套接字上select,但同样,您不需要这样做;您可以阻止 recv 通话。 select 的要点是您可以同时阻塞多个套接字,而不需要每个套接字一个线程。它是用于网络并发的多线程的替代方案;您不需要同时使用两者。
    • 你肯定不想在回调 API 之上使用select。您可能想用它来实现那个回调 API,但同样,这是线程的替代方案(还有更好的替代方案,例如 asyncio,或第三方库,例如 Twisted)。而且,无论哪种方式,一旦你的回调被调用,数据要么准备好读取,要么已经被读取,所以你不想在任何事情上调用select
    猜你喜欢
    • 1970-01-01
    • 2017-09-10
    • 1970-01-01
    • 1970-01-01
    • 2018-06-23
    • 2013-05-23
    • 1970-01-01
    • 2018-08-28
    • 2023-04-05
    相关资源
    最近更新 更多