【问题标题】:Accessing a MySQL connection pool from Python multiprocessing从 Python 多处理访问 MySQL 连接池
【发布时间】:2014-08-13 23:21:44
【问题描述】:

我正在尝试设置 MySQL 连接池,并让我的工作进程访问已建立的池,而不是每次都设置新连接。

我很困惑是否应该将数据库游标传递给每个进程,或者是否有其他方法可以做到这一点? MySql.connector 不应该自动进行池化吗?当我检查我的日志文件时,打开和关闭了许多连接......每个进程一个。

我的代码如下所示:

PATH = "/tmp"

class DB(object):
  def __init__(self):
    connected = False
    while not connected:
      try:
        cnxpool = mysql.connector.pooling.MySQLConnectionPool(pool_name = "pool1",
                                                          **config.dbconfig)
        self.__cnx = cnxpool.get_connection()
      except mysql.connector.errors.PoolError:
        print("Sleeping.. (Pool Error)")
        sleep(5)
      except mysql.connector.errors.DatabaseError:
        print("Sleeping.. (Database Error)")
        sleep(5)

    self.__cur = self.__cnx.cursor(cursor_class=MySQLCursorDict)

  def execute(self, query):
    return self.__cur.execute(query)

def isValidFile(self, name):
  return True

def readfile(self, fname):
  d = DB()
  d.execute("""INSERT INTO users (first_name) VALUES ('michael')""")

def main():
  queue = multiprocessing.Queue()
  pool = multiprocessing.Pool(None, init, [queue])
  for dirpath, dirnames, filenames in os.walk(PATH):

    full_path_fnames = map(lambda fn: os.path.join(dirpath, fn),
                           filenames)
    full_path_fnames = filter(is_valid_file, full_path_fnames)
    pool.map(readFile, full_path_fnames)

if __name__ == '__main__':
  sys.exit(main())

【问题讨论】:

标签: python connection-pooling mysql-python


【解决方案1】:

首先,您为DB 类的每个实例创建不同的连接池。具有相同名称的池不会使它们成为同一个池

来自documentation

多个池具有相同名称不是错误。必须通过 pool_name 属性区分池的应用程序应该使用不同的名称创建每个池。

除此之外,在不同进程之间共享数据库连接(或连接池)将是一个坏主意(我非常怀疑它是否会正常工作),因此每个使用它自己的连接的进程实际上应该是你的目标。

您可以将 init 初始化程序中的池初始化为全局变量,然后改用它。
很简单的例子:

from multiprocessing import Pool
from mysql.connector.pooling import MySQLConnectionPool
from mysql.connector import connect
import os

pool = None

def init():
    global pool
    print("PID %d: initializing pool..." % os.getpid())
    pool = MySQLConnectionPool(...)

def do_work(q):
    con = pool.get_connection()
    print("PID %d: using connection %s" % (os.getpid(), con))
    c = con.cursor()
    c.execute(q)
    res = c.fetchall()
    con.close()
    return res

def main():
    p = Pool(initializer=init)
    for res in p.map(do_work, ['select * from test']*8):
        print(res)
    p.close()
    p.join()

if __name__ == '__main__':
    main()

或者只使用简单的连接而不是连接池,因为无论如何每个进程一次只有一个连接处于活动状态。
并发使用的连接数隐式受限于multiprocessing.Pool 的大小。

【讨论】:

  • 它会在工作进程启动时为每个工作进程创建一个连接池。根据用例,每个工作人员一个连接可能比每个工作人员一个池更合适,正如我在答案底部所解释的那样。但问题是关于使用多处理连接池,这就是我在示例中保留该池的原因。
  • 正确的方法是使用一个每个进程都可以访问的池,这是 OP 要求的。为每个进程创建一个单独的池是多余的,并且打开了太多的连接,这正是 OP 提到的问题。
  • 使用一个包含多个进程的单个池的唯一方法是拥有一个专用进程,该进程使用队列与它进行所有数据库访问 - 但这将涉及大量的数据提取和解封开销。就像我说的那样,我只使用几个工人,每个工人都有一个连接,这在连接数上与使用具有相同连接数的池相同。但是,如果您认为使用一个池的解决方案会更好,您应该提交自己的答案。
  • 不幸的是,我不确定如何优雅地解决这个问题。我同意,也许最实用的解决方案是每次调用工人时打开和关闭一个新连接。我只是想指出,为每个工作人员设置一个 mysql 池并不能解决这个问题。
  • @AlexanderYau - 因为连接没有明确设计用于多个进程(如本例中),使用该连接的进程需要正确同步对该连接的所有访问才能正常工作,可能是巨大的开销,可能不仅仅是创建一个新的连接。我并不是说这是不可能的,也许存在实现类似功能的数据库连接器......
【解决方案2】:

您创建了多个数据库对象实例。在 mysql.connector.pooling.py 中,pool_name 只是一个属性,可以让您确定它是哪个池。 mysql池中没有映射。

所以,您在def readfile() 中创建多个数据库实例,那么您将拥有多个连接池。

单例在这种情况下很有用。

(我花了几个小时才找到它。在 Tornado 框架中,每个 http get 都会创建一个新的处理程序,这会导致建立一个新的连接。)

【讨论】:

    【解决方案3】:
    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    import time
    import mysql.connector.pooling
    
    
    dbconfig = {
        "host":"127.0.0.1",
        "port":"3306",
        "user":"root",
        "password":"123456",
        "database":"test",
    }
    
    
    class MySQLPool(object):
        """
        create a pool when connect mysql, which will decrease the time spent in 
        request connection, create connection and close connection.
        """
        def __init__(self, host="172.0.0.1", port="3306", user="root",
                     password="123456", database="test", pool_name="mypool",
                     pool_size=3):
            res = {}
            self._host = host
            self._port = port
            self._user = user
            self._password = password
            self._database = database
    
            res["host"] = self._host
            res["port"] = self._port
            res["user"] = self._user
            res["password"] = self._password
            res["database"] = self._database
            self.dbconfig = res
            self.pool = self.create_pool(pool_name=pool_name, pool_size=pool_size)
    
        def create_pool(self, pool_name="mypool", pool_size=3):
            """
            Create a connection pool, after created, the request of connecting 
            MySQL could get a connection from this pool instead of request to 
            create a connection.
            :param pool_name: the name of pool, default is "mypool"
            :param pool_size: the size of pool, default is 3
            :return: connection pool
            """
            pool = mysql.connector.pooling.MySQLConnectionPool(
                pool_name=pool_name,
                pool_size=pool_size,
                pool_reset_session=True,
                **self.dbconfig)
            return pool
    
        def close(self, conn, cursor):
            """
            A method used to close connection of mysql.
            :param conn: 
            :param cursor: 
            :return: 
            """
            cursor.close()
            conn.close()
    
        def execute(self, sql, args=None, commit=False):
            """
            Execute a sql, it could be with args and with out args. The usage is 
            similar with execute() function in module pymysql.
            :param sql: sql clause
            :param args: args need by sql clause
            :param commit: whether to commit
            :return: if commit, return None, else, return result
            """
            # get connection form connection pool instead of create one.
            conn = self.pool.get_connection()
            cursor = conn.cursor()
            if args:
                cursor.execute(sql, args)
            else:
                cursor.execute(sql)
            if commit is True:
                conn.commit()
                self.close(conn, cursor)
                return None
            else:
                res = cursor.fetchall()
                self.close(conn, cursor)
                return res
    
        def executemany(self, sql, args, commit=False):
            """
            Execute with many args. Similar with executemany() function in pymysql.
            args should be a sequence.
            :param sql: sql clause
            :param args: args
            :param commit: commit or not.
            :return: if commit, return None, else, return result
            """
            # get connection form connection pool instead of create one.
            conn = self.pool.get_connection()
            cursor = conn.cursor()
            cursor.executemany(sql, args)
            if commit is True:
                conn.commit()
                self.close(conn, cursor)
                return None
            else:
                res = cursor.fetchall()
                self.close(conn, cursor)
                return res
    
    
    if __name__ == "__main__":
        mysql_pool = MySQLPool(**dbconfig)
        sql = "select * from store WHERE create_time < '2017-06-02'"
        p = Pool()
        for i in range(5):
            p.apply_async(mysql_pool.execute, args=(sql,))
    

    上面的代码一开始就创建了一个连接池,并在execute()中获取连接,一旦连接池创建完成,工作就是保留它,因为池只创建一次,所以会保存每次您想连接到 MySQL 时都需要请求连接。 希望对您有所帮助!

    【讨论】:

    • 如何从不同的文件中调用它?不会创建不同的池实例而不是访问现有的池对象
    • @Mukund 您可以创建一个池对象并从同一个池对象创建不同的连接。
    【解决方案4】:

    如果您要重用由池维护的MySQLConnection 实例,则可能存在同步问题,但只需在工作进程之间共享MySQLConnectionPool 实例并使用通过调用方法get_connection() 检索到的连接就可以了,因为将为每个 MySQLConnection 实例创建一个专用套接字。

    import multiprocessing
    from mysql.connector import pooling
    
    def f(cnxpool: pooling.MySQLConnectionPool) -> None:
        # Dedicate connection instance for each worker process.
        cnx = cnxpool.get_connection()
        ...
    
    if __name__ == '__main__':
        cnxpool = pooling.MySQLConnectionPool(
            pool_name='pool',
            pool_size=2,
        )
        p0 = multiprocessing.Process(target=f, args=(cnxpool,))
        p1 = multiprocessing.Process(target=f, args=(cnxpool,))
        p0.start()
        p1.start()
    

    【讨论】:

      猜你喜欢
      • 2020-02-19
      • 2013-07-31
      • 1970-01-01
      • 2021-07-24
      • 2020-08-14
      • 2016-11-10
      • 2018-12-04
      • 2014-07-25
      • 2017-04-18
      相关资源
      最近更新 更多