【问题标题】:Setting up periodic tasks in Celery (celerybeat) dynamically using add_periodic_task使用 add_periodic_task 在 Celery (celerybeat) 中动态设置周期性任务
【发布时间】:2017-04-28 17:17:02
【问题描述】:

我正在使用Celery 4.0.1Django 1.10,但我在安排任务时遇到了麻烦(运行任务工作正常)。这是芹菜的配置:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2

app.conf.CELERY_QUEUES = (
    Queue('myapp.celery_default'),
    Queue('myapp.queue1'),
    Queue('myapp.queue2'),
    Queue('myapp.queue3'),
)

然后在 tasks.py 我有:

@app.task(queue='myapp.queue1')
def my_task(some_id):
    print("Doing something with", some_id)

在views.py中我想安排这个任务:

def my_view(request, id):
    app.add_periodic_task(10, my_task.s(id))

然后我执行命令:

sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app

但该任务从未被安排。我在日志中看不到任何内容。这项任务正在发挥作用,因为如果在我看来我这样做了:

def my_view(request, id):
    my_task.delay(id)

任务被执行。

如果在我的配置文件中,如果我手动安排任务,这样它就可以工作:

app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    },
}

我只是无法动态安排任务。有什么想法吗?

【问题讨论】:

    标签: python django scheduled-tasks celery celerybeat


    【解决方案1】:

    编辑:(2018 年 13 月 1 日)

    最新的release 4.1.0 已经解决了这个ticket #3958 中的主题并已合并


    实际上不能在视图级别不定义周期性任务,因为节拍调度设置会先加载,在运行时无法重新调度:

    add_periodic_task()函数会在后台添加beat_schedule设置的入口,同样的设置也可以用来手动设置周期性任务:

    app.conf.CELERYBEAT_SCHEDULE = {
        'add-every-30-seconds': {
            'task': 'tasks.my_task',
            'schedule': 10.0,
            'args': (66,)
        },
    }
    

    这意味着如果你想使用add_periodic_task(),它应该被包装在 celery 应用级别的 on_after_configure 处理程序中,并且对运行时的任何修改都不会生效:

    app = Celery()
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        sender.add_periodic_task(10, my_task.s(66))
    

    正如doc 中提到的,常规的celerybeat 只是简单地跟踪任务执行:

    默认调度程序是celery.beat.PersistentScheduler,它只是在本地搁置数据库文件中跟踪上次运行时间。

    为了能够动态管理周期性任务并在运行时重新调度 celerybeat:

    还有django-celery-beat 扩展将日程存储在 Django 数据库中,并提供方便的管理界面在运行时管理周期性任务

    任务将被持久化在 django 数据库中,并且调度程序可以在 db 级别的任务模型中更新。每当您更新定期任务时,此任务表中的计数器将递增,并告诉 celery beat 服务从数据库重新加载计划。

    可能的解决方案如下:

    from django_celery_beat.models import PeriodicTask, IntervalSchedule
    
    schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
    task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66]))
    

    views.py

    def update_task_view(request, id)
        task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique
        task.args=json.dumps([id])
        task.save()
    

    【讨论】:

    • 拿起你的评论“实际上你不能在视图级别定义周期性任务”:是否可以在应用级别使用add_periodic_task(),即intask.py?在应用程序中声明这些周期性任务似乎更好的封装。
    • 实际上根本不需要使用它,因为如果您只使用app.conf.CELERYBEAT_SCHEDULE设置语法,它将为您调用,但如果您想明确使用它,您可以在@987654339中使用它@文件。
    • 我相信最新版本(4.1.0 之后)应该解决这个问题。这是正在进行的开发#3958
    • 请注意,on_after_configure 不适用于在另一个应用程序的 tasks.py 文件中定义的定期任务。请改用on_after_finalize。这是因为一旦celery.py 文件被导入,on_after_configure 信号就会被发送,但是autodiscover_tasks 调用直到很久以后,当 django 完成设置所有应用程序时才解决。 (autodiscover_tasks 安排调用但不会立即执行它,除非您使用 force=True 保证在 Django 中失败)。