【Title】: Redis connections not being released after Celery task is complete
【Posted】: 2017-11-14 03:23:20
【Description】:

I'm using Redis for two things: 1) as the Celery backend, and 2) as the lock holder for my Celery tasks.

Here's a sample of the code I'm running:

def get_redis():
    url = os.environ.get("REDIS_URL")

    if url:
        r = redis.from_url(url)  # use secure for heroku
    else:
        r = redis.Redis()  # use unauthed connection locally

    return r

@app.task(bind=True, max_retries=10)
def test_delay_task(self, task_id):
    ''' Each task will try to grab a lock and, once it does, will sleep 5 seconds,
    then print and exit.
    '''
    have_lock = False
    r = get_redis()
    lock = r.lock('mws_api')
    try:
        have_lock = lock.acquire(blocking=False)
        if have_lock:
            logger.warning("{} Lock Acquired".format(task_id))
            time.sleep(5)
            logger.warning('Test Task {} successful!'.format(task_id))
        else:
            logger.warning("{} Lock In Use, Retrying".format(task_id))
            self.request.retries = 1
            self.retry(countdown=5 * random.uniform(0.8, 1.2))

    finally:
        if have_lock:
            lock.release()

        # We'll come back to this code, but it partially works
        # c = r.info()['connected_clients']
        # print("Disconnecting Redis | Connections: {}".format(c))
        # r.connection_pool.disconnect()


@app.task(bind=True, max_retries=10)
def test_parallel_tasks(self):
    ''' Runs 5 parallel tasks, each of which will try to grab the lock and run. '''
    for i in range(5):
        test_delay_task.delay(i)

When I run this, I see the number of connections to Redis spike. I'm measuring it with this code:

def get_connected_clients():
    try:
        connections = 0
        while True:
            time.sleep(.25)
            c = get_redis().info()['connected_clients']
            # c = redis.Redis().info()['connected_clients']
            if c != connections:
                now = datetime.datetime.now()
                print("{} | Active Connections: {}".format(now, c))
                connections = c
            else:
                continue
    except KeyboardInterrupt:
        print("Shutting Down")

The results look like this:

Celery Starts
2017-11-04 01:29:51.463512 | Active Connections: 7
2017-11-04 01:29:52.477220 | Active Connections: 12


Run Task
2017-11-04 01:30:18.755118 | Active Connections: 33
2017-11-04 01:30:23.847573 | Active Connections: 34
2017-11-04 01:30:24.101263 | Active Connections: 39
2017-11-04 01:30:24.610450 | Active Connections: 40
2017-11-04 01:30:28.944949 | Active Connections: 41
2017-11-04 01:30:30.208845 | Active Connections: 43
2017-11-04 01:30:33.780812 | Active Connections: 42
2017-11-04 01:30:34.548651 | Active Connections: 43
2017-11-04 01:30:34.804526 | Active Connections: 44
2017-11-04 01:30:35.058731 | Active Connections: 47
2017-11-04 01:30:39.626745 | Active Connections: 48
2017-11-04 01:30:40.648594 | Active Connections: 49
Task Complete

Wait

Kill Celery
2017-11-04 01:31:57.766001 | Active Connections: 45
2017-11-04 01:31:58.786042 | Active Connections: 5
2017-11-04 01:31:59.291814 | Active Connections: 3

As far as I can tell, these connections never go away unless I shut Celery down and restart it. Running the task again only increases the number of open connections, and it doesn't drop until I kill Celery. After 3 runs, the active connection count reaches 77.


If I add the commented-out code to the task above, it seems to help, but the total connection count still looks high to me. Several runs now look like this:

Started with Disconnect Code Uncommented
2017-11-04 01:37:44.773113 | Active Connections: 29
2017-11-04 01:37:54.689032 | Active Connections: 33
2017-11-04 01:37:59.789031 | Active Connections: 32
2017-11-04 01:38:01.057219 | Active Connections: 33
2017-11-04 01:38:02.330613 | Active Connections: 36
2017-11-04 01:38:06.139188 | Active Connections: 35
2017-11-04 01:38:07.917854 | Active Connections: 36
2017-11-04 01:38:13.016428 | Active Connections: 35
2017-11-04 01:39:11.848758 | Active Connections: 36
Second Run
2017-11-04 01:39:18.224475 | Active Connections: 38
2017-11-04 01:39:22.043765 | Active Connections: 37
2017-11-04 01:39:23.061727 | Active Connections: 38
2017-11-04 01:39:38.106320 | Active Connections: 37
Third Run
2017-11-04 01:40:49.623050 | Active Connections: 38
2017-11-04 01:40:54.480170 | Active Connections: 37
2017-11-04 01:40:55.501791 | Active Connections: 38
2017-11-04 01:41:00.330222 | Active Connections: 37
2017-11-04 01:41:03.643833 | Active Connections: 38
2017-11-04 01:41:08.735973 | Active Connections: 37
2017-11-04 01:41:10.257756 | Active Connections: 38
2017-11-04 01:41:15.348323 | Active Connections: 37
2017-11-04 01:41:17.137816 | Active Connections: 38
2017-11-04 01:41:22.241020 | Active Connections: 37

OK, with all that said, my question is: why aren't my connections being closed, and how do I fix it? I'll need to run similar code, but with 100+ parallel tasks rather than just the 5 I'm using in this example.

【Comments】:

  • Which Redis bindings are you using?
  • What do you mean by Redis bindings? (Given my lack of knowledge, you can assume I'm using the defaults.) I'm using the redis==2.10.5 library.
  • I see you're using redis-py; regarding bindings, see this
  • And how many Celery worker processes do you have?
  • See if this helps: stackoverflow.com/questions/31663288/…. You're not using a connection pool, and you're creating a new Redis connection over and over just to read the connected-clients info. You shouldn't call get_redis().info()['connected_clients'] inside the loop; instead, create the object once outside the loop with r = get_redis(), then use r.info()['connected_clients'] inside the loop.
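Following the last comment's advice, the polling loop can reuse a single client, so redis-py's internal pool keeps one TCP connection open instead of building a fresh client every 250 ms. A sketch along those lines — the watch_connections name and the interval/max_checks parameters are additions for illustration, and the client is passed in so anything with an .info() method works:

```python
import time
import datetime

def watch_connections(client, interval=0.25, max_checks=None):
    """Poll connected_clients and print whenever the count changes.

    `client` is any object with an .info() method, e.g. redis.Redis().
    Creating it once, outside the loop, lets redis-py's internal
    connection pool reuse a single TCP connection for every poll.
    `max_checks` bounds the loop (None = run until Ctrl-C).
    """
    connections = 0
    checks = 0
    changes = []  # history of observed counts, handy for inspection
    try:
        while max_checks is None or checks < max_checks:
            time.sleep(interval)
            c = client.info()['connected_clients']
            if c != connections:
                now = datetime.datetime.now()
                print("{} | Active Connections: {}".format(now, c))
                connections = c
                changes.append(c)
            checks += 1
    except KeyboardInterrupt:
        print("Shutting Down")
    return changes
```

Usage would be watch_connections(redis.Redis()), replacing the get_redis()-per-iteration call in the question's monitoring loop.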

Tags: python heroku redis celery connection-pooling


【Solution 1】:

Here's the code that seems to work; at least I can no longer reproduce the original issue with it. Note app.conf.broker_pool_limit = 0 and connection_pool.disconnect. This is what broker_pool_limit does:

The maximum number of connections that can be open in the connection pool. If set to None or 0, the connection pool will be disabled and connections will be established and closed for every use.

import os
import time
import random
import datetime
import logging
import redis

logging.basicConfig()
logger = logging.getLogger(__name__)

from celery import Celery
from celery.contrib import rdb
app = Celery('tasks', backend='redis://localhost', broker='redis://localhost')

app.conf.broker_pool_limit = 0

def get_redis():
    url = os.environ.get("REDIS_URL")

    if url:
        r = redis.from_url(url)  # use secure for heroku
    else:
        r = redis.Redis()  # use unauthed connection locally

    return r

@app.task(bind=True, max_retries=10)
def test_delay_task(self, task_id):
    ''' Each task will try to grab a lock and, once it does, will sleep 5 seconds,
    then print and exit.
    '''
    have_lock = False
    redis_cli = get_redis()
    lock = redis_cli.lock('mws_api')
    try:
        have_lock = lock.acquire(blocking=False)
        if have_lock:
            logger.warning("{} Lock Acquired".format(task_id))
            time.sleep(5)
            logger.warning('Test Task {} successful!'.format(task_id))
        else:
            logger.warning("{} Lock In Use, Retrying".format(task_id))
            self.request.retries = 1
            self.retry(countdown=5 * random.uniform(0.8, 1.2))
    finally:
        if have_lock:
            lock.release()
        redis_cli.connection_pool.disconnect()



@app.task(bind=True, max_retries=10)
def test_parallel_tasks(self):
    ''' Runs 5 parallel tasks, each of which will try to grab the lock and run. '''
    for i in range(5):
        test_delay_task.delay(i)


def get_connected_clients():
    try:
        connections = 0
        while True:
            time.sleep(.25)
            c = get_redis().info()['connected_clients']
            # c = redis.Redis().info()['connected_clients']
            if c != connections:
                now = datetime.datetime.now()
                print("{} | Active Connections: {}".format(now, c))
                connections = c
            else:
                continue
    except KeyboardInterrupt:
        print("Shutting Down")

When running this code, once every worker has had a chance to process a request, each worker holds just one connection, plus a handful of connections held by the Celery master process.

Connection math

For this script, the Celery master process needs 8 connections, the ipython shell needs 4 connections once it has queried some tasks, and each Celery worker holds 1 connection once it has processed a task. So the initial spike is caused by the Celery master needing that many connections. Without the broker_pool_limit setting, it initially needs 10 connections.

【Discussion】:

  • This reduced the number of connections required. It now peaks at around 20 connections, which matches your math. Thanks for helping me understand this!
  • Thank you! How did you do the "connection math"? Is there documentation/code detailing how many connections celery and ipython need?
  • Can Celery prefork worker processes share a connection while handling tasks? redis-py (>=3.2) is process-safe, so is that theoretically possible? If every prefork process holds one connection, the maximum concurrency of a Celery cluster is limited by Redis's maximum connection count; but Celery doesn't support Redis Cluster yet, so Redis connections become a bottleneck.