【问题标题】:How can I configure celery task如何配置芹菜任务
【发布时间】:2014-10-27 15:34:53
【问题描述】:

我在一个项目中使用 Celery,我将它用作调度程序(作为定期任务)。

我的 Celery 任务如下所示:

@periodic_task(run_every=timedelta(seconds=300))
def update_all_feed():
    feed_1()
    feed_2()
    ...........
    feed_n()

但是随着提要数量的增加,到达其他提要需要很长时间(例如,当 Celery 使用提要编号 n 时,需要很长时间才能到达下一个提要 (n+1)。我想要使用 Celery 的并发来启动多个 feed。

浏览文档后,我发现我可以像下面这样调用 celery 任务:

feed.delay()

如何配置 celery 以便它获取所有提要 ID 并聚合它们(例如,一次 5 个提要)?我意识到要实现这一点,我必须将 Celery 作为守护进程运行。

注意:我使用 mongodb 作为代理,我所做的只是安装它并在 Celery 的配置中添加 url。

【问题讨论】:

    标签: python asynchronous celery distributed task-queue


    【解决方案1】:

    您可以像这样安排所有供稿

    @periodic_task(run_every=timedelta(seconds=300))
    def update_all_feed():
        feed_1.delay()
        feed_2.delay()
        .......
        feed_n.delay()
    

    或者你可以使用一个组来简化它

    from celery import group
    @periodic_task(run_every=timedelta(seconds=300))
    def update_all_feed():
        group(feed.delay(i) for i in range(10))
    

    现在要运行任务,你可以启动一个worker来执行任务

    celery worker -A your_app -l info --beat
    

    这开始每五分钟执行一次您的任务。但是,默认并发等于您的 cpu 的核心。您也可以更改并发性。如果你想同时执行 10 个任务,那么

    celery worker -A your_app -l info --beat -c 10
    

    【讨论】:

      【解决方案2】:

      来自Celery documentation

      from celery.task.sets import TaskSet
      from .tasks import feed, get_feed_ids
      
      job = TaskSet(tasks=[
              feed.subtask((feed_id,)) for feed_id in get_feed_ids()
          ])
      
      result = job.apply_async()
      results = result.join() # There's more in the documentation
      

      【讨论】: