【问题标题】:Bulk Insert A Pandas DataFrame Using SQLAlchemy使用 SQLAlchemy 批量插入 Pandas 数据帧
【发布时间】:2015-11-06 23:44:49
【问题描述】:

我有一些相当大的 pandas DataFrame,我想使用新的批量 SQL 映射通过 SQL Alchemy 将它们上传到 Microsoft SQL Server。 pandas.to_sql 方法虽然不错,但速度很慢。

我在编写代码时遇到问题...

我希望能够向这个函数传递一个我称之为table 的pandas DataFrame、一个我称之为schema 的模式名称以及一个我称之为name 的表名。理想情况下,该函数将 1.) 如果表已存在,则删除该表。 2.)创建一个新表 3.)创建一个映射器和 4.)使用映射器和熊猫数据进行批量插入。我被困在第 3 部分。

这是我的(诚然粗略)代码。我正在为如何让映射器功能与我的主键一起工作而苦苦挣扎。我真的不需要主键,但映射器功能需要它。

感谢您的见解。

from sqlalchemy import create_engine Table, Column, MetaData
from sqlalchemy.orm import mapper, create_session
from sqlalchemy.ext.declarative import declarative_base
from pandas.io.sql import SQLTable, SQLDatabase

def bulk_upload(table, schema, name):
    e = create_engine('mssql+pyodbc://MYDB')
    s = create_session(bind=e)
    m = MetaData(bind=e,reflect=True,schema=schema)
    Base = declarative_base(bind=e,metadata=m)
    t = Table(name,m)
    m.remove(t)
    t.drop(checkfirst=True)
    sqld = SQLDatabase(e, schema=schema,meta=m)
    sqlt = SQLTable(name, sqld, table).table
    sqlt.metadata = m
    m.create_all(bind=e,tables=[sqlt])    
    class MyClass(Base):
        return
    mapper(MyClass, sqlt)    

    s.bulk_insert_mappings(MyClass, table.to_dict(orient='records'))
    return

