【问题标题】:Celery Beat: Limit to single task instance at a timeCelery Beat:一次限制为单个任务实例
【发布时间】:2014-01-03 01:03:52
【问题描述】:

我有 celery beat 和 celery(四名工人)批量做一些加工步骤。其中一项任务大致是这样的,“对于每个尚未创建 Y 的 X,创建一个 Y。”

任务以半快速(10 秒)的速度定期运行。任务完成得非常快。还有其他任务正在进行中。

我多次遇到过节拍任务明显积压的问题,因此同时执行相同的任务(来自不同节拍时间),导致错误地重复工作。似乎这些任务也是乱序执行的。

  1. 是否可以限制 celery beat 以确保一次只有一个未完成的任务实例?在任务上设置rate_limit=5 之类的“正确”方式是这样做的吗?

  2. 是否可以确保节拍任务按顺序执行,例如beat 不是分派任务,而是将其添加到任务链中?

  3. 除了让这些任务本身以原子方式执行并且可以安全地并发执行之外,最好的处理方法是什么?这不是我期望完成任务的限制……

任务本身的定义很天真:

@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
    # Do things in a database
    return

这是一个实际的(清理过的)日志:

  • [00:00.000] foocorp.tasks.add_y_to_xs 已发送。 id->#1
  • [00:00.001]收到任务:foocorp.tasks.add_y_to_xs[#1]
  • [00:10.009] foocorp.tasks.add_y_to_xs 已发送。 id->#2
  • [00:20.024] foocorp.tasks.add_y_to_xs 已发送。 id->#3
  • [00:26.747]收到任务:foocorp.tasks.add_y_to_xs[#2]
  • [00:26.748]TaskPool:应用#2
  • [00:26.752]收到任务:foocorp.tasks.add_y_to_xs[#3]
  • [00:26.769] 任务接受:foocorp.tasks.add_y_to_xs[#2] pid:26528
  • [00:26.775]Task foocorp.tasks.add_y_to_xs[#2] 0.0197986490093s 成功:无
  • [00:26.806]TaskPool:应用#1​​li>
  • [00:26.836]TaskPool:应用#3
  • [01:30.020] 接受任务:foocorp.tasks.add_y_to_xs[#1] pid:26526
  • [01:30.053] 任务接受:foocorp.tasks.add_y_to_xs[#3] pid:26529
  • [01:30.055] foocorp.tasks.add_y_to_xs[#1]:为 X id 添加 Y #9725
  • [01:30.070] foocorp.tasks.add_y_to_xs[#3]:为 X id 添加 Y #9725
  • [01:30.074]Task foocorp.tasks.add_y_to_xs[#1] 0.0594762689434s 成功:无
  • [01:30.087] 任务 foocorp.tasks.add_y_to_xs[#3] 在 0.0352867960464s 内成功:无

我们目前使用 Celery 3.1.4 和 RabbitMQ 作为传输。

编辑丹,这是我想出的:

丹,这是我最终使用的:

from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager


def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
    lock_fn = (func.pg_advisory_xact_lock_shared
               if shared else
               func.pg_advisory_xact_lock)
    if timeout:
        conn.execute(text('SET statement_timeout TO :timeout'),
                     timeout=timeout)
    try:
        conn.execute(select([lock_fn(lock_id)]))
    except DBAPIError:
        return False
    return True


def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
    lock_fn = (func.pg_try_advisory_xact_lock_shared
               if shared else
               func.pg_try_advisory_xact_lock)
    return conn.execute(select([lock_fn(lock_id)])).scalar()


class DatabaseLockFailed(Exception):
    pass


@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
    """
    Context manager which acquires a PSQL advisory transaction lock with a
    specified name.
    """
    lock_id = hash(name)

    with engine.begin() as conn, conn.begin():
        if block:
            locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
                                                  timeout)
        else:
            locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
        if not locked:
            raise DatabaseLockFailed()
        yield

还有 celery 任务装饰器(仅用于周期性任务):

from functools import wraps
from preo.extensions import db


def locked(name=None, block=True, timeout='1s'):
    """
    Using a PostgreSQL advisory transaction lock, only runs this task if the
    lock is available. Otherwise logs a message and returns `None`.
    """
    def with_task(fn):
        lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)

        @wraps(fn)
        def f(*args, **kwargs):
            try:
                with db_lock(db.engine, name=lock_id, block=block,
                             timeout=timeout):
                    return fn(*args, **kwargs)
            except DatabaseLockFailed:
                logger.error('Failed to get lock.')
                return None
        return f
    return with_task

【问题讨论】:

  • 谢谢,这是一个非常彻底的带有咨询锁的解决方案,而且比我的@contextmanager 干净得多。我对没有解锁感到困惑,直到我意识到您正在使用事务级锁定......这只是为了方便还是有其他理由选择它?
  • 我将SET statement_timeout TO :timeout 更改为SET LOCAL lock_timeout TO :timeout (postgresql.org/docs/9.3/static/runtime-config-client.html)。这样您就不会影响会话中任何长时间运行的非锁定语句。 (可能不是问题,因为你说你的跑得很快!)
  • @DanLenski 这似乎是一个很好的改变,谢谢。我们的大多数辅助函数都包装在子事务中,因此我使用事务级锁定来确保当同一会话中有其他工作时,全局锁定不会不必要地保持太久。
  • 但我刚刚意识到,通过创建新连接 (conn.begin()),没有理由使用事务锁而不是会话锁。同样,据我了解,在这种情况下也没有理由区分 lock_timeoutstatement_timeout
  • @erydo 只是我的一周变得更好了......)

标签: python concurrency rabbitmq celery celerybeat


【解决方案1】:
from functools import wraps
from celery import shared_task


def skip_if_running(f):
    task_name = f'{f.__module__}.{f.__name__}'

    @wraps(f)
    def wrapped(self, *args, **kwargs):
        workers = self.app.control.inspect().active()

        for worker, tasks in workers.items():
            for task in tasks:
                if (task_name == task['name'] and
                        tuple(args) == tuple(task['args']) and
                        kwargs == task['kwargs'] and
                        self.request.id != task['id']):
                    print(f'task {task_name} ({args}, {kwargs}) is running on {worker}, skipping')

                    return None

        return f(self, *args, **kwargs)

    return wrapped


@shared_task(bind=True)
@skip_if_running
def test_single_task(self):
    pass


test_single_task.delay()

【讨论】:

  • 究竟什么是“clr_app”,我该如何导入它?哇。我是个假人。这只是@app.task...
  • 对于带有shared_task装饰器的Django用户:使用上面的代码和@shared_task(bind=true)和带有self作为第一个参数def your_task(self, *args, **kwargs)的函数
【解决方案2】:

这样做的唯一方法是implementing a locking strategy yourself

阅读here部分以供参考。

与 cron 一样,如果第一个任务没有,任务可能会重叠 在下一个之前完成。如果这是一个问题,你应该使用 锁定策略以确保一次只能运行一个实例(请参阅 例如确保一个任务一次只执行一个)。

【讨论】:

  • 谢谢。我最终创建了一个任务装饰器,它使用 PostgreSQL 咨询锁来保护任务。我走这条路是为了让不同机器上的工作人员可以维护一个同步的锁定点。 (我们目前没有像示例中那样使用 memcached)。
  • @erydo,你愿意分享那个装饰器吗?我正在尝试做几乎相同的事情并遇到了一些奇怪的同步问题。
  • 如果你说只有一名工人来完成这项任务,我认为你仍然会有积压的任务。需要额外的工人来防止积压
【解决方案3】:

我使用celery-once 解决了这个问题,我将其扩展到celery-one

两者都可以解决您的问题。它使用 Redis 锁定正在运行的任务。 celery-one 还将跟踪锁定的任务。

下面是一个非常简单的 celery beat 用法示例。在下面的代码中,slow_task 每 1 秒调度一次,但它的完成时间是 5 秒。即使它已经在运行,普通的 celery 也会每秒安排一次任务。 celery-one 会阻止这种情况发生。

celery = Celery('test')
celery.conf.ONE_REDIS_URL = REDIS_URL
celery.conf.ONE_DEFAULT_TIMEOUT = 60 * 60
celery.conf.BROKER_URL = REDIS_URL
celery.conf.CELERY_RESULT_BACKEND = REDIS_URL

from datetime import timedelta

celery.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.slow_task',
        'schedule': timedelta(seconds=1),
        'args': (1,)
    },
}

celery.conf.CELERY_TIMEZONE = 'UTC'


@celery.task(base=QueueOne, one_options={'fail': False})
def slow_task(a):
    print("Running")
    sleep(5)
    return "Done " + str(a)

【讨论】:

  • 请 a) 在此处包含答案的相关部分,并 b) 声明您对该网站的兴趣。这已被标记为垃圾邮件,所以我为您删除了它。
  • 我已取消删除您的答案并使垃圾邮件标记无效。感谢您的跟进。
【解决方案4】:

我尝试编写一个装饰器来使用 Postgres advisory locking,类似于 erydo 在他的评论中提到的内容。

它不是很漂亮,但似乎可以正常工作。这是 Python 2.7 下的 SQLAlchemy 0.9.7。

from functools import wraps
from sqlalchemy import select, func

from my_db_module import Session # SQLAlchemy ORM scoped_session

def pg_locked(key):
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kw):
            session = db.Session()
            try:
                acquired, = session.execute(select([func.pg_try_advisory_lock(key)])).fetchone()
                if acquired:
                    return f(*args, **kw)
            finally:
                if acquired:
                    session.execute(select([func.pg_advisory_unlock(key)]))
        return wrapped
    return decorator

@app.task
@pg_locked(0xdeadbeef)
def singleton_task():
    # only 1x this task can run at a time
    pass

(欢迎任何 cmet 改进这一点!)

【讨论】:

  • Dan,是的,这基本上与我想出的相同——我已将我的解决方案添加为编辑。它包括对非阻塞锁定尝试和可配置超时的支持,因为我最终也在另一个地方使用了锁定代码。
【解决方案5】:

需要分布式锁定系统,因为这些 Celery beat 实例本质上是不同的进程,可能跨不同的主机。

ZooKeeper、etcd等中心坐标系适合分布式锁系统的实现。

我推荐使用轻量且快速的 etcd。 lock over etcd有几种实现方式,比如:

python-etcd-lock

【讨论】:

  • 分布式锁定是一个很好的建议,但并非严格要求。只要一切都指向同一个中心,集中锁定就可以了。在我们的例子中,这些 celery worker 无论如何都与同一个主 Postgres 数据库进行交互。不过,etcd、ZooKeeper 或 consul 是很好的建议。
猜你喜欢
  • 2015-10-24
  • 2021-02-25
  • 2019-07-20
  • 2019-02-16
  • 2018-09-17
  • 2020-04-05
  • 2020-02-27
  • 2020-03-16
  • 1970-01-01
相关资源
最近更新 更多