【问题标题】:task queue in celery芹菜中的任务队列
【发布时间】:2018-03-09 13:10:11
【问题描述】:

我有一个处理数据的服务。它是用 Python (Django) 编写的,并使用 Celery 使其异步。

处理我们的数据会使用积分。您也可以购买积分,这是由 Stripe-webhook 触发的。

涉及信用更改的每个操作都被列为“工作”。我有 2 个 Celery 任务都将工作添加到某个 JobId 数据库。

我使用“作业”概念来跟踪哪些数据在哪个作业中处理。

models.py:

class JobId(models.Model):
    user = models.ForeignKey(User, blank=True, null=True, default=None)
    job_time = models.DateTimeField(auto_now_add=True)
    # current credit level
    credits = models.IntegerField(default=0, null=True, blank=True)
    # credit impact / delta of this job
    credit_delta = models.IntegerField(default=0, null=True, blank=True)

tasks.py:

task_1_buy_credits(user):
    # credit level of user is searched in jobs_database (the users last job)
    # adds one line in the JobId database with the users new credit balance


task_2_use_credits(user,data):
    # upfront unknown amount of data get processed
    # credit level of user is searched in jobs_database (the users last job)
    # decide to process job completely or stop if credit level is insufficient

我目前的问题是,当人们一次开始多个工作时,上一个工作还没有完成。由于我的最终信用余额尚不清楚,因此我将其设置为零以防止现在开始新的工作,而可能还有剩余的信用来完成这项工作。

当同时处理一项工作时信用等级增加时,也会发生类似的情况。

基本上,我需要一种解决方案,它只允许按照创建任务的相同顺序运行任务,并且在前一个任务完成后运行。

我需要一个实时的“用户相关信用等级检查”功能,该功能适用​​于尚未完成的正在运行的任务。

我无法在我的 Django 环境中同步运行,因为我的超时时间为 30 秒,因为这是托管在 heroku 上的 Web 应用程序。

【问题讨论】:

    标签: python django celery


    【解决方案1】:

    这是一个难题,因为 Celery 任务被预先设计为独立于其他所有内容。他们只关心你提供给他们的信息,而不关心正在处理的工作的顺序。您可以通过使用groups and chords 来解决此问题,但我看不出它们如何满足您的需求。

    在前面,我会添加一个 task_id CharFieldJobId 模型。当您开始一个任务时,您可以将返回的任务 ID 存储在那个JobId 的数据库中。因此,对于给定的用户 ID,您可以检查该用户的作业状态,如果仍有待处理的作业,则返回最新的信用状态。

    【讨论】: