【问题标题】:Speeding up pandas.DataFrame.to_sql with fast_executemany of pyODBC使用 pyODBC 的 fast_executemany 加速 pandas.DataFrame.to_sql
【发布时间】:2018-06-08 22:43:38
【问题描述】:

我想向运行 MS SQL 的远程服务器发送一个大的 pandas.DataFrame。我现在的做法是将data_frame 对象转换为元组列表,然后使用pyODBC 的executemany() 函数将其发送出去。它是这样的:

 import pyodbc as pdb

 list_of_tuples = convert_df(data_frame)

 connection = pdb.connect(cnxn_str)

 cursor = connection.cursor()
 cursor.fast_executemany = True
 cursor.executemany(sql_statement, list_of_tuples)
 connection.commit()

 cursor.close()
 connection.close()

然后我开始怀疑使用data_frame.to_sql() 方法是否可以加快速度(或者至少更具可读性)。我想出了以下解决方案:

 import sqlalchemy as sa

 engine = sa.create_engine("mssql+pyodbc:///?odbc_connect=%s" % cnxn_str)
 data_frame.to_sql(table_name, engine, index=False)

现在代码更具可读性,但上传速度至少慢了 150 倍...

在使用 SQLAlchemy 时有没有办法翻转fast_executemany

我正在使用 pandas-0.20.3、pyODBC-4.0.21 和 sqlalchemy-1.1.13。

