【问题标题】:is Celery Task initialized per each worker process, or once per app?芹菜任务是每个工作进程初始化一次,还是每个应用程序初始化一次?
【发布时间】:2017-11-15 00:32:43
【问题描述】:

我有一个繁重的外部库类,它需要时间来初始化并消耗大量内存。我想至少为每个任务实例创建一次。

class NlpTask(Task):
    def __init__(self):
        print('initializing NLP parser')
        self._parser = nlplib.Parser()
        print('done initializing NLP parser')

    @property
    def parser(self):
        return self._parser

@celery.task(base=NlpTask)
def my_task(arg):
    x = my_task.parser.process(arg)
    # etc.

Celery 启动了 32 个工作进程,所以我希望打印 "initializing ... done" 32 次,因为我假设每个工作进程都创建一个任务实例。令人惊讶的是,我得到了打印一次。那里实际发生了什么?谢谢。

【问题讨论】:

    标签: python python-3.x multiprocessing celery celery-task


    【解决方案1】:

    您的NlpTask 在向工作人员注册时正在初始化一次。

    如果你有两个类似的任务

    @celery.task(base=NlpTask)
    def foo(arg):
        pass
    
    
    @celery.task(base=NlpTask)
    def bar(arg):
        pass
    

    然后当你启动一个worker时,你会看到2个初始化。

    如果你想为每个worker初始化一次,你可以使用worker_process_init信号。

    from celery.signals import worker_process_init
    
    
    @worker_process_init.connect()
    def setup(**kwargs):
        print('initializing NLP parser')
        # setup
        print('done initializing NLP parser')
    

    现在,当你启动一个 worker 时,你会看到 setup 被每个进程调用一次。

    【讨论】:

    • 这就是我的观点 - 我希望每个工人一次,而且似乎每个芹菜实例一次。我编辑了问题
    • @davka 更新了答案。
    【解决方案2】:

    为此:

    这就是我的观点 - 我希望每个工人一次,而且似乎每个 celery 实例一次。我编辑了问题——@davka

    答案必须是在connect 中使用发件人过滤器,例如:

    @worker_process_init.connect(sender='xx')
    def func(sender, **kwargs):
        if sender == 'xx':
            # dosomething
    

    但我发现它在 celery 4.0.2 中不起作用。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2010-10-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-04-21
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多