【问题标题】:how to get the queue in which a task was run - celery如何获取运行任务的队列 - 芹菜
【发布时间】:2014-03-13 16:42:58
【问题描述】:

我是新来的芹菜,有一个问题。我有这个简单的任务:

@app.task(name='test_install_queue')
def test_install_queue():
    return subprocess.call("exit 0",shell=True)

我稍后会在类似的测试用例中调用此任务

result = tasks.test_default_queue.apply_async(queue="install")

任务在队列 @9​​87654325@ 中成功运行(因为我在 celery 日志中看到它,并且它完成得很好。但我想知道一种以编程方式查找任务 test_install_queue 运行在哪个队列中的方法,来自result中存储的对象。

谢谢!

编辑:

我已将任务更改为:

@app.task(name='test_install_queue',bind=True)
def test_install_queue(self):
    return self.request.__dict__

然后我使用apply_async 的结果如下:

result = tasks.test_install_queue.apply_async(queue="install")
assert "install" in result.get()["hostname"]

解决方法是工作人员(主机名)与工作人员中初始化的唯一队列具有相同的名称。

【问题讨论】:

  • 谢谢。但我不需要任务 ID,我需要任务运行所在的队列。我通读了 Celery 文档,但没有找到任何东西。
  • 查看该部分中提到的链接-> celery.readthedocs.org/en/latest/userguide/…
  • 仅供参考,如果您不为任务指定名称,它将与函数名称相同,因此在您的示例中,任务名称是不必要的。此外,您可以为每个任务指定队列,而不是在它始终在同一个队列中时调用它。所以它会是这样的:@app.task(queue='install')
  • 谢谢你们。问题不在于队列或路由,我想尝试从 celery worker 获取队列名称(我正在静态设置队列,以确保我将“以编程方式”获取的队列是正确的。

标签: python queue task celery


【解决方案1】:

您可以尝试以下方法:

delivery_info = app.current_task.request.delivery_info
# by default celery uses the same name for queues and exchanges
original_queue = delivery_info['exchange']
for queue in app.amqp.queues.itervalues():
    if queue.exchange.name == delivery_info['exchange'] 
        and queue.routing_key == delivery_info['routing_key']:
            original_queue = queue.name
            break

这种方法是建立在假设您使用默认 celery 设置并且您的交换是直接的。如果您需要更通用的扇出和主题交换解决方案,则必须检查 app.amqp.queues 中每个已声明队列的路由键。

【讨论】:

  • 我最后能够解决问题,但您的回答仍然有效。谢谢!
【解决方案2】:

我自己刚刚遇到了这个问题,我真的很怀疑是否需要一个复杂的解决方案,比如来自“lexabug”的一个已经被接受的解决方案...... 因此,由于即使 Celery 文档也没有提供有效的替代方案,我自己使用反射进行了调查,以了解哪个对象包含我需要的信息,我想出了一个超级简单直接的解决方案。具体来说,我正在用 Celery 术语编写一个钩子,或者更好的是一个信号,这就是我根据任务名称检索队列名称的方式:

    @signals.after_task_publish.connect()
    def on_task_publish(sender=None, headers=None, body=None, **kwargs):

        # "sender" is a string containing task name 
        # ("celery" here is the celery app)
        task: Task = celery.tasks.get(sender)

        # once we have the task object, we can access the "queue" property 
        # which contains the name of the queue 
        # (it' a dynamic property so don't expect support by your IDE)
        queue_name: str = task.queue if task is not None else 'unknown'

ps。我正在使用 Celery 4.4

【讨论】:

  • “我真的很怀疑是否需要一个复杂的解决方案”我也很怀疑这种需要,但老实说,即使在 celery 和 kombu 代码库中挖掘了一段时间后,我也不清楚为什么@ 987654322@ 不存在或为什么挂钩中的queueexchange 等值为空。不过,谢谢。