【问题标题】:Celery warning "Received and deleted unknown message"芹菜警告“收到并删除未知消息”
【发布时间】:2019-08-23 14:14:55
【问题描述】:

我正在 Celery 中设置一项任务,以使其从某个主题交换中“消耗”。当我将消息发送到有问题的交易所时,我收到错误消息:“收到并删除了一条未知消息。错误的目的地?!?”在 celery 控制台上。

我已经创建了一个单独的项目文件夹来复制所有内容都称为 test-something 的问题,其结构如下:

celery-test/  
  L celery.py  
  L celeryconfig.py  
  L tasks.py

我已经看到了各种 StackOverflow 问题和 GitHub 问题,涉及到 librabbitmq 包。这里的解决方案是卸载这个包,但我什至没有安装它,所以这让我无处可去。发现的一些问题/问题表明了此解决方案:
- https://github.com/celery/celery/issues/3675
- Celery &Rabbitmq:WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?- a experiment on the GIT

我也尝试过使用任务路由设置,因为我认为问题在于 atm,但我无法让它工作。

对于任何想知道为什么端口关闭 1 的人,那是因为它指向我的 docker 容器中的 rabbitmq,它不能再使用 5672。

芹菜.py

app = Celery('celery_test', include=['celery_test.tasks'])
app.config_from_object('celery_test.celeryconfig')

celeryconfig.py

broker_url = 'amqp://guest:guest@localhost:5673//'
result_backend = 'rpc://'

default_exchange = Exchange('default', type='direct')
test_exchange = Exchange('test_exchange', type='topic')
task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('test_queue', test_exchange, routing_key='test123test')
)

task_routes = {
    'celery_test.tasks.test_method': {
        'queue': 'test_queue'
    }
}

tasks.py

@app.task
def test_method():
    print('test_method')
    return 'test_method'

然后是我用来发送消息的文件:send.py

connection = pika.BlockingConnection(pika.URLParameters('amqp://guest:guest@localhost:5673/'))
channel = connection.channel()

exchange = 'test_exchange'
routing_key = 'test123test'
message = 'Testmessage'

channel.exchange_declare(exchange=exchange, exchange_type='topic', durable=True)

channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)
connection.close()

【问题讨论】:

    标签: python rabbitmq celery message-queue messagebroker


    【解决方案1】:

    这可能不是真正的答案,而是更多的后续行动。但我想我会让人们知道谁遇到了这个问题。 (这篇文章是我的全部解释,由于我是芹菜新手,你可能应该对它持保留态度。)

    所以基本上我认为发生这种情况的原因是因为 Celery 不理解这个信息。 Celery 需要大量的标头和其他属性才能理解消息试图做什么。

    可以通过对它们进行逆向工程来模拟这些标头,但我不打算这样做,因为有更简单的方法可以解决我计划制作的应用程序。

    如果有人在阅读本文时对此主题有更多经验,请随时纠正我。

    【讨论】:

      【解决方案2】:

      这适用于 2021 年受此影响的任何人。我有一个使用 celery 3.1.x 的旧服务(我们称之为“旧版”)和一个最近创建的使用 celery 5.0.x 版本的服务(我们称之为“现代”) )。

      在现代代码库中,Celery.signature() 方法用于创建签名,然后在 legacy 中调用 apply_async() 来调用任务。旧版 celery 确实收到了我的消息,但由于此错误而将其丢弃:

      收到并删除了未知消息。目的地错误?!?

      我通过在celeryconfig.py 文件中添加这一行解决了这个问题:

      task_protocol = 1
      

      解释在这里:https://github.com/celery/celery/issues/3675#issuecomment-294129297

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2021-01-16
        • 2010-12-31
        • 1970-01-01
        • 2011-08-08
        • 2013-10-06
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多