【问题讨论】:

    标签: python sqlalchemy pyodbc pandas-to-sql


    【解决方案1】:

    在联系了 SQLAlchemy 的开发者之后,出现了解决这个问题的方法。非常感谢他们的出色工作!

    必须使用游标执行事件并检查executemany 标志是否已被引发。如果确实如此,请打开fast_executemany 选项。例如:

    from sqlalchemy import event
    
    @event.listens_for(engine, 'before_cursor_execute')
    def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
        if executemany:
            cursor.fast_executemany = True
    

    更多关于执行事件的信息可以在here找到。


    更新:pyodbc 中添加了对fast_executemany 的支持SQLAlchemy 1.3.0,因此不再需要此hack。

    【讨论】:

    • 非常感谢您为此付出的努力。为了清楚起见,应该在实例化 SQLAlchemy 引擎之前声明这个装饰器和函数?
    • 不客气。我在类的构造函数中实例化引擎后立即声明它。
    • 所以这消除了对 pyodbc 特定连接代码的需要?只需要在这个函数之后调用to_sql()
    • 我尝试在函数之后直接调用to_sql,但并没有加快任何速度
    • @J.K. - 请考虑更新您的答案,以提及 2019 年 3 月 4 日发布的 SQLAlchemy 1.3.0 现在支持 mssql+pyodbc 方言的 engine = create_engine(sqlalchemy_url, fast_executemany=True)。即,不再需要定义函数并使用@event.listens_for(engine, 'before_cursor_execute')。谢谢。
    【解决方案2】:

    编辑(2019-03-08): Gord Thompson 在下面评论了来自 sqlalchemy 更新日志的好消息:自 2019-03-04 发布的 SQLAlchemy 1.3.0 以来,sqlalchemy现在支持engine = create_engine(sqlalchemy_url, fast_executemany=True)mssql+pyodbc 方言。即,不再需要定义一个函数并使用@event.listens_for(engine, 'before_cursor_execute') 这意味着可以删除下面的函数,只需要在 create_engine 语句中设置标志 - 并且仍然保持加速。 p>

    原帖:

    刚刚注册了一个帐户来发布这个。我想在上面的帖子下发表评论,因为它是对已经提供的答案的跟进。上面的解决方案适用于我在基于 Ubuntu 的安装中写入的 Microsft SQL 存储上的版本 17 SQL 驱动程序。

    我用来显着加快速度的完整代码(加速 > 100 倍)如下。这是一个交钥匙 sn-p,前提是您使用相关详细信息更改连接字符串。对于上面的海报,非常感谢您的解决方案,因为我已经为此寻找了相当长的时间。

    import pandas as pd
    import numpy as np
    import time
    from sqlalchemy import create_engine, event
    from urllib.parse import quote_plus
    
    
    conn =  "DRIVER={ODBC Driver 17 for SQL Server};SERVER=IP_ADDRESS;DATABASE=DataLake;UID=USER;PWD=PASS"
    quoted = quote_plus(conn)
    new_con = 'mssql+pyodbc:///?odbc_connect={}'.format(quoted)
    engine = create_engine(new_con)
    
    
    @event.listens_for(engine, 'before_cursor_execute')
    def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
        print("FUNC call")
        if executemany:
            cursor.fast_executemany = True
    
    
    table_name = 'fast_executemany_test'
    df = pd.DataFrame(np.random.random((10**4, 100)))
    
    
    s = time.time()
    df.to_sql(table_name, engine, if_exists = 'replace', chunksize = None)
    print(time.time() - s)
    

    基于下面的 cmets,我想花一些时间来解释一下 pandas to_sql 实现和查询处理方式的一些限制。有两件事可能会导致 MemoryError 被提高 afaik:

    1) 假设您正在写入远程 SQL 存储。当您尝试使用 to_sql 方法编写大熊猫数据帧时,它会将整个数据帧转换为值列表。这种转换比原始 DataFrame 占用更多的 RAM(最重要的是,旧的 DataFrame 仍然存在于 RAM 中)。此列表提供给您的 ODBC 连接器的最终executemany 调用。我认为 ODBC 连接器在处理如此大的查询时遇到了一些麻烦。解决此问题的一种方法是为 to_sql 方法提供一个 chunksize 参数(10**5 似乎是最佳的,在 Azure 的 2 CPU 7GB ram MSSQL 存储应用程序上提供大约 600 mbit/s(!)的写入速度 - 可以不推荐 Azure 顺便说一句)。因此,第一个限制,即查询大小,可以通过提供 chunksize 参数来规避。但是,这不会让您编写大小为 10**7 或更大的数据帧(至少在我正在使用的具有 ~55GB RAM 的 VM 上不是),问题 nr 2。

    这可以通过用np.split(10**6 大小的 DataFrame 块)分解 DataFrame 来规避。这些可以迭代地写掉。当我为 pandas 核心中的 to_sql 方法准备好解决方案时,我将尝试发出拉取请求,这样您就不必每次都进行预先分解。无论如何,我最终编写了一个与以下类似(不是交钥匙)的功能:

    import pandas as pd
    import numpy as np
    
    def write_df_to_sql(df, **kwargs):
        chunks = np.split(df, df.shape()[0] / 10**6)
        for chunk in chunks:
            chunk.to_sql(**kwargs)
        return True
    

    以上sn-p更完整的例子可以看这里:https://gitlab.com/timelord/timelord/blob/master/timelord/utils/connector.py

    这是我编写的一个类,它包含了补丁,并减轻了与 SQL 建立连接所带来的一些必要开销。还是要写一些文档。我还计划将补丁贡献给 pandas 本身,但还没有找到一个好的方法。

    我希望这会有所帮助。

    【讨论】:

    • 我认为这不相关,因为最初的问题是关于方法 to_sql 的加速。您现在正在询问同一方法中的参数错误,该错误与原始问题不再相关 - afaik。只是试图遵守我通常看到的 SO 规范。关于您现在提供的额外信息,可能会引发错误,因为已经存在的表大小不同,因此无法附加到(类型错误)?另外,我提供的最后一个代码 sn-p 用于说明目的,您可能需要对其进行一些更改。
    • 不知道为什么我以前没有分享过这个,但这是我经常用于将数据帧进出 SQL 数据库的类:gitlab.com/timelord/timelord/blob/master/timelord/utils/…享受!
    • @erickfis 我已经用一个合适的例子更新了这个类。请注意,并非每个数据库都将使用相同的驱动程序,因此在使用此类时会引发错误。不使用它的示例数据库是 PostgreSQL。我还没有找到一种将数据插入 PSQL 的快速方法。仍然像这样使用此类的一种方法是通过调用显式关闭开关:con._init_engine(SET_FAST_EXECUTEMANY_SWITCH=False) 在初始化类之后。祝你好运。
    • @hetspookjee - 由于这是迄今为止最受欢迎的答案,请考虑更新它以提及 SQLAlchemy 1.3.0,于 2019 年 3 月 4 日发布,现在支持 engine = create_engine(sqlalchemy_url, fast_executemany=True)mssql+pyodbc方言。即,不再需要定义函数并使用@event.listens_for(engine, 'before_cursor_execute')。谢谢。
    • 感谢 Gord Thompson 的更新!我已将您的评论置顶,并从我的帖子中制作了一篇社区 wiki 文章以供将来更新。
    【解决方案3】:

    我只是想把这个完整的例子作为一个额外的高性能选项发布给那些可以使用新的 turbodbc 库的人:http://turbodbc.readthedocs.io/en/latest/

    pandas .to_sql() 之间显然有很多选择,通过 sqlalchemy 触发 fast_executemany,直接使用 pyodbc 处理元组/列表/等,甚至尝试使用平面文件进行 BULK UPLOAD。

    希望随着功能在当前 pandas 项目中的发展或未来包括 turbodbc 集成等功能,以下内容可能会让生活变得更加愉快。

    import pandas as pd
    import numpy as np
    from turbodbc import connect, make_options
    from io import StringIO
    
    test_data = '''id,transaction_dt,units,measures
                   1,2018-01-01,4,30.5
                   1,2018-01-03,4,26.3
                   2,2018-01-01,3,12.7
                   2,2018-01-03,3,8.8'''
    
    df_test = pd.read_csv(StringIO(test_data), sep=',')
    df_test['transaction_dt'] = pd.to_datetime(df_test['transaction_dt'])
    
    options = make_options(parameter_sets_to_buffer=1000)
    conn = connect(driver='{SQL Server}', server='server_nm', database='db_nm', turbodbc_options=options)
    
    test_query = '''DROP TABLE IF EXISTS [db_name].[schema].[test]
    
                    CREATE TABLE [db_name].[schema].[test]
                    (
                        id int NULL,
                        transaction_dt datetime NULL,
                        units int NULL,
                        measures float NULL
                    )
    
                    INSERT INTO [db_name].[schema].[test] (id,transaction_dt,units,measures)
                    VALUES (?,?,?,?) '''
    
    cursor.executemanycolumns(test_query, [df_test['id'].values, df_test['transaction_dt'].values, df_test['units'].values, df_test['measures'].values]
    

    turbodbc 在许多用例(尤其是 numpy 数组)中应该非常快。请注意将底层 numpy 数组从数据框列作为参数直接传递给查询是多么简单。我也相信这有助于防止创建过度消耗内存的中间对象。希望这有帮助!

    【讨论】:

    • 在接下来的几天里试试这个,然后我会带着我的发现回来
    • @erickfis 这个投票对你有帮助吗?很高兴在这里听到您的发现
    • 嗨皮兰德!我还没有时间尝试,这里很忙。现在我正在使用公司的工具来获取数据。但是对于下一个项目,我非常需要这个,以便在 sql server 上摄取大量数据。我看到的主要缺点是我的 dfs 每个有 240 列。使用 pd.to_sql 时,我不需要担心每一列。再说一次, pd.to_sql 真的很慢,到了令人望而却步的地步。使用 turbodbc 可能是我的解决方案,但必须手动输入这 240 列中的每一列对我来说似乎不是最佳选择(因为要摄取很多不同的 df)
    • 我成功了:太酷了!让我非常兴奋,以至于我在我的 github 上写了一个“博客”:github
    • @erickfis 太好了!我很高兴您最终发现它对您的需求是值得的,并感谢您链接您的精彩演示帖子。它应该有助于推广这个答案,并将 turbodbc 项目的形象提升给寻求解决方案的人们。
    【解决方案4】:

    似乎 Pandas 0.23.0 和 0.24.0 use multi values inserts 与 PyODBC,这阻止了快速执行的帮助 - 每个块发出单个 INSERT ... VALUES ... 语句。多值插入块是对旧的慢速executemany 默认值的改进,但至少在简单的测试中,快速executemany 方法仍然占主导地位,更不用说不需要手动chunksize 计算,就像多值插入所需要的那样。如果以后没有提供配置选项,可以通过monkeypatching来强制旧行为:

    import pandas.io.sql
    
    def insert_statement(self, data, conn):
        return self.table.insert(), data
    
    pandas.io.sql.SQLTable.insert_statement = insert_statement
    

    未来就在这里,至少在master 分支中,插入方法可以使用to_sql() 的关键字参数method= 来控制。它默认为None,它强制执行executemany 方法。传递method='multi' 导致使用多值插入。它甚至可以用于实现 DBMS 特定的方法,例如 Postgresql COPY

    【讨论】:

    • pandas 开发人员在这个问题上反复讨论了一段时间,但最终他们似乎放弃了多行插入方法,至少对于mssql+pyodbc SQLAlchemy 引擎来说是这样。 pandas 0.23.4 确实让 fast_executemany 做它的事。
    • 目前的情况还没有查到,但是在0.24.0版本又放回去了。编辑:它至少在master 分支中仍然存在,但它现在是可控的:github.com/pandas-dev/pandas/blob/master/pandas/io/sql.py#L1157。似乎通过 to_sql(..., method=None) 应该强制执行 executemany 方法。
    • ...而None 是默认值。
    【解决方案5】:

    正如@Pylander 指出的那样

    到目前为止,Turbodbc 是数据摄取的最佳选择!

    我对此感到非常兴奋,以至于我在我的 github 和 medium 上写了一个“博客”: 请查看https://medium.com/@erickfis/etl-process-with-turbodbc-1d19ed71510e

    一个工作示例并与 pandas.to_sql 进行比较

    长话短说,

    使用 turbodbc 我在 3 秒内有 10000 行(77 列)

    使用 pandas.to_sql 我在 198 秒内得到了相同的 10000 行(77 列)...

    这是我正在做的全部细节

    进口:

    import sqlalchemy
    import pandas as pd
    import numpy as np
    import turbodbc
    import time
    

    加载并处理一些数据 - 用我的 sample.pkl 代替你的:

    df = pd.read_pickle('sample.pkl')
    
    df.columns = df.columns.str.strip()  # remove white spaces around column names
    df = df.applymap(str.strip) # remove white spaces around values
    df = df.replace('', np.nan)  # map nans, to drop NAs rows and columns later
    df = df.dropna(how='all', axis=0)  # remove rows containing only NAs
    df = df.dropna(how='all', axis=1)  # remove columns containing only NAs
    df = df.replace(np.nan, 'NA')  # turbodbc hates null values...
    

    使用 sqlAlchemy 创建表

    不幸的是,turbodbc 需要大量开销和大量 sql 手工劳动,用于创建表并在其上插入数据。

    幸运的是,Python 是纯粹的快乐,我们可以自动化编写 sql 代码的过程。

    第一步是创建将接收我们的数据的表。但是,如果您的表具有多个列,则手动编写 sql 代码创建表可能会出现问题。就我而言,表格通常有 240 列!

    这就是 sqlAlchemy 和 pandas 仍然可以帮助我们的地方:pandas 不适合写入大量行(本例中为 10000 行),但是表头只有 6 行呢?通过这种方式,我们可以自动执行创建表的过程。

    创建 sqlAlchemy 连接:

    mydb = 'someDB'
    
    def make_con(db):
        """Connect to a specified db."""
        database_connection = sqlalchemy.create_engine(
            'mssql+pymssql://{0}:{1}@{2}/{3}'.format(
                myuser, mypassword,
                myhost, db
                )
            )
        return database_connection
    
    pd_connection = make_con(mydb)
    

    在 SQL Server 上创建表

    使用 pandas + sqlAlchemy,但只是为前面提到的 turbodbc 准备空间。请注意此处的 df.head():我们使用 pandas + sqlAlchemy 仅插入 6 行数据。这将运行得非常快,并且正在自动创建表。

    table = 'testing'
    df.head().to_sql(table, con=pd_connection, index=False)
    

    现在桌子已经就位了,让我们在这里认真点。

    Turbodbc 连接:

    def turbo_conn(mydb):
        """Connect to a specified db - turbo."""
        database_connection = turbodbc.connect(
                                                driver='ODBC Driver 17 for SQL Server',
                                                server=myhost,
                                                database=mydb,
                                                uid=myuser,
                                                pwd=mypassword
                                            )
        return database_connection
    

    为 turbodbc 准备 sql 命令和数据。让我们创造性地自动创建代码:

    def turbo_write(mydb, df, table):
        """Use turbodbc to insert data into sql."""
        start = time.time()
        # preparing columns
        colunas = '('
        colunas += ', '.join(df.columns)
        colunas += ')'
    
        # preparing value place holders
        val_place_holder = ['?' for col in df.columns]
        sql_val = '('
        sql_val += ', '.join(val_place_holder)
        sql_val += ')'
    
        # writing sql query for turbodbc
        sql = f"""
        INSERT INTO {mydb}.dbo.{table} {colunas}
        VALUES {sql_val}
        """
    
        # writing array of values for turbodbc
        valores_df = [df[col].values for col in df.columns]
    
        # cleans the previous head insert
        with connection.cursor() as cursor:
            cursor.execute(f"delete from {mydb}.dbo.{table}")
            connection.commit()
    
        # inserts data, for real
        with connection.cursor() as cursor:
            try:
                cursor.executemanycolumns(sql, valores_df)
                connection.commit()
            except Exception:
                connection.rollback()
                print('something went wrong')
    
        stop = time.time() - start
        return print(f'finished in {stop} seconds')
    

    使用 turbodbc 写入数据 - 我在 3 秒内有 10000 行(77 列):

    turbo_write(mydb, df.sample(10000), table)
    

    Pandas 方法比较 - 我在 198 秒内得到了相同的 10000 行(77 列)……

    table = 'pd_testing'
    
    def pandas_comparisson(df, table):
        """Load data using pandas."""
        start = time.time()
        df.to_sql(table, con=pd_connection, index=False)
        stop = time.time() - start
        return print(f'finished in {stop} seconds')
    
    pandas_comparisson(df.sample(10000), table)
    

    环境条件

    Python 3.6.7 :: Anaconda, Inc.
    TURBODBC version ‘3.0.0’
    sqlAlchemy version ‘1.2.12’
    pandas version ‘0.23.4’
    Microsoft SQL Server 2014
    user with bulk operations privileges
    

    请检查https://erickfis.github.io/loose-code/ 以获取此代码中的更新!

    【讨论】:

    • 我也发现 pandas 很慢,但对于一个项目,我使用不同的方法解决了它。我有多个文件(13 列)中的数据,但总共有 100 万行。相反,我将 MySQL INFILE 与本地存储的文件一起使用。从 python 调用它,使用线程。我能够在大约 20 秒内导入 100 万行。
    【解决方案6】:

    SQL Server INSERT 性能:pyodbc 与 turbodbc

    当使用to_sql 将pandas DataFrame 上传到SQL Server 时,turbodbc 肯定会比没有fast_executemany 的pyodbc 快。但是,为 pyodbc 启用 fast_executemany 后,两种方法的性能基本相同。

    测试环境:

    [venv1_pyodbc]
    pyodbc 2.0.25

    [venv2_turbodbc]
    turbodbc 3.0.0
    sqlalchemy-turbodbc 0.1.0

    [两者通用]
    Windows 上的 Python 3.6.4 64 位
    SQLAlchemy 1.3.0b1
    熊猫 0.23.4
    numpy 1.15.4

    测试代码:

    # for pyodbc
    engine = create_engine('mssql+pyodbc://sa:whatever@SQL_panorama', fast_executemany=True)
    # for turbodbc
    # engine = create_engine('mssql+turbodbc://sa:whatever@SQL_panorama')
    
    # test data
    num_rows = 10000
    num_cols = 100
    df = pd.DataFrame(
        [[f'row{x:04}col{y:03}' for y in range(num_cols)] for x in range(num_rows)],
        columns=[f'col{y:03}' for y in range(num_cols)]
    )
    
    t0 = time.time()
    df.to_sql("sqlalchemy_test", engine, if_exists='replace', index=None)
    print(f"pandas wrote {num_rows} rows in {(time.time() - t0):0.1f} seconds")
    

    针对每个环境运行了十二 (12) 次测试,丢弃了每个环境的最佳和最差时间。结果(以秒为单位):

       rank  pyodbc  turbodbc
       ----  ------  --------
          1    22.8      27.5
          2    23.4      28.1
          3    24.6      28.2
          4    25.2      28.5
          5    25.7      29.3
          6    26.9      29.9
          7    27.0      31.4
          8    30.1      32.1
          9    33.6      32.5
         10    39.8      32.9
       ----  ------  --------
    average    27.9      30.0
    

    【讨论】:

      【解决方案7】:

      我遇到了同样的问题,但使用的是 PostgreSQL。他们现在刚刚发布 pandas 版本 0.24.0to_sql 函数中有一个名为 method 的新参数解决了我的问题。

      from sqlalchemy import create_engine
      
      engine = create_engine(your_options)
      data_frame.to_sql(table_name, engine, method="multi")
      

      对我来说,上传速度快了 100 倍。 如果要发送大量数据,我还建议设置 chunksize 参数。

      【讨论】:

      【解决方案8】:

      只是想添加到@J.K. 的答案。

      如果您使用这种方法:

      @event.listens_for(engine, 'before_cursor_execute')
      def receive_before_cursor_execute(conn, cursor, statement, params, context, executemany):
          if executemany:
              cursor.fast_executemany = True
      

      你得到这个错误:

      "sqlalchemy.exc.DBAPIError: (pyodbc.Error) ('HY010', '[HY010] [Microsoft][SQL Server Native Client 11.0]函数序列错误(0) (SQLParamData)') [SQL: 'INSERT INTO ... (...) VALUES (?, ?)'] [参数:((...,...),(...,...)](此错误的背景: http://sqlalche.me/e/dbapi)"

      像这样编码你的字符串值:'yourStringValue'.encode('ascii')

      这将解决您的问题。

      【讨论】:

        【解决方案9】:

        我只是修改了引擎行,这有助于我将插入速度提高 100 倍。

        旧代码 -

        import json
        import maya
        import time
        import pandas
        import pyodbc
        import pandas as pd
        from sqlalchemy import create_engine
        
        retry_count = 0
        retry_flag = True
        
        hostInfoDf = pandas.read_excel('test.xlsx', sheet_name='test')
        print("Read Ok")
        
        engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")
        
        while retry_flag and retry_count < 5:
          try:
            df.to_sql("table_name",con=engine,if_exists="replace",index=False,chunksize=5000,schema="dbo")
            retry_flag = False
          except:
            retry_count = retry_count + 1
            time.sleep(30)
        

        修改后的引擎线-

        来自-

        engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server")
        

        到-

        engine = create_engine("mssql+pyodbc://server_name/db_name?trusted_connection=yes&driver=ODBC+Driver+17+for+SQL+Server", fast_executemany=True)
        

        问我任何 Query 相关的 python 到 SQL 的连接,我很乐意帮助你。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2019-09-21
          • 1970-01-01
          • 2023-03-30
          • 2015-06-20
          相关资源
          最近更新 更多