【问题标题】:Celery and RabbitMQ I am not receiving updates for my taskCelery 和 RabbitMQ 我没有收到任务更新
【发布时间】:2020-07-06 14:42:22
【问题描述】:

你好吗?我正在开发 django 2.2、celery 4.4.2 和 在我的 tasks.py 文件中,我有以下代码

@shared_task
def start_task(dict):
    objectLogic = LogicProcess()
    # print('dict', dict)
    task = objectLogic.RunProcess(parameters=dict)
    print("Estado de la tarea: {}\r".format(task))
    return task

它调用了一个函数,最后我更新它但我没有收到响应。

current_task.update_state(
                state = 'PROGRESS',
                meta = {
                    'current': oer_number,
                    'total': TotalOER,
                    'percent': int((oer_number * 100) / TotalOER)
                }
            )

啊是的...在任务结束时我让它返回

return {'current': TotalOER, 'total': TotalOER, 'percent': 100}

如果我得到一个(第一次更新)我看不到其他人

celery -A ost worker -l info -n worker
task 528454cb-724e-48bc-b95e-9d3a124b22e1
Task status     {}
task 528454cb-724e-48bc-b95e-9d3a124b22e1
Task status     {'state': 'PROGRESS', 'result': {'current': 2, 'total': 66, 'percent': 3}}

task_id=528454cb-724e-48bc-b95e-9d3a124b22e1 HTTP/1.1" 200 2
task_id=528454cb-724e-48bc-b95e-9d3a124b22e1 HTTP/1.1" 200 74
task_id=528454cb-724e-48bc-b95e-9d3a124b22e1 HTTP/1.1" 200 110

或者只是它调用的那个如果它有效,它只是不返回我的状态

tasks.pyfunction.py 两个文件中导入

from celery import shared_task, current_task

celery.py 文件包含

# from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'ost.settings')

app = Celery('ost', backend='amqp')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

在 django 看来我这样称呼它

from celery.result import AsyncResult

task = start_task.AsyncResult(task_id)
            data = {}
            if isinstance(task.result, dict):
                data['state'] = task.state
                data['result'] = task.result

我用if task.ready() 尝试过,但它不满足条件,尽管我已经提到过它是否结束函数并返回

无论如何,我不知道还有什么要说明的,任何想法都会被测试,有些东西我会审查link 1link 2link 2

PS:我在 for 循环中更新任务状态

【问题讨论】:

    标签: django rabbitmq celery


    【解决方案1】:

    正如我所提到的,更新是在一个 FOR 周期内,为此我创建了一个 Recursive functions,其中找到了 AsyncResult,在我的例子中是 django 视图。这将检索任务的状态。当 oer_number = TotalOER 时,我会结束它 (task.state = SUCCESS)。

    看到透视是合乎逻辑的,它在FOR 循环中更新,它在recursive functions mmm 中恢复人们忘记的事情。所以它只恢复单个值

    【讨论】:

      猜你喜欢
      • 2017-10-11
      • 2019-08-15
      • 2011-07-18
      • 2011-07-17
      • 2021-12-12
      • 1970-01-01
      • 1970-01-01
      • 2018-09-20
      • 2014-01-18
      相关资源
      最近更新 更多