【Question title】: Celery configuration in Django, connecting tasks to the view
【Posted】: 2018-12-20 12:10:26
【Question description】:

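I recently configured Celery to run some dummy tasks and ran the worker from the terminal on my Mac. It all seems to run as expected. It took a while, because the literature out there suggests several different configuration schemes, but I got there in the end. The next step is to trigger the tasks from my view in Django. I am using celery 1.2.26.post2.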

My project structure:

/MyApp
   celery_tasks.py
   celeryconfig.py
   __init__.py

I have been following several tutorials, and found this video, this video, and this video very helpful for getting an overall picture of Celery.

My scripts are:

celery_tasks.py

from celery import Celery
from celery.task import task

app = Celery()                          # Initialise the app
app.config_from_object('celeryconfig')  # Tell Celery instance to use celeryconfig module


suf = lambda n: "%d%s" % (n, {1: "st", 2: "nd", 3: "rd"}.get(n if n < 20 else n % 10, "th"))
@task
def fav_doctor():
    """Reads doctor.txt file and prints out fav doctor, then adds a new
    number to the file"""

    with open('doctor.txt', 'r+') as f:
        for line in f:
            nums = line.rstrip().split()

            print ('The {} doctor is my favorite'.format(suf(int(nums[0]))))

            for num in nums[1:]:
                print ('Wait! The {} doctor is my favorite'.format(suf(int(num))))

            last_num = int(nums[-1])
            new_last_num = last_num + 1

            f.write(str(new_last_num) + ' ')


@task
def reverse(string):
    return string[::-1]

@task
def add(x, y):
    return x+y

celeryconfig.py

from datetime import timedelta

## List of modules to import when celery starts.
CELERY_IMPORTS = ('celery_tasks',)

## Message Broker (RabbitMQ) settings.
BROKER_URL = 'amqp://'
BROKER_PORT = 5672
#BROKER_TRANSPORT = 'sqlalchemy'
#BROKER_HOST = 'sqlite:///tasks.db'
#BROKER_VHOST = '/'
#BROKER_USER = 'guest'
#BROKER_PASSWORD = 'guest'

## Result store settings.
CELERY_RESULT_BACKEND = 'rpc://'
#CELERY_RESULT_DBURI = 'sqlite:///mydatabase.db'

## Worker settings
#CELERYD_CONCURRENCY = 1
#CELERYD_TASK_TIME_LIMIT = 20
#CELERYD_LOG_FILE = 'celeryd.log'
#CELERYD_LOG_LEVEL = 'INFO'

## Misc
CELERY_IGNORE_RESULT = False
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT=['json']
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_ENABLE_UTC = True

CELERYBEAT_SCHEDULE = {
    'doctor-every-10-seconds': {
        'task': 'celery_tasks.fav_doctor',
        'schedule': timedelta(seconds=3),
    },
}

__init__.py

from .celery_tasks import app as celery_app # Ensures app is always imported when Django starts so that shared_task will use this app.

__all__ = ['celery_app']

settings.py

INSTALLED_APPS = [
    ...
     'djcelery',
]

In my views folder, I have a view module named admin_scripts.py

from MyApp.celery_tasks import fav_doctor, reverse, send_email, add

@login_required
def admin_script_dashboard(request):
    if request.method == 'POST':
        form = Admin_Script(request.POST)
        if form.is_valid():
            backup_script_select = form.cleaned_data['backup_script_select']
            dummy_script_select = form.cleaned_data['dummy_script_select']
            print ("backup_script_select: {0}".format(backup_script_select))
            print ("dummy_script_select: {0}".format(dummy_script_select))
            if backup_script_select:
                print ("Backup script exectuting. Please wait...")
                dbackup_script_dir =  str(Path.home()) + '/Software/MyOtherApp/cli-tools/dbbackup_DRAFT.py'
                subprocess.call(" python {} ".format(dbackup_script_dir), shell=True)
                async_result = reverse.delay('Using Celery')
                print ("async_result: {0}".format(async_result))
                result = reverse.AsyncResult(async_result.id)
                print ("result: {0}".format(result))
                print ("Something occured...")
            if dummy_script_select:
                print ("Dummy script exectuting. Please wait...")
                dummy_script_dir =  str(Path.home()) + '/Software/MyOtherApp/cli-tools/dummy.py'
                subprocess.call(" python {} ".format(dummy_script_dir), shell=True)
                async_result = add.delay(2, 5)
                print ("async_result: {0}".format(async_result))
                result = add.AsyncResult(async_result.id)
                print ("result: {0}".format(result))
                print ("Something occured...")
            return render(request, 'MyApp/admin_scripts_db.html')

The problem occurs in my admin_scripts.py file, on the line that calls async_result = add.delay(2, 5). The traceback is below:

[12/Jul/2018 09:23:19] ERROR [django.request:135] Internal Server Error: /MyProject/adminscripts/
Traceback (most recent call last):
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/local.py", line 309, in _get_current_object
    return object.__getattribute__(self, '__thing')
AttributeError: 'PromiseProxy' object has no attribute '__thing'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/kombu/utils/__init__.py", line 323, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'conf'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/loaders/base.py", line 158, in _smart_import
    return imp(path)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/loaders/base.py", line 112, in import_from_cwd
    package=package,
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/utils/imports.py", line 101, in import_from_cwd
    return imp(module, package=package)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/loaders/base.py", line 106, in import_module
    return importlib.import_module(module, package=package)
  File "/Users/MyMBP/anaconda3/lib/python3.6/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 978, in _gcd_import
  File "<frozen importlib._bootstrap>", line 961, in _find_and_load
  File "<frozen importlib._bootstrap>", line 948, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'celeryconfig'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/django/core/handlers/exception.py", line 41, in inner
    response = get_response(request)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/django/core/handlers/base.py", line 187, in _get_response
    response = self.process_exception_by_middleware(e, request)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/django/core/handlers/base.py", line 185, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/django/contrib/auth/decorators.py", line 23, in _wrapped_view
    return view_func(request, *args, **kwargs)
  File "/Users/MyMBP/Software/MyProject/MyProjectsite/MyProject/views/admin_scripts.py", line 44, in admin_script_dashboard
    async_result = add.delay(2, 5)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/local.py", line 143, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/local.py", line 311, in _get_current_object
    return self.__evaluate__()
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/local.py", line 341, in __evaluate__
    thing = Proxy._get_current_object(self)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/local.py", line 101, in _get_current_object
    return loc(*self.__args, **self.__kwargs)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/app/base.py", line 270, in _task_from_fun
    '__wrapped__': fun}, **options))()
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/app/task.py", line 201, in __new__
    instance.bind(app)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/app/task.py", line 365, in bind
    conf = app.conf
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/kombu/utils/__init__.py", line 325, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/app/base.py", line 638, in conf
    return self._get_config()
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/app/base.py", line 454, in _get_config
    self.loader.config_from_object(self._config_source)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/loaders/base.py", line 140, in config_from_object
    obj = self._smart_import(obj, imp=self.import_from_cwd)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/loaders/base.py", line 161, in _smart_import
    return symbol_by_name(path, imp=imp)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/kombu/utils/__init__.py", line 96, in symbol_by_name
    module = imp(module_name, package=package, **kwargs)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/loaders/base.py", line 112, in import_from_cwd
    package=package,
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/utils/imports.py", line 101, in import_from_cwd
    return imp(module, package=package)
  File "/Users/MyMBP/anaconda3/lib/python3.6/site-packages/celery/loaders/base.py", line 106, in import_module
    return importlib.import_module(module, package=package)
  File "/Users/MyMBP/anaconda3/lib/python3.6/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 978, in _gcd_import
  File "<frozen importlib._bootstrap>", line 961, in _find_and_load
  File "<frozen importlib._bootstrap>", line 948, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'celeryconfig'

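Many errors are thrown and the traceback is very large, about 9000 lines in total. This is only a snippet. I am new to Celery and to task queues in general, so perhaps some of the experts out there can spot an obvious mistake in my code.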

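As I said, the Celery configuration works, and when the tasks are triggered from the terminal they do what they are supposed to do. I am building this up step by step, so the next step is to trigger the tasks from my view in Django (instead of calling them from the terminal). Once I have figured that out, the end goal is to track the progress of a task and report its output to the user in a separate window (JavaScript, AJAX, etc.) that shows, for example, the line-by-line output you would see in the console.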

I have read that the tasks.py file (celery_tasks.py in my case) needs to live inside a Django app that is registered in settings.py. Is that true?

【Question discussion】:

    Tags: python-3.x celery django-celery celery-task django-1.11


    【Solution 1】:

    This is not a complete answer, but it may partially help others who run into a similar problem:

    In celery_tasks.py I basically have the following:

    app.config_from_object('celeryconfig') 
    

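    This works when I start the worker from the terminal. When I trigger the task through my view, I get the error message shown above. Changing the line as follows makes it work through the view: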

    app.config_from_object('MyApp.celeryconfig') 
    

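    I still need to figure out why this difference occurs and how to resolve it, so that it does not matter whether the task is called through my view or from the terminal.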
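
One likely explanation, offered as a sketch and not a confirmed diagnosis: a bare module name such as 'celeryconfig' only resolves when the directory that contains celeryconfig.py is on sys.path. The example below assumes the worker was started from inside the MyApp directory, while Django runs from the project root, where only the dotted name 'MyApp.celeryconfig' can be imported. It uses only the standard library and a throwaway copy of the layout from the question. The helpers importable and config_module_name are hypothetical names introduced for illustration and are not part of Celery.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def importable(name):
    """Return True if `name` can be imported with the current sys.path."""
    try:
        importlib.import_module(name)
        return True
    except ModuleNotFoundError:
        return False


def config_module_name(package):
    """Hypothetical helper: qualify 'celeryconfig' with the package, if any."""
    return "{}.celeryconfig".format(package) if package else "celeryconfig"


# Throwaway copy of the layout from the question: <root>/MyApp/celeryconfig.py
project_root = Path(tempfile.mkdtemp())
package_dir = project_root / "MyApp"
package_dir.mkdir()
(package_dir / "__init__.py").write_text("")
(package_dir / "celeryconfig.py").write_text("BROKER_URL = 'amqp://'\n")

# Case 1 (assumed Django case): only the project root is on sys.path.
sys.path.insert(0, str(project_root))
importlib.invalidate_caches()
bare_from_root = importable("celeryconfig")
dotted_from_root = importable("MyApp.celeryconfig")

# Case 2 (assumed worker case): the MyApp directory itself is on sys.path.
sys.path.insert(0, str(package_dir))
importlib.invalidate_caches()
bare_from_package_dir = importable("celeryconfig")

print(bare_from_root, dotted_from_root, bare_from_package_dir)
print(config_module_name("MyApp"), config_module_name(""))
```

If this assumption holds, one option is to call app.config_from_object(config_module_name(__package__)) in celery_tasks.py, since __package__ is 'MyApp' when the module is imported as MyApp.celery_tasks and empty when it is imported as a top-level module. Task names are derived from the module path as well, so starting the worker from the project root, so that both sides import MyApp.celery_tasks, may turn out to be the more consistent fix.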
