【问题标题】:Mutlithreading with raw PyMySQL for celery用于芹菜的原始 PyMySQL 多线程
【发布时间】:2021-02-06 16:05:56
【问题描述】:

在我目前正在进行的项目中,我不允许使用 ORM,所以我创建了my own

效果很好,但我在使用 Celery 时遇到了问题,而且它是并发的。有一段时间,我将其设置为1(使用--concurrency=1),但我添加了new tasks,这需要比使用celery beat 运行更多的时间来处理,这会导致大量任务积压。

当我将 celery 的并发设置为 > 1 时,会发生以下情况(pastebin,因为它很大):

https://pastebin.com/M4HZXTDC

关于如何在其他进程上实现某种锁定/等待以使不同的工作人员不会相互交叉的任何想法?

编辑:这里是我设置PyMySQL instance 的地方以及open and close are handled 的设置方式

【问题讨论】:

  • 嗯,他们应该这样做,但似乎他们同时这样做,或者在我设置 PyMySQL 的方式上有些东西我不明白
  • 我在代码中看到了这个:database_uri=f"sqlite:///{BACKEND_ROOT}/../chatbot_database.sqlite3",
  • 看起来数据库连接只在 Flask 应用程序中完成。我不知道 Celery 任务是否可以使用它。最好的办法是获取这些连接详细信息,并在任务中打开数据库会话,执行 DML 并关闭它。我相信 Flask 用户会知道问题所在......
  • 您看到的内容与其他内容相关联,而不是错误的一部分。这是一个本地 sqlite 数据库,不被烧瓶使用,而只被 celery 使用,不会导致错误。当多个线程尝试同时读取/写入数据库时​​会导致该错误
  • 您有两个工作人员,他们使用相同的网络连接,这将永远无法正常工作,因此每个实例都需要自己的

标签: python mysql flask celery pymysql


【解决方案1】:

PyMSQL 不允许线程共享同一个连接(模块可以共享,但线程不能共享连接)。您的 Model 类在任何地方都是 reusing the same connection

所以,当不同的worker调用模型进行查询时,他们使用的是同一个连接对象,导致冲突。

确保您的连接对象是线程本地的。与其拥有db 类属性,不如考虑一种检索线程本地连接对象的方法,而不是重用可能在不同线程中创建的连接对象。

例如,create your connection in the task

现在,您对每个模型都使用全局连接。

# Connect to the database
connection = pymysql.connect(**database_config)


class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """

    db = connection

为避免这种情况,您可以改为在 __init__ 方法中创建数据库...

class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """

    def __init__(self, *args, **kwargs):
        self.db = pymysql.connect(**database_config)

但是,这可能不是高效/实用的,因为 db 对象的每个实例都会创建一个会话。

为了改进这一点,您可以使用一种使用threading.local 的方法来保持线程本地连接。



class Model(object):
    """
    Base Model class, all other Models will inherit from this
    """
    _conn = threading.local()
    @property
    def db(self):
        if not hasattr(self._conn, 'db'):
            self._conn.db = pymysql.connect(**database_config)
        return self._conn.db

请注意,假设您使用的是线程并发模型,则线程本地解决方案有效。另请注意,celery 默认使用多个进程(prefork)。这可能是也可能不是问题。如果这是一个问题,你可以通过change the workers to use eventlet 来解决它。

【讨论】:

  • 哦,我明白了!我会试试这个,让你知道它是怎么回事
  • 我已授予您 +rep 赏金,因为它会浪费掉,即使我还没有解决我的问题。我仍在尝试弄清楚如何实现线程安全的数据库连接以及所有...
  • 我认为如果您将this line 移动到__init__ 内部并每次self.db = pymysql.connect(**database_config) 或创建一个新连接,将会有很大帮助。您可能还会发现利用 Python 的 thread-local data 接口进行优化很有用(只为每个线程创建一次连接,而不是每个模型对象)
  • 非常感谢您对答案的编辑,db 属性有效!现在只需优化需要一段时间的任务,但除此之外,它完美无瑕!太感谢了 !我确实必须使用--pool=threads 才能工作!
猜你喜欢
  • 2015-01-10
  • 2013-10-04
  • 2016-01-26
  • 2023-03-27
  • 2018-01-14
  • 1970-01-01
  • 2015-07-04
  • 2021-10-24
  • 2012-11-13
相关资源
最近更新 更多