【问题标题】:Why does Celery have different behaviors in two ways of starting the worker?为什么 Celery 在启动 worker 的两种方式上有不同的行为?
【发布时间】:2019-09-23 16:52:03
【问题描述】:

我在包app中添加了一个芹菜工人:

proj
├── app
│   ├── worker.py
│   └── server.py
└── db
    ├── db.sql
    ├── check_db_health.py
    └── documents.md

我可以通过两种方式运行 worker:

#1st way
user@host:~/proj
$ celery worker -A app.worker

#2nd way
user@host:~/proj
$ python -m app.worker

但是它们之间有不同的行为。

  • 第二个没有用于工作人员的可自定义参数,所以我不想使用它。但它运行完美!没有发现错误。
  • 第一个启动也不错,但由于项目的真实代码具有动态模块导入链,因此在触发任务时通常会遇到模块导入错误。

server.py

from app.worker import enqueue

def sum(a, b):
  return a+b

enqueue(sum, a, b)

worker.py

def enqueue(callback, *args, **kwargs):
    module_path = inspect.getfile(callback)
    module_name = inspect.getmodule(callback).__name__
    func_name   = callback.__name__
    no_delay = kwargs.pop('no_delay', False)
    return do_legacy_task.apply_async(args=args, kwargs=dict(kwargs, **{
        "module_name": module_name,
        "module_path": module_path,
        "func_name": func_name
    }))
    return None


@task(bind=True, name="app.worker.do_callback_task")
do_legacy_task(self, *args, **kwargs):
    clean_kwargs = copy.deepcopy(kwargs)
    module_path  = clean_kwargs.pop('module_path')
    module_name  = clean_kwargs.pop('module_name')
    func_name    = clean_kwargs.pop('func_name')

    spec = importlib.util.spec_from_file_location(module_name, module_path)
    if spec is None:
        print("can't find the module %s in file %s" % (module_name, module_path,))
    else:
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        sys.modules[module_name] = module

        callback = getattr(module, func_name)
        return callback(*args, **clean_kwargs)
    return None

server & worker 动态导入技术基于example

错误

File "/Users/johndoe/proj/app/server.py", line 16, in <module>
    from db.check_db_health import CodeDiagnostic
ModuleNotFoundError: No module named 'db'

为什么两种方式在导入其他模块时会有不同的行为? 如何解决上述错误?

更新 我发现我可以通过为 Celery 传递一个额外的参数来跳过这个错误:

user@host:~/proj
$ celery worker -A app.worker --include db.check_db_health

如果有办法自动完成就好了。

【问题讨论】:

    标签: python celery python-module python-importlib


    【解决方案1】:

    我不是很了解 Celery 的导入模块机制。但是当你想在一个项目中让一个worker与不同的包交互时;你应该将这些包添加到celeryconfig.py:

    celeryconfig.py

    include = [
        'db. check_db_health', # a module
        'tests'                # or a package with a ready __init__.py 
    ]
    

    Celery 使用上面的配置将列出的模块加载到进程中。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-06-08
      • 2019-10-28
      • 2016-05-13
      • 2013-08-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-12-13
      相关资源
      最近更新 更多