【问题标题】:Celery / Django Single Tasks being run multiple timesCelery / Django Single Tasks 多次运行
【发布时间】:2014-07-27 22:44:35
【问题描述】:

我遇到了一个问题,我将一个任务放入队列并且它正在运行多次。 从芹菜日志中我可以看到同一个工人正在运行任务......

[2014-06-06 15:12:20,731: INFO/MainProcess] Received task: input.tasks.add_queue
[2014-06-06 15:12:20,750: INFO/Worker-2] starting runner..
[2014-06-06 15:12:20,759: INFO/Worker-2] collection started
[2014-06-06 15:13:32,828: INFO/Worker-2] collection complete
[2014-06-06 15:13:32,836: INFO/Worker-2] generation of steps complete
[2014-06-06 15:13:32,836: INFO/Worker-2] update created
[2014-06-06 15:13:33,655: INFO/Worker-2] email sent
[2014-06-06 15:13:33,656: INFO/Worker-2] update created
[2014-06-06 15:13:34,420: INFO/Worker-2] email sent
[2014-06-06 15:13:34,421: INFO/Worker-2] FINISH - Success

但是,当我查看应用程序的实际日志时,它会为每个步骤显示 5-6 个日志行 (??)。

我正在使用带有 RabbitMQ 的 Django 1.6。放入队列的方法是在函数上放置延迟。

这个函数(添加了任务装饰器(然后调用一个正在运行的类。

有人知道解决此问题的最佳方法吗?

编辑:根据要求,代码如下,

views.py

在我看来,我通过...将我的数据发送到队列

from input.tasks import add_queue_project

add_queue_project.delay(data)

tasks.py

from celery.decorators import task

@task()
def add_queue_project(data):
    """ run project """
    logger = logging_setup(app="project")

    logger.info("starting project runner..")
    f = project_runner(data)
    f.main()

class project_runner():
    """ main project runner """

    def __init__(self,data):
        self.data = data
        self.logger = logging_setup(app="project")

    def self.main(self):
        .... Code

settings.py

THIRD_PARTY_APPS = (
    'south',  # Database migration helpers:
    'crispy_forms',  # Form layouts
    'rest_framework',
    'djcelery',
)

import djcelery
djcelery.setup_loader()

BROKER_HOST = "127.0.0.1"
BROKER_PORT = 5672 # default RabbitMQ listening port
BROKER_USER = "test"
BROKER_PASSWORD = "test"
BROKER_VHOST = "test"
CELERY_BACKEND = "amqp" # telling Celery to report the results back to RabbitMQ
CELERY_RESULT_DBURI = ""

CELERY_IMPORTS = ("input.tasks", )

芹菜

我正在运行的线路是启动芹菜,

python2.7 manage.py celeryd -l info

谢谢,

【问题讨论】:

  • 这些不是 celerybeat 创建的工作,对吗?
  • 您是否在某些 django 信号处理程序中创建任务?如果是,请确保该信号没有被多次调用。
  • 请发布您的代码
  • 我已按要求添加了代码。
  • project_runner.main() 在哪里?

标签: python django


【解决方案1】:

我没有给你一个确切的答案,但有一些事情你应该研究一下:

  • djcelery 已弃用,因此如果您使用的是新版本的celery,可能会出现某种冲突。

  • 如果您的input 应用程序列在INSTALLED_APPS 中,celery 会发现它,因此您无需将其添加到CELERY_IMPORTS = ("input.tasks", ),这可能是您的问题的原因,因为可以加载任务多次

  • 尝试为您的任务命名@task(name='input.tasks.add'),无论您如何导入它,它都会知道这是同一个任务。

查看您的设置,您似乎正在使用旧版本的 celery,或者您正在使用旧配置来获取新版本的 celery。无论如何,请确保您拥有最新版本并尝试使用此配置而不是您拥有的配置:

BROKER_URL = 'amqp://<user>:<password>@localhost:5672/<vhost>'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

现在,您还必须对 celery 进行不同的配置:

彻底摆脱djcelery 的东西。

在您的 django 项目中创建 proj/celery.py

from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')

app = Celery('proj')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

在你的proj/__init__.py:

from __future__ import absolute_import

from proj.celery import app as celery_app

如果您的 input 应用是可重用的应用并且不属于您的项目,请使用 @shared_task 而不是 @task 装饰器。

然后运行 ​​celery:

celery -A proj worker -l info

希望对你有帮助。

【讨论】:

    猜你喜欢
    • 2023-03-24
    • 2013-04-07
    • 2023-03-10
    • 1970-01-01
    • 1970-01-01
    • 2017-09-26
    • 2017-09-11
    • 2020-07-25
    • 2021-09-28
    相关资源
    最近更新 更多