【问题讨论】:

  • 看来您正在自己重新创建to_sql 函数,我怀疑这会更快。将数据写入 SQL 的瓶颈主要在于 python 驱动程序(在您的情况下为pyobdc),这是您在上述实现中无法避免的事情。此外,to_sql 不使用 ORM,即使在使用批量插入时,它也被认为比 CORE sqlalchemy 慢(docs.sqlalchemy.org/en/latest/faq/…
  • 此外,如果to_sql 太慢,并且您无法改进它(例如通过调整连接参数、使用的驱动程序(例如 pymssql)、互联网速度、通过删除表上的约束等),更快的替代方法是将数据写入 csv,然后将其加载到 SQL 表中。
  • @joris 谢谢。看来这里列出的“批量操作”有点用词不当。 docs.sqlalchemy.org/en/rel_1_0/orm/… 我真正需要做的是将熊猫数据文件输出到文本文件并像这样编写 BULK INSERT 操作... stackoverflow.com/questions/29638136/…
  • 是的,但那是为了提高 sqlalchemy ORM 的速度,它比核心 sqlalchemy 有更多的功能。但是 pandas to_sql 根本不使用 ORM,正如我之前所说,实际上已经在进行批量插入。
  • @joris 好吧,我走这条路的原因是我可以在 SQL Server 上运行 'BULK INSERT dbo.MyTable FROM \\fileserver\folder\doc.txt' 并且性能是伟大的。我在想的是,当 BULK INSERT 语句使用“VALUES”而不是“FROM”时,这就是真正的性能损失所在。换句话说,从 sql server 到文件服务器的连接比从我的虚拟机到 SQL Server 的连接要好。谢谢。

标签: python pandas sqlalchemy


【解决方案1】:

我遇到了类似的问题,pd.to_sql 需要数小时才能上传数据。下面的代码批量在几秒钟内插入了相同的数据。

from sqlalchemy import create_engine
import psycopg2 as pg
#load python script that batch loads pandas df to sql
import cStringIO

address = 'postgresql://<username>:<pswd>@<host>:<port>/<database>'
engine = create_engine(address)
connection = engine.raw_connection()
cursor = connection.cursor()

#df is the dataframe containing an index and the columns "Event" and "Day"
#create Index column to use as primary key
df.reset_index(inplace=True)
df.rename(columns={'index':'Index'}, inplace =True)

#create the table but first drop if it already exists
command = '''DROP TABLE IF EXISTS localytics_app2;
CREATE TABLE localytics_app2
(
"Index" serial primary key,
"Event" text,
"Day" timestamp without time zone,
);'''
cursor.execute(command)
connection.commit()

#stream the data using 'to_csv' and StringIO(); then use sql's 'copy_from' function
output = cStringIO.StringIO()
#ignore the index
df.to_csv(output, sep='\t', header=False, index=False)
#jump to start of stream
output.seek(0)
contents = output.getvalue()
cur = connection.cursor()
#null values become ''
cur.copy_from(output, 'localytics_app2', null="")    
connection.commit()
cur.close()

【讨论】:

  • 看起来很有趣。用 Oracle DB 尝试过,它说 cx_Oracle.Cursor 对象没有属性“copy_from”。 copy_from 方法似乎是 postgres 的东西。关于与 DB 无关的方法有什么想法吗?
  • 这是一个不错的内存解决方案。只有一点是contents 变量没有被使用。不妨放弃它,因为它会读取整个字符串缓冲区,这取决于数据库的大小。没有经过测试,它工作正常。
  • cStringIO 已被 Python3 弃用。如果使用python3,可以使用:import io;输出 = io.StringIO()
  • 注意 - 我认为这不适用于红移。更好/最快将数据推送到 s3,然后从那里复制到 redshift。
  • 这也不适用于 SQL Server。事实上,这个解决方案唯一适用的平台是 Postgres(由于 copy_from),它可能会帮助除了提问者之外的人,但提问者非常明确地询问了 SQL Server。
【解决方案2】:

到那时可能已经回答了,但我通过在此站点上整理不同的答案并与 SQLAlchemy 的文档保持一致找到了解决方案。

  1. 表需要已经存在于db1中;使用 auto_increment 设置索引。
  2. Current 类需要与 CSV 中导入的数据框和 db1 中的表对齐。

希望这对来到这里并希望快速混合 Panda 和 SQLAlchemy 的人有所帮助。

from urllib import quote_plus as urlquote
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Numeric
from sqlalchemy.orm import sessionmaker
import pandas as pd


# Set up of the engine to connect to the database
# the urlquote is used for passing the password which might contain special characters such as "/"
engine = create_engine('mysql://root:%s@localhost/db1' % urlquote('weirdPassword*withsp€cialcharacters'), echo=False)
conn = engine.connect()
Base = declarative_base()

#Declaration of the class in order to write into the database. This structure is standard and should align with SQLAlchemy's doc.
class Current(Base):
    __tablename__ = 'tableName'

    id = Column(Integer, primary_key=True)
    Date = Column(String(500))
    Type = Column(String(500))
    Value = Column(Numeric())

    def __repr__(self):
        return "(id='%s', Date='%s', Type='%s', Value='%s')" % (self.id, self.Date, self.Type, self.Value)

# Set up of the table in db and the file to import
fileToRead = 'file.csv'
tableToWriteTo = 'tableName'

# Panda to create a lovely dataframe
df_to_be_written = pd.read_csv(fileToRead)
# The orient='records' is the key of this, it allows to align with the format mentioned in the doc to insert in bulks.
listToWrite = df_to_be_written.to_dict(orient='records')

metadata = sqlalchemy.schema.MetaData(bind=engine,reflect=True)
table = sqlalchemy.Table(tableToWriteTo, metadata, autoload=True)

# Open the session
Session = sessionmaker(bind=engine)
session = Session()

# Inser the dataframe into the database in one bulk
conn.execute(table.insert(), listToWrite)

# Commit the changes
session.commit()

# Close the session
session.close()

【讨论】:

  • 我发现这篇 SQLAlchemy 文章对于提高插入速度非常有用:docs.sqlalchemy.org/en/latest/faq/…
  • 感谢您,您让我的生活更轻松。你能解释一下def __repr__(self)的意义吗?
  • class Current 的定义到底在哪里使用?实际上似乎没有在这里的任何地方被取消引用。
  • @ijoseph 我认为Current 类是Base 的子类,它在MetaData 对象中记录其所有ORM 定义,该对象用于创建表模式。 docs
【解决方案3】:

由于这是一个 I/O 繁重的工作负载,您还可以通过multiprocessing.dummy 使用 python 线程模块。这为我加快了速度:

import math
from multiprocessing.dummy import Pool as ThreadPool

...

def insert_df(df, *args, **kwargs):
    nworkers = 4

    chunksize = math.floor(df.shape[0] / nworkers)
    chunks = [(chunksize * i, (chunksize * i) + chunksize) for i in range(nworkers)]
    chunks.append((chunksize * nworkers, df.shape[0]))
    pool = ThreadPool(nworkers)

    def worker(chunk):
        i, j = chunk
        df.iloc[i:j, :].to_sql(*args, **kwargs)

    pool.map(worker, chunks)
    pool.close()
    pool.join()


....

insert_df(df, "foo_bar", engine, if_exists='append')

【讨论】:

    【解决方案4】:

    基于@ansonw 的回答:

    def to_sql(engine, df, table, if_exists='fail', sep='\t', encoding='utf8'):
        # Create Table
        df[:0].to_sql(table, engine, if_exists=if_exists)
    
        # Prepare data
        output = cStringIO.StringIO()
        df.to_csv(output, sep=sep, header=False, encoding=encoding)
        output.seek(0)
    
        # Insert data
        connection = engine.raw_connection()
        cursor = connection.cursor()
        cursor.copy_from(output, table, sep=sep, null='')
        connection.commit()
        cursor.close()
    

    我在 5 秒而不是 4 分钟内插入 200000 行

    【讨论】:

    • 我没有投反对票,但这看起来不像是根据需要使用 pandas 的解决方案:多进程 + pandas + sqlalchemy。通常在摄取期间,尤其是对于较大的数据集,会有一个临时位置来将数据存储在数据库中,然后在插入/更新之前处理该数据(删除/回填)。
    • 我认为这可以重构以支持数据分块(Pandas DataFrame.to_sql 建议这样做,但它不起作用)。这意味着不需要一次性渲染整个 CSV 表单,但我认为这是该线程中最好的解决方案!
    • 不幸的是,从那时起,方法 copy_from() 已被弃用。
    【解决方案5】:

    下面我的 postgres 特定解决方案使用您的 pandas 数据框自动创建数据库表,并使用 postgres COPY my_table FROM ... 执行快速批量插入

    import io
    
    import pandas as pd
    from sqlalchemy import create_engine
    
    def write_to_table(df, db_engine, schema, table_name, if_exists='fail'):
        string_data_io = io.StringIO()
        df.to_csv(string_data_io, sep='|', index=False)
        pd_sql_engine = pd.io.sql.pandasSQL_builder(db_engine, schema=schema)
        table = pd.io.sql.SQLTable(table_name, pd_sql_engine, frame=df,
                                   index=False, if_exists=if_exists, schema=schema)
        table.create()
        string_data_io.seek(0)
        string_data_io.readline()  # remove header
        with db_engine.connect() as connection:
            with connection.connection.cursor() as cursor:
                copy_cmd = "COPY %s.%s FROM STDIN HEADER DELIMITER '|' CSV" % (schema, table_name)
                cursor.copy_expert(copy_cmd, string_data_io)
            connection.connection.commit()
    

    【讨论】:

      【解决方案6】:

      对于遇到此问题并将目标数据库作为 Redshift 的任何人,请注意 Redshift 没有实现完整的 Postgres 命令集,因此使用 Postgres 的 COPY FROMcopy_from() 的某些答案将不起作用。 psycopg2.ProgrammingError: syntax error at or near "stdin" error when trying to copy_from redshift

      加快向 Redshift 插入的解决方案是使用文件摄取或 Odo。

      参考:
      关于大堂 http://odo.pydata.org/en/latest/perf.html
      使用 Redshift 进行 Odo
      https://github.com/blaze/odo/blob/master/docs/source/aws.rst
      Redshift COPY(来自 S3 文件)
      https://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html

      【讨论】:

      • 欢迎来到 Stack Overflow!虽然链接是分享知识的好方法,但如果它们在未来被破坏,它们将无法真正回答问题。将回答问题的链接的基本内容添加到您的答案中。如果内容太复杂或太大而无法在此处放置,请描述所提出解决方案的总体思路。请记住始终保留指向原始解决方案网站的链接引用。见:How do I write a good answer?
      【解决方案7】:

      这对我使用 cx_Oracle 和 SQLALchemy 连接到 Oracle 数据库很有用

      import sqlalchemy
      import cx_Oracle
      from sqlalchemy import create_engine
      from sqlalchemy.ext.declarative import declarative_base
      from sqlalchemy import Column, String
      from sqlalchemy.orm import sessionmaker
      import pandas as pd
      
      # credentials
      username = "username"
      password = "password"
      connectStr = "connection:/string"
      tableName = "tablename"
      
      t0 = time.time()
      
      # connection
      dsn = cx_Oracle.makedsn('host','port',service_name='servicename')
      
      Base = declarative_base()
      
      class LANDMANMINERAL(Base):
          __tablename__ = 'tablename'
      
          DOCUMENTNUM = Column(String(500), primary_key=True)
          DOCUMENTTYPE = Column(String(500))
          FILENUM = Column(String(500))
          LEASEPAYOR = Column(String(500))
          LEASESTATUS = Column(String(500))
          PROSPECT = Column(String(500))
          SPLIT = Column(String(500))
          SPLITSTATUS = Column(String(500))
      
      engine = create_engine('oracle+cx_oracle://%s:%s@%s' % (username, password, dsn))
      conn = engine.connect()  
      
      Base.metadata.bind = engine
      
      # Creating the session
      
      DBSession = sessionmaker(bind=engine)
      
      session = DBSession()
      
      # Bulk insertion
      data = pd.read_csv('data.csv')
      lists = data.to_dict(orient='records')
      
      
      table = sqlalchemy.Table('landmanmineral', Base.metadata, autoreload=True)
      conn.execute(table.insert(), lists)
      
      session.commit()
      
      session.close() 
      
      print("time taken %8.8f seconds" % (time.time() - t0) )
      

      【讨论】:

        【解决方案8】:

        对于像我这样尝试实施上述解决方案的人:

        Pandas 0.24.0 现在有 to_sql 和 chunksize 和 method='multi' 选项,可以批量插入...

        【讨论】:

          【解决方案9】:

          这里是简单的方法

          .

          下载用于 SQL 数据库连接的驱动程序

          对于 Linux 和 Mac 操作系统:

          https://docs.microsoft.com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server?view=sql-server-2017

          对于 Windows:

          https://www.microsoft.com/en-us/download/details.aspx?id=56567

          创建连接

          from sqlalchemy import create_engine 
          import urllib
          server = '*****'
          database = '********'
          username = '**********'
          password = '*********'
          
          params = urllib.parse.quote_plus(
          'DRIVER={ODBC Driver 17 for SQL Server};'+ 
          'SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password) 
          
          engine = create_engine("mssql+pyodbc:///?odbc_connect=%s" % params) 
          
          #Checking Connection 
          connected = pd.io.sql._is_sqlalchemy_connectable(engine)
          
          print(connected)   #Output is True if connection established successfully
          

          数据插入

          df.to_sql('Table_Name', con=engine, if_exists='append', index=False)
          
          
          """
          if_exists: {'fail', 'replace', 'append'}, default 'fail'
               fail: If table exists, do nothing.
               replace: If table exists, drop it, recreate it, and insert data.
               append: If table exists, insert data. Create if does not exist.
          """
          

          如果有很多记录

          # limit based on sp_prepexec parameter count
          tsql_chunksize = 2097 // len(bd_pred_score_100.columns)
          # cap at 1000 (limit for number of rows inserted by table-value constructor)
          tsql_chunksize = 1000 if tsql_chunksize > 1000 else tsql_chunksize
          print(tsql_chunksize)
          
          
          df.to_sql('table_name', con = engine, if_exists = 'append', index= False, chunksize=tsql_chunksize)
          

          PS:您可以根据需要更改参数。

          【讨论】:

            【解决方案10】:

            Pandas 0.25.1 有一个参数可以进行多次插入,因此不再需要使用 SQLAlchemy 解决这个问题。

            调用pandas.DataFrame.to_sql时设置method='multi'

            在这个例子中,它是 df.to_sql(table, schema=schema, con=e, index=False, if_exists='replace', method='multi')

            答案来自文档here

            值得注意的是,我只使用 Redshift 对此进行了测试。请让我知道它在其他数据库上的运行情况,以便我可以更新此答案。

            【讨论】:

            • 当我在 MSSQL+Pyodbc 驱动程序中使用“multi”时,SQLAlchemy 会抛出一个通用的 DBAPIError,不幸的是,这使得调试变得非常困难。
            • 我意识到 MSSQL 最多只支持 2100 个参数,因此大型多插入不起作用。我通过使用 chunksize 参数将数据帧值分解为更小的参数列表来解决了这个问题。
            • method="multi" 并没有提高我批量加载到 SQL Server 的速度,只有 create_engine(..., fast_executemany=True) 有。
            猜你喜欢
            • 2017-01-01
            • 2011-04-09
            • 2018-09-05
            • 2020-10-21
            • 2018-07-22
            • 1970-01-01
            • 2018-09-18
            • 1970-01-01
            • 2016-10-04
            相关资源
            最近更新 更多