【Question Title】: Why do the messages in a RabbitMQ queue get lost when the consumer is restarted?
【Posted】: 2019-09-22 07:04:33
【Question Description】:

I have set up a RabbitMQ consumer as follows:

from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor

import pika
import datetime
import logging
import json
from logging import StreamHandler
from time import sleep
from random import randint
from pika import SelectConnection

logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.INFO,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueueConsumer(object):
    """The consumer class to manage connections to the AMQP server/queue"""

    def __init__(self, queue, logger, parameters, thread_id=0):
        self.channel = None
        self.connection = None
        self.queue_name = queue
        self.logger = logger
        self.consumer_id = 'Thread: %d' % (thread_id,)
        self.parameters = pika.ConnectionParameters(**parameters)

    def _on_queue_declared(self, frame):
        self.logger.debug('{} ... declaring queue'.format(self.consumer_id))
        self.channel.basic_qos(prefetch_count=1)
        try:
            self.channel.basic_consume(self.handle_delivery, queue=self.queue_name, no_ack=True)
            self.logger.info("{} Declared queue...".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} crashing:--> {}'.format(self.consumer_id, str(e)))

    def _on_channel_open(self, channel):
        self.channel = channel
        try:
            self.channel.queue_declare(queue=self.queue_name,
                                       exclusive=False,
                                       durable=True,
                                       auto_delete=False,
                                       callback=self._on_queue_declared)
            self.logger.info("{} Opened Channel....".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))

    def _on_connected(self, connection):
        connection.channel(self._on_channel_open)

    def consume(self):
        try:
            self.connection = SelectConnection(self.parameters,
                                               self._on_connected)
            self.connection.ioloop.start()
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))
            self.connection.close()
            self.connection.ioloop.start()

    def decode(self, body):
        try:
            _body = body.decode('utf-8')
        except AttributeError:
            _body = body

        return _body

    def handle_delivery(self, channel, method, header, body):
        try:
            start_time = datetime.datetime.now()
            _logger.info("Received...")
            _logger.info("Content: %s" % body)
            req = json.loads(self.decode(body))

            # Do something
            sleep(randint(10, 100))

            time_taken = datetime.datetime.now() - start_time
            _logger.info("[{}] Time Taken: {}.{}".format(
                req.get("to_num"), time_taken.seconds, time_taken.microseconds))

        except Exception as err:
            _logger.exception(err)


if __name__ == "__main__":
    workers = 3
    pika_parameters = OrderedDict([('host', '127.0.0.1'), ('port', 5672), ('virtual_host', '/')])
    try:
        pool = ThreadPoolExecutor(max_workers=workers)
        start = 1
        for thread_id in range(start, (workers + start)):
            pool.submit(QueueConsumer('test_queue', _logger, pika_parameters, thread_id).consume)

    except Exception as err:
        _logger.exception(err)

I also have a queue publisher, as follows:

import uuid
import pika
import logging
import json
from logging import StreamHandler
from pika import SelectConnection

logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.DEBUG,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueuePublisherClient(object):

    def __init__(self, queue, request):
        self.queue = queue
        self.response = None
        self.channel = None
        self.request = request
        self.corrId = str(uuid.uuid4())
        self.callBackQueue = None
        self.connection = None
        parameters = pika.ConnectionParameters(host="0.0.0.0")
        self.connection = SelectConnection(
            parameters, self.on_response_connected
        )
        self.connection.ioloop.start()

    def on_response(self, ch, method, props, body):
        if self.corrId == props.correlation_id:
            self.response = body
            self.connection.close()
            self.connection.ioloop.start()

    def on_response_connected(self, connection):
        _logger.info("Connected...\t(%s)" % self.queue)
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_connected(self, connection):
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        # _logger.info("Channel Opened...\t(%s)" % self.queue)
        self.channel = channel
        self.channel.queue_declare(queue=self.queue,
                                   durable=True,
                                   exclusive=False,
                                   auto_delete=False,
                                   callback=self.on_queue_declared)

    def on_queue_declared(self, frame):
        self.channel.basic_publish(exchange="",
                                   routing_key=self.queue,
                                   properties=pika.BasicProperties(),
                                   body=str(self.request))
        self.connection.close()
        _logger.info("Message Published...\t(%s)" % self.queue)


if __name__ == "__main__":
    data = {
        'text': 'This is a sample text',
        'to_num': '+2547xxxxxxxx'
    }
    count = 10000

    for index in range(count):
        data['index'] = index
        QueuePublisherClient("test_queue", json.dumps(data))

When I publish 10000 messages to the queue without starting the consumer, rabbitmqctl list_queues shows that test_queue holds 10000 messages. When I then start the consumer and run rabbitmqctl list_queues again, the queue shows 0 messages, even though the consumer is still working through them. The problem is that if I stop the consumer after a few seconds and restart it, I cannot recover my messages. How can I avoid this?

This is only a simulation of the real situation, in which the consumer process is restarted by monit and I lose messages.

【Question Comments】:

    Tags: python rabbitmq pika


    【Solution 1】:

    Since you have declared prefetch_count as 1 and your queue is durable, the consumer will process messages one at a time once it starts. To check this, you can put a one-second sleep in your code and try restarting the consumer after a few seconds: you will see that only the messages that have already been processed are removed from the queue. If you do not set a prefetch count at all, a single consumer will start draining every message from the queue. Hope that helps.
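    For example (a quick way to observe this, assuming you have access to rabbitmqctl on the broker host; test_queue is the queue name from the question), you can watch how many messages are still ready versus delivered but not yet acknowledged:

    rabbitmqctl list_queues name messages_ready messages_unacknowledged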

    【Discussion】:

    • I have already tried setting it to 0; as far as my problem goes, it makes no difference. Also, I checked the documentation, which says The prefetch-count is ignored by consumers who have enabled the no-ack option
    【Solution 2】:

    First, you should use the latest version of Pika.

    When you set no_ack=True (auto_ack=True in Pika 1.0), RabbitMQ considers a message to be acknowledged the moment it is delivered. This means that every message your consumer is holding in memory (or in the TCP stack) when it stops will be lost, because RabbitMQ believes it has already been acknowledged.

    You should use no_ack=False (the default) and acknowledge the message in handle_delivery once your work is done. Note that if your work takes a long time, you should run it in another thread to avoid blocking Pika's I/O loop.
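    As a minimal sketch (not the asker's exact code: it assumes Pika 1.x and uses a BlockingConnection for brevity instead of the question's SelectConnection; the print call stands in for the real work), the manual-acknowledgement pattern looks roughly like this:

import pika

def handle_delivery(channel, method, properties, body):
    # Stand-in for the real work; anything slow should be handed to another
    # thread so that Pika's I/O loop is not blocked.
    print("received:", body)
    # Acknowledge only after the work has finished, so an unacked message is
    # re-queued if the consumer dies before completing it.
    channel.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
channel = connection.channel()
channel.queue_declare(queue='test_queue', durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='test_queue',
                      on_message_callback=handle_delivery,
                      auto_ack=False)  # the default in Pika 1.x: manual acknowledgements
channel.start_consuming()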

    See the documentation here: https://www.rabbitmq.com/confirms.html


    NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

    【Discussion】:

    • The old version is due to legacy code; an upgrade is in progress. Thanks for the mailing-list link. Regarding Please note that if your work takes a long time, how long counts as "a long time"?
    • Also, I thought consumption should happen one by one, especially with a prefetch_count of 1. Does it deliver all 10000 messages at once?