【Question Title】: Flask Celery task locking
【Posted】: 2019-05-25 19:23:23
【Question】:

I am using Flask with Celery and I am trying to lock a specific task so that only one instance of it can run at a time. The Celery docs give an example of doing this: Celery docs, Ensuring a task is only executed one at a time. The example given is for Django, but I am using Flask; I have done my best to convert it to work with Flask, yet I still see myTask1, which has the lock, able to run multiple times.

One thing that is not clear to me is whether I am using the cache correctly — I have never used it before, so all of this is new to me. One thing the docs mention but do not explain is this:

Documentation note:

In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.

I am not sure what that means. Should I be using the cache in conjunction with a database, and if so, how would I do that? I am using mongodb. In my code I just have this setup for the cache, cache = Cache(app, config={'CACHE_TYPE': 'simple'}), since that is what is mentioned in the Flask-Cache Docs.

Another thing that is not clear to me is whether I need to do anything differently, since I am calling myTask1 from within my Flask route task1.

Here is an example of the code I am using.

from flask import (Flask, render_template, flash, redirect,
                   url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time


app = Flask(__name__)

cache = Cache(app, config={'CACHE_TYPE': 'simple'})
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'

######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'


mongo = PyMongo(app)


##############################
# CELERY ARGUMENTS
##############################


app.config['CELERY_BROKER_URL'] = 'amqp://localhost//'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'

app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
    "host": "localhost",
    "port": 27017,
    "database": "celery-test-db", 
    "taskmeta_collection": "celery_jobs",
}

app.config['CELERY_TASK_SERIALIZER'] = 'json'


celery = Celery('task',broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)


LOCK_EXPIRE = 60 * 2  # Lock expires in 2 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)



@celery.task(bind=True, name='app.myTask1')
def myTask1(self):

    self.update_state(state='IN TASK')

    lock_id = self.name

    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            self.update_state(state='DOING WORK')
            time.sleep(90)
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later



@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    print('you are in task2')
    self.update_state(state='STARTING')
    time.sleep(120)
    print('task2 done')


@app.route('/', methods=['GET', 'POST'])
def index():

    return render_template('index.html')

@app.route('/task1', methods=['GET', 'POST'])
def task1():

    print('running task1')
    result = myTask1.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)


    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task1'})

    return render_template('task1.html')


@app.route('/task2', methods=['GET', 'POST'])
def task2():

    print('running task2')
    result = myTask2.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)

    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})

    return render_template('task2.html') 


@app.route('/status', methods=['GET', 'POST'])
def status():

    taskid_list = []
    task_state_list = []
    TaskName_list = []

    allAsyncData = mongo.db.job_task_id.find()

    for doc in allAsyncData:
        try:
            taskid_list.append(doc['taskid'])
        except:
            print('error with db connection in asyncJobStatus')

        TaskName_list.append(doc['TaskName'])

    # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
    for item in taskid_list:
        try:
            task_state_list.append(myTask1.AsyncResult(item).state)
        except:
            task_state_list.append('UNKNOWN')

    return render_template('status.html', data_list=zip(task_state_list, TaskName_list))

Final working code

from flask import (Flask, render_template, flash, redirect,
                   url_for, session, logging, request, g, render_template_string, jsonify)
from flask_caching import Cache
from contextlib import contextmanager
from celery import Celery
from Flask_celery import make_celery
from celery.result import AsyncResult
from celery.utils.log import get_task_logger
from celery.five import monotonic
from flask_pymongo import PyMongo
from hashlib import md5
import pymongo
import time
import redis
from flask_redis import FlaskRedis


app = Flask(__name__)

# ADDING REDIS
redis_store = FlaskRedis(app)

# POINTING CACHE_TYPE TO REDIS
cache = Cache(app, config={'CACHE_TYPE': 'redis'})
app.config['SECRET_KEY']= 'super secret key for me123456789987654321'

######################
# MONGODB SETUP
#####################
app.config['MONGO_HOST'] = 'localhost'
app.config['MONGO_DBNAME'] = 'celery-test-db'
app.config["MONGO_URI"] = 'mongodb://localhost:27017/celery-test-db'


mongo = PyMongo(app)


##############################
# CELERY ARGUMENTS
##############################

# CELERY USING REDIS
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'mongodb://localhost:27017/celery-test-db'

app.config['CELERY_RESULT_BACKEND'] = 'mongodb'
app.config['CELERY_MONGODB_BACKEND_SETTINGS'] = {
    "host": "localhost",
    "port": 27017,
    "database": "celery-test-db", 
    "taskmeta_collection": "celery_jobs",
}

app.config['CELERY_TASK_SERIALIZER'] = 'json'


celery = Celery('task',broker='mongodb://localhost:27017/jobs')
celery = make_celery(app)


LOCK_EXPIRE = 60 * 2  # Lock expires in 2 minutes


@contextmanager
def memcache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    print('in memcache_lock and timeout_at is {}'.format(timeout_at))
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
        print('memcache_lock and status is {}'.format(status))
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)



@celery.task(bind=True, name='app.myTask1')
def myTask1(self):

    self.update_state(state='IN TASK')
    print('dir is {} '.format(dir(self)))

    lock_id = self.name
    print('lock_id is {}'.format(lock_id))

    with memcache_lock(lock_id, self.app.oid) as acquired:
        print('in memcache_lock and lock_id is {} self.app.oid is {} and acquired is {}'.format(lock_id, self.app.oid, acquired))
        if acquired:
            # do work if we got the lock
            print('acquired is {}'.format(acquired))
            self.update_state(state='DOING WORK')
            time.sleep(90)
            return 'result'

    # otherwise, the lock was already in use
    raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later



@celery.task(bind=True, name='app.myTask2')
def myTask2(self):
    print('you are in task2')
    self.update_state(state='STARTING')
    time.sleep(120)
    print('task2 done')


@app.route('/', methods=['GET', 'POST'])
def index():

    return render_template('index.html')

@app.route('/task1', methods=['GET', 'POST'])
def task1():

    print('running task1')
    result = myTask1.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)


    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'myTask1'})

    return render_template('task1.html')


@app.route('/task2', methods=['GET', 'POST'])
def task2():

    print('running task2')
    result = myTask2.delay()

    # get async task id
    taskResult = AsyncResult(result.task_id)

    # push async taskid into db collection job_task_id
    mongo.db.job_task_id.insert({'taskid': str(taskResult), 'TaskName': 'task2'})

    return render_template('task2.html')

@app.route('/status', methods=['GET', 'POST'])
def status():

    taskid_list = []
    task_state_list = []
    TaskName_list = []

    allAsyncData = mongo.db.job_task_id.find()

    for doc in allAsyncData:
        try:
            taskid_list.append(doc['taskid'])
        except:
            print('error with db connection in asyncJobStatus')

        TaskName_list.append(doc['TaskName'])

    # PASS TASK ID TO ASYNC RESULT TO GET TASK RESULT FOR THAT SPECIFIC TASK
    for item in taskid_list:
        try:
            task_state_list.append(myTask1.AsyncResult(item).state)
        except:
            task_state_list.append('UNKNOWN')

    return render_template('status.html', data_list=zip(task_state_list, TaskName_list))


if __name__ == '__main__':
    app.secret_key = 'super secret key for me123456789987654321'
    app.run(port=1234, host='localhost')

Here is also a screenshot where you can see that I ran myTask1 twice and myTask2 once. I now have the expected behavior for myTask1: it is run by a single worker, and if another worker tries to pick it up, it will just keep retrying based on whatever I define.

【Question Comments】:

  • self is a string, and self.cache does not exist. Just a guess, but maybe Cache.add should be called on an instance, e.g. Cache().add? Because when add gets called, the first argument may be self, as in def add(self, lock_id, oid, lock_expire):, so self ends up being your lock_id the way you have it?
  • Thanks for the suggestion. I gave it a try with status = Cache().add(lock_id, oid, LOCK_EXPIRE), but that gave me a new traceback.
  • In memcache_lock(lock_id, self.app.oid): File "/auto/pysw/cel63/python/3.4.1/lib/python3.4/contextlib.py", line 59, in __enter__, return next(self.gen); File "app.py", line 63, in memcache_lock, status = Cache().add(lock_id, oid, LOCK_EXPIRE); File "/pyats2/lib/python3.4/site-packages/flask_cache/__init__.py", line 204, in add, self.cache.add(*args, **kwargs); File "/ws/mastarke-sjc/pyats2/lib/python3.4/site-packages/flask_cache/__init__.py", line 192, in cache, return app.extensions['cache'][self] — KeyError: 'cache'
  • What behavior do you want when the task is called multiple times? Do you want the task to be queued up, or ignored entirely?
  • Ideally, what I want is that when myTask1 is called, it just gets queued and does not run until the lock is released.

Tags: python flask celery celery-task flask-cache


【Solution 1】:

I also found this to be a surprisingly hard problem. Inspired mainly by Sebastian's work implementing a distributed locking algorithm in redis, I wrote up a decorator function.

A key point to keep in mind about this approach is that we lock tasks at the level of the task's argument space: e.g. we allow multiple game update / process order tasks to run concurrently, but only one per game. That is what argument_signature achieves in the code below. You can see documentation on how we use it in our stack at this gist:

import base64
from contextlib import contextmanager
import json
import pickle as pkl
import uuid

from backend.config import Config
from redis import StrictRedis
from redis_cache import RedisCache
from redlock import Redlock

rds = StrictRedis(Config.REDIS_HOST, decode_responses=True, charset="utf-8")
rds_cache = StrictRedis(Config.REDIS_HOST, decode_responses=False, charset="utf-8")
redis_cache = RedisCache(redis_client=rds_cache, prefix="rc", serializer=pkl.dumps, deserializer=pkl.loads)
dlm = Redlock([{"host": Config.REDIS_HOST}])

TASK_LOCK_MSG = "Task execution skipped -- another task already has the lock"
DEFAULT_ASSET_EXPIRATION = 8 * 24 * 60 * 60  # by default keep cached values around for 8 days
DEFAULT_CACHE_EXPIRATION = 1 * 24 * 60 * 60  # we can keep cached values around for a shorter period of time

REMOVE_ONLY_IF_OWNER_SCRIPT = """
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end
"""


@contextmanager
def redis_lock(lock_name, expires=60):
    # https://breadcrumbscollector.tech/what-is-celery-beat-and-how-to-use-it-part-2-patterns-and-caveats/
    random_value = str(uuid.uuid4())
    lock_acquired = bool(
        rds.set(lock_name, random_value, ex=expires, nx=True)
    )
    yield lock_acquired
    if lock_acquired:
        rds.eval(REMOVE_ONLY_IF_OWNER_SCRIPT, 1, lock_name, random_value)


def argument_signature(*args, **kwargs):
    arg_list = [str(x) for x in args]
    kwarg_list = [f"{str(k)}:{str(v)}" for k, v in kwargs.items()]
    return base64.b64encode(f"{'_'.join(arg_list)}-{'_'.join(kwarg_list)}".encode()).decode()


def task_lock(func=None, main_key="", timeout=None):
    def _dec(run_func):
        def _caller(*args, **kwargs):
            with redis_lock(f"{main_key}_{argument_signature(*args, **kwargs)}", timeout) as acquired:
                if not acquired:
                    return TASK_LOCK_MSG
                return run_func(*args, **kwargs)
        return _caller
    return _dec(func) if func is not None else _dec

Implemented in our task definitions file:

@celery.task(name="async_test_task_lock")
@task_lock(main_key="async_test_task_lock", timeout=UPDATE_GAME_DATA_TIMEOUT)
def async_test_task_lock(game_id):
    print(f"processing game_id {game_id}")
    time.sleep(TASK_LOCK_TEST_SLEEP)

How we test it against a local celery cluster:

from backend.tasks.definitions import async_test_task_lock, TASK_LOCK_TEST_SLEEP
from backend.tasks.redis_handlers import rds, TASK_LOCK_MSG
class TestTaskLocking(TestCase):
    def test_task_locking(self):
        rds.flushall()
        res1 = async_test_task_lock.delay(3)
        res2 = async_test_task_lock.delay(5)
        self.assertFalse(res1.ready())
        self.assertFalse(res2.ready())
        res3 = async_test_task_lock.delay(5)
        res4 = async_test_task_lock.delay(5)
        self.assertEqual(res3.get(), TASK_LOCK_MSG)
        self.assertEqual(res4.get(), TASK_LOCK_MSG)
        time.sleep(TASK_LOCK_TEST_SLEEP)
        res5 = async_test_task_lock.delay(3)
        self.assertFalse(res5.ready())

(As a bonus, this also includes a quick example of how to set up redis_cache.)
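A minimal sketch of what that redis_cache instance looks like in use (this assumes the python-redis-cache package's cache() decorator; expensive_lookup is a made-up function for illustration, and the import path simply mirrors the test file above):

from backend.tasks.redis_handlers import redis_cache, DEFAULT_CACHE_EXPIRATION

@redis_cache.cache(ttl=DEFAULT_CACHE_EXPIRATION)
def expensive_lookup(game_id):
    # the pickled return value is stored in Redis under a key derived from the
    # function name and its arguments, and is reused until the TTL expires
    ...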

【Discussion】:

    【Solution 2】:

    In your question, you point out this warning from the Celery example you used:

    In order for this to work correctly you need to be using a cache backend where the .add operation is atomic. memcached is known to work well for this purpose.

    You mention that you don't really understand what this means. Indeed, the code you show demonstrates that you have not heeded that warning, because your code uses an inappropriate backend.

    Consider this piece of code:

    with memcache_lock(lock_id, self.app.oid) as acquired:
        if acquired:
            # do some work
    

    What you want is for acquired to be true for only one thread at a time. If two threads enter the with block at the same time, only one should "win" and have acquired be true. The thread that has acquired true can then proceed with its work, while the other thread has to skip the work and try again later to acquire the lock. In order to ensure that only one thread can have acquired true, .add must be atomic.

    Here is some pseudocode of what .add(key, value) does:

    1. if <key> is already in the cache:
    2.   return False    
    3. else:
    4.   set the cache so that <key> has the value <value>
    5.   return True
    

    If the execution of .add is not atomic, this is what could happen if two threads, A and B, execute .add("foo", "bar"). Assume the cache is empty at the start.

    1. Thread A executes 1. if "foo" is already in the cache, finds that "foo" is not in the cache, and jumps to line 3, but the thread scheduler switches control to thread B.
    2. Thread B also executes 1. if "foo" is already in the cache, also finds that "foo" is not in the cache, so it jumps to line 3 and then performs lines 4 and 5, setting the key "foo" to the value "bar" and returning True.
    3. Eventually, the scheduler gives control back to thread A, which continues executing lines 3, 4 and 5, also setting the key "foo" to the value "bar" and also returning True.

    Here you have two .add calls that returned True. If those .add calls are made inside memcache_lock, that means two threads can have acquired true. So two threads can do the work at the same time, and your memcache_lock is not doing what it should do, which is allow only one thread to work at a time.
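    The same race is easy to sketch outside of Celery. Below is a minimal, self-contained illustration (plain Python, with a dict standing in for the 'simple' cache) contrasting an unprotected check-then-set add with one guarded by a lock; with enough repetitions, the unguarded version can let both threads "win":

    import threading

    store = {}                    # stands in for the 'simple' cache: just a dict
    guard = threading.Lock()      # roughly what an atomic backend gives you

    def non_atomic_add(key, value):
        # unprotected check-then-set: two threads can both see the key as
        # missing and both return True
        if key in store:
            return False
        store[key] = value
        return True

    def atomic_add(key, value):
        # same logic, but the check and the set happen as one indivisible step
        with guard:
            if key in store:
                return False
            store[key] = value
            return True

    results = []
    threads = [threading.Thread(target=lambda: results.append(non_atomic_add('lock', 'me')))
               for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # with non_atomic_add both entries can be True; with atomic_add exactly one is
    print(results)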

    You are not using a cache that guarantees that .add is atomic. You initialize it like this:

    cache = Cache(app, config={'CACHE_TYPE': 'simple'})
    

    The simple backend is scoped to a single process, has no thread-safety, and has a non-atomic .add operation. (Incidentally, this does not involve Mongo at all. If you wanted your cache to be backed by Mongo, you would have to specify a backend made specifically to store data in a Mongo database.)

    So you have to switch to another backend, one that guarantees that .add is atomic. You could follow the lead of the Celery example and use the memcached backend, which does have an atomic .add operation. I don't use Flask, but I've essentially done what you are doing with Django and Celery, and successfully used the Redis backend to provide the kind of locking you are using here.
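    Concretely, the change is only in how the Cache object is configured (a sketch; the server addresses are assumptions, and the config keys shown are Flask-Caching's documented ones, so check them against your installed version):

    from flask import Flask
    from flask_caching import Cache

    app = Flask(__name__)

    # memcached-backed cache, as in the Celery docs example
    cache = Cache(app, config={
        'CACHE_TYPE': 'memcached',
        'CACHE_MEMCACHED_SERVERS': ['127.0.0.1:11211'],
    })

    # or a Redis-backed cache
    # cache = Cache(app, config={
    #     'CACHE_TYPE': 'redis',
    #     'CACHE_REDIS_URL': 'redis://localhost:6379/0',
    # })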

    【Discussion】:

    • OK, so instead of using a simple cache like cache = Cache(app, config={'CACHE_TYPE': 'simple'}), I should switch to a Redis cache like cache = Cache(app, config={'CACHE_TYPE': 'redis'}). Right now I'm using RabbitMQ, but I can switch to Redis if that's what it takes to make this work.
    • Yes, if you want to use Redis, you can use cache = Cache(app, config={'CACHE_TYPE': 'redis'}). I know RabbitMQ by name, but I've never actually used it, so I don't know whether it is suitable as a backend for locking.
    • OK, thanks. I don't see RabbitMQ mentioned in the official Flask-Cache Docs for built-in cache backends, but I do see Redis. Let me set up Redis in my environment and see how it goes. If it works, I'll mark your answer as accepted. Thanks again for the detailed explanation.
    • Thank you, I was finally able to get this working. Basically, to use Redis I changed to cache = Cache(app, config={'CACHE_TYPE': 'redis'}) and it works now.
    【Solution 3】:

    With this setup, you should still expect to see workers receiving the task, because the lock is checked inside of the task itself. The only difference will be that the work won't be performed if the lock is acquired by another worker.
    In the example given in the docs, this is the desired behavior; if a lock already exists, the task simply does nothing and finishes as successful. What you want is slightly different: you want the work to be queued up instead of ignored.

    In order to get the desired effect, you would need to make sure the task will be picked up by a worker and performed some time in the future. One way to accomplish this is with retrying.

    @task(bind=True, name='my-task')
    def my_task(self):
        lock_id = self.name
    
        with memcache_lock(lock_id, self.app.oid) as acquired:
            if acquired:
                # do work if we got the lock
                print('acquired is {}'.format(acquired))
                return 'result'
    
        # otherwise, the lock was already in use
        raise self.retry(countdown=60)  # redeliver message to the queue, so the work can be done later
    

    【Discussion】:

    • Thanks, I see what you mean and appreciate the help. However, I still see that I can call it multiple times. When I look at the worker logs, you can see workers 3 and 4 picked up myTask1 at nearly the same time. 140> tail -f worker-3.log [2018-12-28 12:51:55,849: WARNING/ForkPoolWorker-3] acquired is True [2018-12-28 12:53:25,943: INFO/ForkPoolWorker-3] Task app.myTask1[5f492c3f-9684-493e-8873-1190f71527a5] succeeded in 90.10438371099985s: 'result'
    • Here is the log from worker 4: 141> tail -f worker-4.log [2018-12-28 12:51:59,381: WARNING/ForkPoolWorker-4] acquired is True [2018-12-28 12:53:29,476: INFO/ForkPoolWorker-4] Task app.myTask1[be05682f-1ff4-452b-9dff-c4593bd3c452] succeeded in 90.11584289799998s: 'result'
    • You can see they both picked up myTask1 around '12:53'.
    • I updated my post to show your solution applied to myTask1.
    • That would suggest to me that memcache_lock is not behaving as expected. I'm not familiar with flask-cache, but I wonder what value is returned when you .add an existing key (see the quick check sketched below). Also, flask-cache seems to be bound to the request context? And according to the docs, the "simple" cache is just a Python dictionary. Perhaps you could try configuring an actual memcached cache.
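    A quick way to check the .add behavior the last comment asks about (a sketch; it assumes Flask-Caching's add() returns False when the key is already present, which is what the locking recipe relies on):

    from flask import Flask
    from flask_caching import Cache

    app = Flask(__name__)
    cache = Cache(app, config={'CACHE_TYPE': 'simple'})

    with app.app_context():
        print(cache.add('lock', 'owner-1', 10))   # expected True: key was absent
        print(cache.add('lock', 'owner-2', 10))   # expected False: key already present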