【问题标题】:Implement mechanism of sending messages every second实现每秒发送消息的机制
【发布时间】:2021-06-18 14:47:08
【问题描述】:

我需要每秒向用户发送来自 rabbitMQ 的消息。出于这些目的,我使用了 2 个线程。第一个从 rabbitMQ 获取消息并将它们排队。第二个线程从队列中获取消息,处理它们,然后通过 Web 套接字将它们发送给用户。我的问题是如何最好地实施这种机制。 现在我的代码如下所示:

def __init__(self):
    self.data = {}
    self.queue = Queue()

def download_data(self):
    started_time = time.perf_counter()
    while True:
        if time.perf_counter() - started_time >= 1:
            started_time = time.perf_counter()
            for _ in range(self.queue.qsize()):
                self.append_data(self.queue.get())
            self.sync_send_data_to_user(self.data)
            self.data = {}


def message_handle(self, ch, method, properties, body):
    message = json.loads(body)
    self.queue.put(message)
    ch.basic_ack(delivery_tag=method.delivery_tag)

def start_consuming(self):
    connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
    channel = connection.channel()
    rabbit_queue = channel.queue_declare("to_client")
    channel.basic_consume(on_message_callback=self.message_handle, queue="to_client")
    threading.Thread(target=channel.start_consuming, args=[]).start()

此代码有效,但有时每秒发送消息超过 1 次。第一个线程是rabbitMQ消费者-回调函数message_handle,第二个线程是无限循环函数download_datamessage_handle 检查是否超过 1 秒,如果发生这种情况,message_handle 使用队列阻塞线程,直到检索到队列中的所有项目。

更新: 我想我让自己变得更难了,我最终改变了一点逻辑。现在我有2个线程,其中一个处理来自rabbitmq的消息并将它们发送到队列,函数message_handle。 python中的队列是线程安全的,所以它应该可以工作。第二个线程检查是否已经过了 1 秒,如果是,则从队列中检索所有数据并将其发送给用户,函数 download_data

【问题讨论】:

    标签: python multithreading rabbitmq queue thread-safety


    【解决方案1】:
    1. 您正在将 AsyncIO(协程)与线程混合。两者是完全不同的并发方法,会产生不同的行为。更合理的方法是仅使用两者之一来开发您的解决方案。我会选择基于 asyncio 的解决方案,并确保我在可能的情况下 await 并且不会阻塞事件循环。

    2. Python 存在GIL 问题。因此,即使您认为您正在同时做一些事情,实际上您在任何给定时间都绑定到单个线程执行。如果我添加上一节(创建多个线程),您可能会遇到意外的时序问题。 Python 确实不是最适合完美时序的(它也深受本地执行环境/负载的影响)

    3. 如果您关心确保真正的并发执行,您可以查看python multiprocessing。同样,结果可能会有所不同。

    【讨论】: