【问题标题】:Speeding (Bulk) Insert into MySQL with Python使用 Python 加速(批量)插入 MySQL
【发布时间】:2018-03-14 13:58:51
【问题描述】:

我正在部署一个应用程序来使用一些 .csv 数据。我想将它们复制到 MySQL 表中。在 stackoverflow 用户的帮助下,我编写了以下代码:

import csv
import MySQLdb

db = MySQLdb.connect(   host = "dbname.description.host.com",
                        user = "user",
                        passwd = "key",
                        db = "dbname")
cursor = db.cursor()

query = 'INSERT INTO table_name(column,column_1,column_2,column_3)
VALUES(%s, %s, %s, %s)'                                                         

csv_data = csv.reader(file('file_name'))

for row in csv_data:
     cursor.execute(query,row)
     db.commit()

cursor.close()

问题是,目前这个过程太慢了,我需要加快速度。

谢谢

【问题讨论】:

  • 研究使用LOAD DATA,它针对在 MySQL 中批量加载数据进行了优化。
  • 感谢 cmets。我现在正在检查。我已经尝试过 pyodbc/pypyodbc 并且两者都有问题。
  • 尝试加载数据,但没有结果。除了代码控制台说一切正常没有数据上传到mysql,当我使用“SELECT * FROM table_name”时没有添加新行
  • 知道了!谢谢,伙计!

标签: python mysql csv


【解决方案1】:

您可以使用executemany 来批处理作业,如下所示

import csv
import MySQLdb

db = MySQLdb.connect(   host = "dbname.description.host.com",
                        user = "user",
                        passwd = "key",
                        db = "dbname")
cursor = db.cursor()

query = 'INSERT INTO table_name(column,column_1,column_2,column_3)
VALUES(%s, %s, %s, %s)'                                                         

csv_data = csv.reader(file('file_name'))

my_data = []
for row in csv_data:
     my_data.append(tuple(row))

cursor.executemany(query, my_data)
cursor.close()

【讨论】:

  • executemany 是批处理操作还是一个一个遍历整个列表并插入它们?基本上是 executemany = 循环的 execute() 吗?
  • @shreeshkatti 大多数时候,它是一个循环,except for inserts 针对它进行了优化。
【解决方案2】:

由于多种原因,您使用的代码效率极低,因为您一次提交一行数据(这将是您想要的事务数据库或进程),但不是一次性的转储。

有很多方法可以加快速度,从好到不太好。这里有 4 种方法,包括朴素的实现(上图)

#!/usr/bin/env python
import pandas as pd
import numpy as np
import odo
import profilehooks
import sqlalchemy
import csv
import os


def create_test_data():
    n = 100000
    df = pd.DataFrame(dict(
        id=np.random.randint(0, 1000000, n),
        col1=np.random.choice(['hello', 'world', 'python', 'large string for testing ' * 10], n),
        col2=np.random.randint(-1000000, 1000000, n),
        col3=np.random.randint(-9000000, 9000000, n),
        col4=(np.random.random(n) - 0.5) * 99999
    ), columns=['id', 'col1', 'col2', 'col3', 'col4'])
    df.to_csv('tmp.csv', index=False)


@profilehooks.timecall
def using_pandas(table_name, uri):
    df = pd.read_csv('tmp.csv')
    df.to_sql(table_name, con=uri, if_exists='append', index=False)


@profilehooks.timecall
def using_odo(table_name, uri):
    odo.odo('tmp.csv', '%s::%s' % (uri, table_name))


@profilehooks.timecall
def using_cursor(table_name, uri):
    engine = sqlalchemy.create_engine(uri)
    query = 'INSERT INTO {} (id, col1, col2, col3, col4) VALUES(%s, %s, %s, %s, %s)'
    query = query.format(table_name)
    con = engine.raw_connection()
    with con.cursor() as cursor:
        with open('tmp.csv') as fh:
            reader = csv.reader(fh)
            next(reader)  # Skip firt line (headers)
            for row in reader:
                cursor.execute(query, row)
                con.commit()
    con.close()


@profilehooks.timecall
def using_cursor_correct(table_name, uri):
    engine = sqlalchemy.create_engine(uri)
    query = 'INSERT INTO {} (id, col1, col2, col3, col4) VALUES(%s, %s, %s, %s, %s)'
    query = query.format(table_name)
    with open('tmp.csv') as fh:
        reader = csv.reader(fh)
        next(reader)  # Skip firt line (headers)
        data = list(reader)
    engine.execute(query, data)


def main():
    uri = 'mysql+pymysql://root:%s@localhost/test' % os.environ['pass']

    engine = sqlalchemy.create_engine(uri)
    for i in (1, 2, 3, 4):
        engine.execute("DROP TABLE IF EXISTS table%s" % i)
        engine.execute("""
            CREATE TABLE table%s(
                id INT,
                col1 VARCHAR(255),
                col2 INT,
                col3 INT,
                col4 DOUBLE
            );
        """ % i)
    create_test_data()

    using_odo('table1', uri)
    using_pandas('table4', uri)
    using_cursor_correct('table3', uri)
    using_cursor('table2', uri)

    for i in (1, 2, 3, 4):
        count = pd.read_sql('SELECT COUNT(*) as c FROM table%s' % i, con=uri)['c'][0]
        print("Count for table%s - %s" % (i, count))


