【发布时间】:2022-01-05 07:24:43
【问题描述】:
我已经使用 RabbitMQ 作为代理实现了 Celery。我依赖 Celery v4.4.7,因为我读过 v5.0+ 不再支持 RabbitMQ。在我的情况下,RabbitMQ 是必须的。
所有内容都已容器化,然后在 Kubernetes 1.19 中部署为 pod。我能够执行长时间运行的任务,乍一看一切看起来都很好。不过,我几乎没有需要您的专业知识的顾虑。
- 我已经声明了入站和出站队列,但 Celery 创建了他自己的队列,我在这些队列(入站或出站)中看不到任何消息:
inbound_queue = "_IN"
outbound_queue = "_OUT"
app = Celery()
app.conf.update(
broker_url = 'pyamqp://%s//' % path,
broker_heartbeat = None,
broker_connection_timeout = int(timeout)
result_backend = 'rpc://',
result_persistent = True,
task_queues = (
Queue(algorithm_queue, Exchange(inbound_queue), routing_key='default', auto_delete=False),
Queue(result_queue, Exchange(outbound_queue), routing_key='default', auto_delete=False),
),
task_default_queue = inbound_queue,
task_default_exchange = inbound_exchange,
task_default_exchange_type = 'direct',
task_default_routing_key = 'default',
)
@app.task(bind=True,
name='osmq.tasks.add',
queue=inbound_queue,
reply_to = outbound_queue,
autoretry_for=(Exception,),
retry_kwargs={'max_retries': 5, 'countdown': 2})
def execute(self, data):
<method_implementation>
- 我已实现回调以通过 REST API 获取结果。但是,当状态成功时,它可以随机返回或不返回一些结果。这可能与消息持久性有关。详细地说,当我实现花 API 来获取信息时,状态是成功的并且结果被部分显示(缩短的 json 消息) - 当我调用 AsyncResult 时,对于相同的状态,结果要么是 None 要么是正确的。我不明白 rabbitmq 队列和 kombu 之间的机制,它似乎缓存了结果消息。我必须保证每次成功执行任务时都能检索结果。
def callback(uuid):
task = app.AsyncResult(uuid)
【问题讨论】:
-
“我已经读到 v5.0+ 不再支持 RabbitMQ” - 你可以添加参考吗?我正在使用 RabbitMQ + Celery 5.0.5,所以我不确定它是从哪里来的
-
如果我可以升级到 v5.0+,那就太酷了。这里,一个来源:docs.celeryproject.org/en/stable/…
-
他们在谈论结果后端,而您在谈论经纪人。我使用 RabbitMQ 作为代理 + Redis 作为结果后端。
-
我的错误,我想使用rabbitmq作为代理和结果后端;我不能吗?
-
我猜你不能再用它作为你的结果后端了(我没试过)。很久以前,我将后端更改为 Redis。我正在使用和弦(画布)并且 RabbitMQ 行为在这种情况下很糟糕,每秒发送一条消息,它出现在我的花中