【问题标题】:Task queues and result queues with Celery and RabbitmqCelery 和 Rabbitmq 的任务队列和结果队列
【发布时间】:2022-01-05 07:24:43
【问题描述】:

我已经使用 RabbitMQ 作为代理实现了 Celery。我依赖 Celery v4.4.7,因为我读过 v5.0+ 不再支持 RabbitMQ。在我的情况下,RabbitMQ 是必须的。

所有内容都已容器化,然后在 Kubernetes 1.19 中部署为 pod。我能够执行长时间运行的任务,乍一看一切看起来都很好。不过,我几乎没有需要您的专业知识的顾虑。

  1. 我已经声明了入站和出站队列,但 Celery 创建了他自己的队列,我在这些队列(入站或出站)中看不到任何消息:
inbound_queue =  "_IN"
outbound_queue = "_OUT"

app = Celery()

app.conf.update(
    broker_url = 'pyamqp://%s//' % path,
    broker_heartbeat = None,
    broker_connection_timeout = int(timeout)
    result_backend = 'rpc://',
    result_persistent = True,
    task_queues = (
        Queue(algorithm_queue, Exchange(inbound_queue), routing_key='default', auto_delete=False),
        Queue(result_queue, Exchange(outbound_queue), routing_key='default', auto_delete=False),
    ),
    task_default_queue = inbound_queue, 
    task_default_exchange = inbound_exchange,
    task_default_exchange_type = 'direct',
    task_default_routing_key = 'default',
)

@app.task(bind=True, 
          name='osmq.tasks.add', 
          queue=inbound_queue,
          reply_to = outbound_queue, 
          autoretry_for=(Exception,), 
          retry_kwargs={'max_retries': 5, 'countdown': 2})
def execute(self, data):
  
  <method_implementation>

  1. 我已实现回调以通过 REST API 获取结果。但是,当状态成功时,它可以随机返回或不返回一些结果。这可能与消息持久性有关。详细地说,当我实现花 API 来获取信息时,状态是成功的并且结果被部分显示(缩短的 json 消息) - 当我调用 AsyncResult 时,对于相同的状态,结果要么是 None 要么是正确的。我不明白 rabbitmq 队列和 kombu 之间的机制,它似乎缓存了结果消息。我必须保证每次成功执行任务时都能检索结果。
def callback(uuid):
    task = app.AsyncResult(uuid)


【问题讨论】:

  • “我已经读到 v5.0+ 不再支持 RabbitMQ” - 你可以添加参考吗?我正在使用 RabbitMQ + Celery 5.0.5,所以我不确定它是从哪里来的
  • 如果我可以升级到 v5.0+,那就太酷了。这里,一个来源:docs.celeryproject.org/en/stable/…
  • 他们在谈论结果后端,而您在谈论经纪人。我使用 RabbitMQ 作为代理 + Redis 作为结果后端。
  • 我的错误,我想使用rabbitmq作为代理和结果后端;我不能吗?
  • 我猜你不能再用它作为你的结果后端了(我没试过)。很久以前,我将后端更改为 Redis。我正在使用和弦(画布)并且 RabbitMQ 行为在这种情况下很糟糕,每秒发送一条消息,它出现在我的花中

标签: rabbitmq celery


【解决方案1】:

具体来说,Celery 5.0+ 不再支持 amqp:// 作为后端了。但是,作为您的示例,支持 rpc://。

相关的sn-p在这里:https://docs.celeryproject.org/en/stable/getting-started/backends-and-brokers/index.html#rabbitmq

在我们的实现中,我们往往总是忽略_results=True,所以我不能给出任何关于如何使用 rpc:// 的实用技巧,除了推断任何响应都放在特定于应用程序的队列上,而不是能够通过 amqp:// 放入指定的队列(甚至是不同的代理/rabbitmq 实例)。

【讨论】:

    最近更新 更多