【问题标题】:Sequential task execution in CeleryCelery 中的顺序任务执行
【发布时间】:2016-08-10 08:01:49
【问题描述】:

我对要使用 Celery 执行的数据库有一些“繁重”的请求。考虑到它们很“重”,我想按顺序(一个接一个)执行它们。一种可能的解决方案是在 Celery 的命令行中指定 --concurrency=1。它有效。但是有一个问题:在执行任务时,以下所有请求都返回None

from celery.task.control import inspect

# Inspect all nodes.
i = inspect()

print(i.scheduled()) # None
print(i.active()) # None
print(i.reserved()) # None
print(i.registered()) # None

另外,运行celery inspect ping 会返回Error: No nodes replied within time constraint.,这样我就无法收到有关 Celery 队列状态的任何信息。

有我的测试python模块:

celeryconfig.py

#BROKER_URL = 'redis://localhost:6379/0'
BROKER_URL = 'amqp://'
#CELERY_RESULT_BACKEND = "redis"
CELERY_RESULT_BACKEND = "amqp://"
# for php
CELERY_TASK_RESULT_EXPIRES = None
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACKS_LATE = True

tasks.py

from celery import Celery
from time import sleep

app = Celery('hello')
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    sleep(30)       
    return x + y

client.py

from tasks import add

result=add.delay(4, 4)
result=add.delay(4, 4)
result=add.delay(4, 4)
result=add.delay(4, 4)
result=add.delay(4, 4)
result=add.delay(4, 4)

那么,问题来了,如何一个一个地运行任务并且能够检查队列的状态?

【问题讨论】:

    标签: python celery


    【解决方案1】:

    Celery 的检查是通过向任何监听的对象广播查询然后收集响应来执行的。任何在超时(我认为默认为 1 秒)内没有响应的工作人员都将被忽略。就好像它不存在一样。

    您使用--concurrency=1 应该不是问题。我刚刚尝试过,它在这里工作得很好。即使并发为 1,Celery worker 通常也会有一个额外的执行线程来进行通信。 (我说“通常”是因为我确信有办法将 Celery 配置为在脚上射击。我所说的默认值是成立的。)当我尝试 --concurrency=1 时,实际上每个工人有两个线程。所以即使worker忙于计算任务,也应该有一个线程能够响应广播。

    话虽如此,如果机器负载很重,那么工人可能需要很长时间才能做出响应。我解决此问题的方法是重试i.scheduled() 之类的电话,直到我得到每个人的答复。在我的项目中,我知道有多少工人应该启动并运行,所以我有一个清单可以用来了解每个人是否都做出了回应。

    【讨论】:

    • 感谢您的回答。我使用默认值(全新安装)和我在问题中给出的所有额外内容。也许我在 Windows 下这样做是有道理的。我将在 Linux 上检查并返回这里。
    • 我在Linux上测试过,问题不存在。因此,这可能是某些 Windows 特定问题。无论如何都支持您的问题,以获得详细的解释和测试。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-16
    • 1970-01-01
    相关资源
    最近更新 更多