【问题标题】:Why is RabbitMQ not persisting messages on a durable queue?为什么 RabbitMQ 不在持久队列上持久化消息?
【发布时间】:2011-06-30 03:17:02
【问题描述】:

我通过 Celery 将 RabbitMQ 与 Django 一起使用。我使用的是最基本的设置:

# RabbitMQ connection settings
BROKER_HOST = 'localhost'
BROKER_PORT = '5672'
BROKER_USER = 'guest'
BROKER_PASSWORD = 'guest'
BROKER_VHOST = '/'

我导入了一个 Celery 任务并将其排入队列以在一年后运行。从 iPython 外壳:

In [1]: from apps.test_app.tasks import add

In [2]: dt=datetime.datetime(2012, 2, 18, 10, 00)

In [3]: add.apply_async((10, 6), eta=dt)
DEBUG:amqplib:Start from server, version: 8.0, properties: {u'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', u'product': 'RabbitMQ', u'version': '2.2.0', u'copyright': 'Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.', u'platform': 'Erlang/OTP'}, mechanisms: ['PLAIN', 'AMQPLAIN'], locales: ['en_US']
DEBUG:amqplib:Open OK! known_hosts []
DEBUG:amqplib:using channel_id: 1
DEBUG:amqplib:Channel open
DEBUG:amqplib:Closed channel #1
Out[3]: <AsyncResult: cfc507a1-175f-438e-acea-8c989a120ab3>

RabbitMQ 在 celery 队列中收到这条消息:

$  rabbitmqctl list_queues name messages durable
Listing queues ...
KTMacBook.local.celeryd.pidbox  0   false
celery  1   true
celeryctl_KTMacBook.local   0   true
...done.

然后我通过按 control-C 然后按“a”中止来杀死 RabbitMQ。当我再次启动服务器并使用rabbitmqctl检查时,它说芹菜队列中没有消息:

$  rabbitmqctl list_queues name messages durable
Listing queues ...
celery  0   true
celeryctl_KTMacBook.local   0   true
...done.

芹菜队列是持久的。为什么消息没有持久化?我需要做什么才能使消息持久化?

【问题讨论】:

    标签: python django rabbitmq celery


    【解决方案1】:

    使队列持久化与使队列上的消息持久化不同。持久队列意味着当服务器重新启动时它们会自动再次出现 - 这显然发生在您的情况下。但这不会影响消息本身。

    要使消息持久化,您还必须将消息的 delivery_mode 属性标记为 2。有关完整说明,请参阅经典文章 Rabbits and Warrens

    编辑:完整链接已损坏,但截至 2013 年 12 月,您仍然可以从主 URL 找到博客文章:http://blogs.digitar.com/jjww/

    【讨论】:

    • 交付模式似乎已经设置为2:add.delivery_mode == 2。据我所知,芹菜无法更改此默认值。
    • 有没有办法可以检查邮件以检查其传递模式?
    • 您使用的是什么版本的 Kombu? (由 Celery 使用)Kombu 1.0.0 有一个错误,即未正确设置消息传递模式。
    • 我正在使用 kombu-1.0.2、celery-2.2.2 和 django_celery-2.2.2。
    • 您要做的另一件事是使用交易/确认。这样你就可以确定 Rabbit 不仅收到了消息,而且还发送到了磁盘。
    【解决方案2】:

    要找出消息delivery_mode,您可以使用它并查看消息属性:

    >>> from tasks import add
    >>> add.delay(2, 2)
    
    >>> from celery import current_app
    >>> conn = current_app.broker_connection()
    >>> consumer = current_app.amqp.get_task_consumer(conn)
    
    >>> messages = []
    >>> def callback(body, message):
    ...     messages.append(message)
    >>> consumer.register_callback(callback)
    >>> consumer.consume()
    
    >>> conn.drain_events(timeout=1)
    
    >>> messages[0].properties
    >>> messages[0].properties
    {'application_headers': {}, 'delivery_mode': 2, 'content_encoding': u'binary',    'content_type': u'application/x-python-serialize'}
    

    【讨论】:

    • 我确认交付模式设置为 2。我能够通过将 RabbitMQ 升级到 2.3.1 来使其工作。我在使用 RabbitMQ 2.2.0 时遇到了持久性问题。
    猜你喜欢
    • 2023-04-05
    • 2015-12-19
    • 1970-01-01
    • 2016-01-04
    • 2011-10-12
    • 2012-12-26
    • 2015-08-03
    • 1970-01-01
    • 2019-11-04
    相关资源
    最近更新 更多