【问题标题】:getting celery group results获得芹菜组结果
【发布时间】:2016-06-03 20:48:06
【问题描述】:

所以这就是我想要做的。我有一个计划任务,每 X 分钟运行一次。在任务中,我创建了一组任务,我希望它们彼此并行运行。在他们全部完成后,我想记录该组是否成功完成。这是我的代码:

@shared_task(base=HandlersImplTenantTask, acks_late=True)
def my_scheduled_task():
    try:
        needed_ids = MyModel.objects.filter(some_field=False)\
                                .filter(some_other_field=True)\
                                .values_list("id", flat=True) \
                                .order_by("id")
        if needed_ids:
            tasks = [my_single_task.s(needed_id=id) for id in needed_ids]
            job = group(tasks)
            result = job.apply_async()
            returned_values = result.get()
            if result.ready():
                if result.successful():
                    logger.info("SUCCESSFULLY FINISHED ALL THE SUBTASKS")
                else:
                    returned_values = result.get()
                    logger.info("UNSUCCESSFULLY FINISHED ALL THE SUBTASKS WITH THE RESULTS %s" % returned_values)
        else:
            logger.info("no needed ids found")
    except:
        logger.exception("got an unexpected exception while running task")

这是 my_single_task 代码:

@shared_task(base=HandlersImplTenantTask)
def my_single_task(needed_id):

    logger.info("starting task for tenant: [%s]. got id [%s]", connection.tenant, needed_id)
    return

这就是我运行芹菜的方式: manage.py celery worker -c 2 --broker=[my rabbitmq broker url]

当我到达result.get() 行时,它挂起。我看到具有第一个 ID 的单个任务的单个日志条目,但我看不到其他任务。当我杀死我的 celery 进程并重新启动它时 - 它重新运行计划任务,我看到第二个日志条目具有第二个 id(从任务第一次运行开始)。有关如何解决此问题的任何想法?

编辑 - 为了尝试克服这个问题 - 我创建了一个名为“new_queue”的不同队列。我启动了一个不同的芹菜工人来听新的队列。我想让其他工人接受任务并处理它们。我认为这可以解决死锁的问题。 我已将代码更改为如下所示:

job = group(tasks)
job_result = job.apply_async(queue='new_queue')
results = job_result.get()

但我仍然遇到死锁,如果我删除 results = job_result.get() 行,我可以看到主要工作人员正在处理这些任务,并且没有任何内容发布到 new_queue 队列。有什么想法吗? 这是我的芹菜配置: tenant_celery_app.conf.update(CELERY_RESULT_BACKEND='djcelery.backends.database.DatabaseBackend' CELERY_RESULT_DB_TABLENAMES = { 'task': 'tenantapp_taskmeta', 'group': 'tenantapp_groupmeta', }
这就是我运行工人的方式:
celery worker -c 1 -Q new_queue --broker=[amqp_brocker_url]/[vhost]
celery worker -c 1 --broker=[amqp_brocker_url]/[vhost]

【问题讨论】:

    标签: python-2.7 rabbitmq celery celery-task


    【解决方案1】:

    您的队列似乎陷入僵局。想想看。如果您有一个等待其他任务的任务,并且队列已满,那么第一个任务将永远挂起。

    您需要重构代码以避免在任务中调用result.get()(您的日志中可能已经有关于此的警告)

    我会推荐这个:

    @shared_task(base=HandlersImplTenantTask, acks_late=True)
    def my_scheduled_task():
    
        needed_ids = MyModel.objects.filter(some_field=False)\
                                .filter(some_other_field=True)\
                                .values_list("id", flat=True) \
                                .order_by("id")
        if needed_ids:
            tasks = [my_single_task.s(needed_id=id) for id in needed_ids]
            job = group(tasks)
            result = job.apply_async()
    

    这就是你所需要的。

    使用日志记录来跟踪任务是否失败。

    如果您应用程序中其他地方的代码需要跟踪作业是否失败,那么您可以使用celery's inspect api。

    【讨论】:

    • 感谢您的回复。我明白我做错了什么 - 谢谢。在您的代码示例中,您有result = job.apply_async()。如果任务将异步运行,结果变量的目的是什么?在任务运行或完成之前,它可以为我提供哪些信息?
    【解决方案2】:

    所以我一直在寻找的解决方案确实是创建一个新队列并启动一个处理新队列的新工作人员。我唯一遇到的问题是将组任务发送到新队列。这是对我有用的代码。

    tasks = [my_single_task.s(needed_id=id).set(queue='new_queue') for id in needed_ids]
    job = group(tasks)
    job_result = job.apply_async()
    results = job_result.get() # this will block until the tasks finish but it wont deadlock
    

    这些是我的芹菜工人

    celery worker -c 1 -Q new_queue --broker=[amqp_brocker_url]/[vhost]
    celery worker -c 1 --broker=[amqp_brocker_url]/[vhost]
    

    【讨论】:

    • 在等待子任务在不同队列上执行的组结果时,您是否遇到过任何问题?就我而言,我可以观察到子任务成功执行,但组由于超时而失败(我已设置超时)。我看到你在这里有相同的用例,但我很好奇我错过了什么。
    猜你喜欢
    • 2015-05-16
    • 1970-01-01
    • 1970-01-01
    • 2014-06-22
    • 2018-12-06
    • 2021-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多