if __name__ == '__main__':
    main()

odo 方法是最快的(在后台使用 MySQL LOAD DATA INFILE) 接下来是 Pandas(优化了关键代码路径) 接下来是使用原始游标但批量插入行 最后是幼稚的方法,一次提交一行

以下是针对本地 MySQL 服务器在本地运行的一些示例计时。

using_odo (./test.py:29): 0.516 秒

using_pandas (./test.py:23): 3.039 秒

using_cursor_correct (./test.py:50): 12.847 秒

using_cursor (./test.py:34): 43.470 秒

table1 的计数 - 100000

table2 的计数 - 100000

table3 的计数 - 100000

table4 的计数 - 100000

如您所见,朴素的实现比 odo 慢约 100 倍。 而且比使用 pandas 慢 10 倍

【讨论】:

  • 您能否指定您使用的 odo 和 python 版本,因为我在导入 odo 时遇到错误: Traceback(最近一次调用最后一次):文件“main.py”,第 1 行,在 from odo import odo File "/home/env/lib/python3.7/site-packages/odo/__init__.py",第 39 行,在 from .backends import sql File "/home/env/ lib/python3.7/site-packages/odo/backends/sql.py",第 396 行,在 @discover.register(sa.engine.RowProxy) AttributeError:模块 'sqlalchemy.engine' 没有属性 'RowProxy '
【解决方案3】:

解决方案是使用 MySQL 中的batch insert

因此,您需要获取所有要插入的值并将它们转换为单个字符串,用作 execute() 方法的参数。

最后你的 SQL 应该是这样的:

INSERT INTO table_name (`column`, `column_1`, `column_2`, `column_3`) VALUES('1','2','3','4'),('4','5','6','7'),('7','8','9','10');

这是一个例子:

#function to transform your list into a string
def stringify(v): 
    return "('%s', '%s', %s, %s)" % (v[0], v[1], v[2], v[3])

#transform all to string
v = map(stringify, row)

#glue them together
batchData = ", ".join(e for e in v)

#complete the SQL
sql = "INSERT INTO `table_name`(`column`, `column_1`, `column_2`, `column_3`) \
VALUES %s" % batchData

#execute it
cursor.execute(sql)
db.commit()

【讨论】:

    【解决方案4】:

    这里有一些统计数据可以支持@Mike Tung 的回答。 executemany 执行 executeexecute 很难在 1 秒内达到 315 次插入,而 executemany 我达到了 25,000 次插入。

    基础机器配置-

    2.7 GHz Dual-Core Intel Core i5
    16 GB 1867 MHz DDR3
    Flash Storage
    

    结果:

    cursor.execute: 250 Inserts to max 315 Inserts in one second
    cursor.executemany: 25,000 Inserts in one second
    

    【讨论】:

    • 感谢统计,很有帮助。
    【解决方案5】:

    取出提交:

    for row in csv_data:
         cursor.execute(query,row)
    db.commit()
    

    它会做更少的工作并且会更快

    【讨论】:

    • 不够快。我必须插入最多 14 列和 2000 万行的表
    【解决方案6】:

    我通过使用元组数组解决了这个问题,并将其放入执行语句中。处理 1 英里时。行只用了 8 分钟。尽量避免迭代 con.execute 命令

    def process_csv_file4(csv_file, conn):
    df = pd.read_csv(csv_file,sep=';',
                     names=['column'])
    
    query = """
            INSERT INTO table
                (column)
            VALUES 
                (%s) 
            ON DUPLICATE KEY UPDATE 
                column= VALUES(column);
            """    
    
    conn.execute(query, tuple(df.values))
    

    【讨论】:

      【解决方案7】:

      我正在使用 SQL alchemy 库通过 python 脚本加速从 CSV 文件到 MySql 数据库的批量插入。数据库中的数据将以文本格式插入,因此连接到数据库工作台并更改数据类型,数据即可使用。

      第 1 步: 在命令终端中使用“pip install sqlalchemy”和“pip install mysqlclient”。

      import MySQLdb
      
      import sqlalchemy
      
      from sqlalchemy import  create_engine
      
      Step 2:
      Then create a connection string of create engine through SQL alchemy.
      
      ######Create Engine####
      
      syntax- enginecreate_engine("mysql+mysqldb://username:password@hostadress:3306/username")
      
      egzample-
      
      enginecreate_engine("mysql+mysqldb://abc9:abc$123456@127.10.23.1:2207/abc9")
      conn=engine.connect()
      
      print(engine);
      
      ###########Define your python code##############
      
      def function_name():
      
          data = pd.read_csv(filepath/file.csv')   
          data_frame = data.to_sql('database_name', engine, method='multi',index=False, 
          if_exists='replace')
      
      ############Close Connection###############
      
      conn = engine.raw_connection()
      
      conn.commit()
      
      conn.close()
      

      运行代码,4分钟内可以插入200万行!!

      将此参考链接用于不同的数据库驱动程序:

      https://overiq.com/sqlalchemy-101/installing-sqlalchemy-and-connecting-to-database/

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2012-04-14
        • 2011-05-16
        • 1970-01-01
        • 2016-07-11
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多