【问题标题】:Celery+Docker+Django -- Getting tasks to workCelery+Docker+Django -- 让任务工作
【发布时间】:2015-09-20 20:15:00
【问题描述】:

过去一周我一直在尝试学习 Celery,并将其添加到我使用 Django 和 Docker-Compose 的项目中。我很难理解如何让它工作;我的问题是在使用任务时我似乎无法上传到我的数据库来工作。上传功能,insertIntoDatabase,在没有任何涉及 Celery 的情况下工作正常,但现在上传不起作用。事实上,当我尝试上传时,我的网站很快就告诉我上传成功,但实际上没有上传任何内容。

服务器以docker-compose up 启动,它将进行迁移、执行迁移、收集静态文件、更新需求,然后启动服务器。这一切都是使用pavement.py 完成的; Dockerfile 中的命令是CMD paver docker_run。在任何时候都没有明确启动 Celery 工人;我应该这样做吗?如果有,怎么做?

这是我在views.py中调用上传函数的方式:

insertIntoDatabase.delay(datapoints, user, description)

上传函数在一个名为databaseinserter.py 的文件中定义。以下装饰器用于insertIntoDatabase

@shared_task(bind=True, name="database_insert", base=DBTask)

这里是celery.pyDBTask类的定义:

class DBTask(Task):
     abstract = True

     def on_failure(self, exc, *args, **kwargs):
        raise exc

我不确定要为tasks.py 写什么。在我从他离开的地方继续之前,这是一位前同事留下的:

from celery.decorators import task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@task(name="database_insert")
def database_insert(data):

这是我用来配置 Celery 的设置 (settings.py):

BROKER_TRANSPORT = 'redis'
_REDIS_LOCATION = 'redis://{}:{}'.format(os.environ.get("REDIS_PORT_6379_TCP_ADDR"), os.environ.get("REDIS_PORT_6379_TCP_PORT"))
BROKER_URL = _REDIS_LOCATION + '/0'
CELERY_RESULT_BACKEND = _REDIS_LOCATION + '/1'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "UTC"

现在,我猜tasks.py 中的database_insert 不应该是空的,但是应该去那里呢?此外,tasks.py 中似乎没有发生任何事情——当我添加一些日志语句以查看 tasks.py 是否至少正在运行时,实际上没有任何内容最终被记录,这让我认为 tasks.py 是甚至没有被运行。如何正确地将我的上传功能变成一个任务?

【问题讨论】:

  • 传递给 delay() 的数据点和用户参数是什么?
  • datapoints 是字典对象的列表,user 只是一个字符串。数据点被转换为适合我们models.py 的 Django 对象。在 Celery 推出之前,上传一直运行良好。

标签: django celery docker-compose


【解决方案1】:

我认为,您离实现这一目标并不遥远。

首先,我建议您尝试将 Celery 任务和业务逻辑分开。因此,例如,在insertIntoDatabase 函数中将涉及将数据插入数据库的业务逻辑可能很有意义,然后单独创建一个 Celery 任务,可能名称为insert_into_db_task,它将您的参数作为普通 python 对象(重要)并使用这些参数调用上述insertIntoDatabase 函数以实际完成数据库插入。

该示例的代码可能如下所示:

my_app/tasks/insert_into_db.py

from celery.decorators import task
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@task()
def insert_into_db_task(datapoints, user, description):
    from my_app.services import insertIntoDatabase
    insertIntoDatabase(datapoints, user, description)

my_app/services/insertIntoDatabase.py

def insertIntoDatabase(datapoints, user, description):
    """Note that this function is not a task, by design"""

    # do db insertion stuff

my_app/views/insert_view.py

from my_app.tasks import insert_into_db_task

def simple_insert_view_func(request, args, kwargs):
    # start handling request, define datapoints, user, description

    # next line creates the **task** which will later do the db insertion
    insert_into_db_task.delay(datapoints, user, description)
    return Response(201)

我所暗示的应用程序结构正是我将如何做到的,并不是必需的。另请注意,您可能可以直接使用 @task() 而不为它定义任何参数。可能会为您简化事情。

这有帮助吗?我喜欢让我的任务轻松而蓬松。他们大多只是进行混蛋验证(例如,确保所涉及的 obj 存在于 DB 中)、调整任务失败时会发生什么(稍后重试?中止任务?等)、日志记录,否则他们执行其他地方的业务逻辑。

另外,如果不是很明显,您确实需要在某个地方运行 celery,以便有工作人员实际处理您的视图代码正在创建的任务。如果您不在某处运行 celery,那么您的任务只会堆积在队列中并且永远不会被处理(因此您的数据库插入永远不会发生)。

【讨论】:

  • 这似乎是一个很好的方法。我有几个问题: 1. 当您提到 my_app 时,您是在说与 my_project 不同,对吗?我的insertIntoDatabasemy_project/utils/db 中,而不是在my_app 中;这应该不是问题,对吧? 2. 你对如何在 docker-compose 的上下文中启动一个 celery worker 有什么想法吗?正如您可以想象的那样,正常的 CLI 方法是行不通的。 pavement.py 似乎也无法识别 celery。无论如何,感谢您帮助我更好地了解 celery!
  • 实际上,我想我找到了一种方法,通过使用启动 celery worker 的命令制作另一个容器。我遇到了这个问题,但这是一系列新问题。您的示例代码实际上帮助我解决了与此问题相关的问题,因此我将其标记为答案。
  • 1.应该没问题,尽管让您的应用程序依赖于您的项目会降低您的应用程序的独立性和可移植性。架构权衡。
  • 2. Bingo - 启动第二个容器来运行 celery,使用与 Web 容器相同的一般设置思路。如果您使用 django-celery,您可以使用与 Web 容器相同的 docker 映像并将命令更改为 manage.py celeryd,而不是使用 uwsgi、gunicorn、runserver 等。这就是我运行开发环境的方式:我使用1 个 docker 映像运行 3 个单独的容器。一个 Web 容器 manage.py runserver 0.0.0.0:8000、一个 celery 容器 manage.py celeryd 和一个 celery beat 容器 manage.py celerybeat
猜你喜欢
  • 2016-09-28
  • 2017-06-26
  • 2019-11-08
  • 1970-01-01
  • 2011-06-19
  • 2012-09-06
  • 2021-12-12
  • 2018-09-17
  • 1970-01-01
相关资源
最近更新 更多