【发布时间】:2016-06-03 20:48:06
【问题描述】:
所以这就是我想要做的。我有一个计划任务,每 X 分钟运行一次。在任务中,我创建了一组任务,我希望它们彼此并行运行。在他们全部完成后,我想记录该组是否成功完成。这是我的代码:
@shared_task(base=HandlersImplTenantTask, acks_late=True)
def my_scheduled_task():
try:
needed_ids = MyModel.objects.filter(some_field=False)\
.filter(some_other_field=True)\
.values_list("id", flat=True) \
.order_by("id")
if needed_ids:
tasks = [my_single_task.s(needed_id=id) for id in needed_ids]
job = group(tasks)
result = job.apply_async()
returned_values = result.get()
if result.ready():
if result.successful():
logger.info("SUCCESSFULLY FINISHED ALL THE SUBTASKS")
else:
returned_values = result.get()
logger.info("UNSUCCESSFULLY FINISHED ALL THE SUBTASKS WITH THE RESULTS %s" % returned_values)
else:
logger.info("no needed ids found")
except:
logger.exception("got an unexpected exception while running task")
这是 my_single_task 代码:
@shared_task(base=HandlersImplTenantTask)
def my_single_task(needed_id):
logger.info("starting task for tenant: [%s]. got id [%s]", connection.tenant, needed_id)
return
这就是我运行芹菜的方式: manage.py celery worker -c 2 --broker=[my rabbitmq broker url]
当我到达result.get() 行时,它挂起。我看到具有第一个 ID 的单个任务的单个日志条目,但我看不到其他任务。当我杀死我的 celery 进程并重新启动它时 - 它重新运行计划任务,我看到第二个日志条目具有第二个 id(从任务第一次运行开始)。有关如何解决此问题的任何想法?
编辑 - 为了尝试克服这个问题 - 我创建了一个名为“new_queue”的不同队列。我启动了一个不同的芹菜工人来听新的队列。我想让其他工人接受任务并处理它们。我认为这可以解决死锁的问题。 我已将代码更改为如下所示:
job = group(tasks)
job_result = job.apply_async(queue='new_queue')
results = job_result.get()
但我仍然遇到死锁,如果我删除 results = job_result.get() 行,我可以看到主要工作人员正在处理这些任务,并且没有任何内容发布到 new_queue 队列。有什么想法吗?
这是我的芹菜配置:
tenant_celery_app.conf.update(CELERY_RESULT_BACKEND='djcelery.backends.database.DatabaseBackend'
CELERY_RESULT_DB_TABLENAMES = {
'task': 'tenantapp_taskmeta',
'group': 'tenantapp_groupmeta',
}
这就是我运行工人的方式:
celery worker -c 1 -Q new_queue --broker=[amqp_brocker_url]/[vhost]
celery worker -c 1 --broker=[amqp_brocker_url]/[vhost]
【问题讨论】:
标签: python-2.7 rabbitmq celery celery-task