【发布时间】:2021-06-18 14:47:08
【问题描述】:
我需要每秒向用户发送来自 rabbitMQ 的消息。出于这些目的,我使用了 2 个线程。第一个从 rabbitMQ 获取消息并将它们排队。第二个线程从队列中获取消息,处理它们,然后通过 Web 套接字将它们发送给用户。我的问题是如何最好地实施这种机制。 现在我的代码如下所示:
def __init__(self):
self.data = {}
self.queue = Queue()
def download_data(self):
started_time = time.perf_counter()
while True:
if time.perf_counter() - started_time >= 1:
started_time = time.perf_counter()
for _ in range(self.queue.qsize()):
self.append_data(self.queue.get())
self.sync_send_data_to_user(self.data)
self.data = {}
def message_handle(self, ch, method, properties, body):
message = json.loads(body)
self.queue.put(message)
ch.basic_ack(delivery_tag=method.delivery_tag)
def start_consuming(self):
connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
channel = connection.channel()
rabbit_queue = channel.queue_declare("to_client")
channel.basic_consume(on_message_callback=self.message_handle, queue="to_client")
threading.Thread(target=channel.start_consuming, args=[]).start()
此代码有效,但有时每秒发送消息超过 1 次。第一个线程是rabbitMQ消费者-回调函数message_handle,第二个线程是无限循环函数download_data。 message_handle 检查是否超过 1 秒,如果发生这种情况,message_handle 使用队列阻塞线程,直到检索到队列中的所有项目。
更新:
我想我让自己变得更难了,我最终改变了一点逻辑。现在我有2个线程,其中一个处理来自rabbitmq的消息并将它们发送到队列,函数message_handle。 python中的队列是线程安全的,所以它应该可以工作。第二个线程检查是否已经过了 1 秒,如果是,则从队列中检索所有数据并将其发送给用户,函数 download_data
【问题讨论】:
标签: python multithreading rabbitmq queue thread-safety