【发布时间】:2014-01-03 01:03:52
【问题描述】:
我有 celery beat 和 celery(四名工人)批量做一些加工步骤。其中一项任务大致是这样的,“对于每个尚未创建 Y 的 X,创建一个 Y。”
任务以半快速(10 秒)的速度定期运行。任务完成得非常快。还有其他任务正在进行中。
我多次遇到过节拍任务明显积压的问题,因此同时执行相同的任务(来自不同节拍时间),导致错误地重复工作。似乎这些任务也是乱序执行的。
是否可以限制 celery beat 以确保一次只有一个未完成的任务实例?在任务上设置
rate_limit=5之类的“正确”方式是这样做的吗?是否可以确保节拍任务按顺序执行,例如beat 不是分派任务,而是将其添加到任务链中?
除了让这些任务本身以原子方式执行并且可以安全地并发执行之外,最好的处理方法是什么?这不是我期望完成任务的限制……
任务本身的定义很天真:
@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
# Do things in a database
return
这是一个实际的(清理过的)日志:
-
[00:00.000]foocorp.tasks.add_y_to_xs 已发送。 id->#1 -
[00:00.001]收到任务:foocorp.tasks.add_y_to_xs[#1] -
[00:10.009]foocorp.tasks.add_y_to_xs 已发送。 id->#2 -
[00:20.024]foocorp.tasks.add_y_to_xs 已发送。 id->#3 -
[00:26.747]收到任务:foocorp.tasks.add_y_to_xs[#2] -
[00:26.748]TaskPool:应用#2 -
[00:26.752]收到任务:foocorp.tasks.add_y_to_xs[#3] -
[00:26.769]任务接受:foocorp.tasks.add_y_to_xs[#2] pid:26528 -
[00:26.775]Task foocorp.tasks.add_y_to_xs[#2] 0.0197986490093s 成功:无 -
[00:26.806]TaskPool:应用#1li> -
[00:26.836]TaskPool:应用#3 -
[01:30.020]接受任务:foocorp.tasks.add_y_to_xs[#1] pid:26526 -
[01:30.053]任务接受:foocorp.tasks.add_y_to_xs[#3] pid:26529 -
[01:30.055]foocorp.tasks.add_y_to_xs[#1]:为 X id 添加 Y #9725 -
[01:30.070]foocorp.tasks.add_y_to_xs[#3]:为 X id 添加 Y #9725 -
[01:30.074]Task foocorp.tasks.add_y_to_xs[#1] 0.0594762689434s 成功:无 -
[01:30.087]任务 foocorp.tasks.add_y_to_xs[#3] 在 0.0352867960464s 内成功:无
我们目前使用 Celery 3.1.4 和 RabbitMQ 作为传输。
编辑丹,这是我想出的:
丹,这是我最终使用的:
from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager
def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
lock_fn = (func.pg_advisory_xact_lock_shared
if shared else
func.pg_advisory_xact_lock)
if timeout:
conn.execute(text('SET statement_timeout TO :timeout'),
timeout=timeout)
try:
conn.execute(select([lock_fn(lock_id)]))
except DBAPIError:
return False
return True
def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
lock_fn = (func.pg_try_advisory_xact_lock_shared
if shared else
func.pg_try_advisory_xact_lock)
return conn.execute(select([lock_fn(lock_id)])).scalar()
class DatabaseLockFailed(Exception):
pass
@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
"""
Context manager which acquires a PSQL advisory transaction lock with a
specified name.
"""
lock_id = hash(name)
with engine.begin() as conn, conn.begin():
if block:
locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
timeout)
else:
locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
if not locked:
raise DatabaseLockFailed()
yield
还有 celery 任务装饰器(仅用于周期性任务):
from functools import wraps
from preo.extensions import db
def locked(name=None, block=True, timeout='1s'):
"""
Using a PostgreSQL advisory transaction lock, only runs this task if the
lock is available. Otherwise logs a message and returns `None`.
"""
def with_task(fn):
lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)
@wraps(fn)
def f(*args, **kwargs):
try:
with db_lock(db.engine, name=lock_id, block=block,
timeout=timeout):
return fn(*args, **kwargs)
except DatabaseLockFailed:
logger.error('Failed to get lock.')
return None
return f
return with_task
【问题讨论】:
-
谢谢,这是一个非常彻底的带有咨询锁的解决方案,而且比我的
@contextmanager干净得多。我对没有解锁感到困惑,直到我意识到您正在使用事务级锁定......这只是为了方便还是有其他理由选择它? -
我将
SET statement_timeout TO :timeout更改为SET LOCAL lock_timeout TO :timeout(postgresql.org/docs/9.3/static/runtime-config-client.html)。这样您就不会影响会话中任何长时间运行的非锁定语句。 (可能不是问题,因为你说你的跑得很快!) -
@DanLenski 这似乎是一个很好的改变,谢谢。我们的大多数辅助函数都包装在子事务中,因此我使用事务级锁定来确保当同一会话中有其他工作时,全局锁定不会不必要地保持太久。
-
但我刚刚意识到,通过创建新连接 (
conn.begin()),没有理由使用事务锁而不是会话锁。同样,据我了解,在这种情况下也没有理由区分lock_timeout和statement_timeout。 -
@erydo 只是我的一周变得更好了......)
标签: python concurrency rabbitmq celery celerybeat