【问题标题】:using sqlalchemy to load csv file into a database使用 sqlalchemy 将 csv 文件加载到数据库中
【发布时间】:2015-10-02 09:41:38
【问题描述】:

我想将 csv 文件加载到数据库中

【问题讨论】:

    标签: python database sqlalchemy


    【解决方案1】:

    由于 SQLAlchemy 的强大功能,我也在一个项目中使用它。它的力量来自于与数据库“对话”的面向对象的方式,而不是硬编码难以管理的 SQL 语句。更不用说,它也快了很多。

    直截了当地回答你的问题,是的!使用 SQLAlchemy 将数据从 CSV 存储到数据库中是小菜一碟。这是一个完整的工作示例(我使用了 SQLAlchemy 1.0.6 和 Python 2.7.6):

    from numpy import genfromtxt
    from time import time
    from datetime import datetime
    from sqlalchemy import Column, Integer, Float, Date
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    
    def Load_Data(file_name):
        data = genfromtxt(file_name, delimiter=',', skip_header=1, converters={0: lambda s: str(s)})
        return data.tolist()
    
    Base = declarative_base()
    
    class Price_History(Base):
        #Tell SQLAlchemy what the table name is and if there's any table-specific arguments it should know about
        __tablename__ = 'Price_History'
        __table_args__ = {'sqlite_autoincrement': True}
        #tell SQLAlchemy the name of column and its attributes:
        id = Column(Integer, primary_key=True, nullable=False) 
        date = Column(Date)
        opn = Column(Float)
        hi = Column(Float)
        lo = Column(Float)
        close = Column(Float)
        vol = Column(Float)
    
    if __name__ == "__main__":
        t = time()
    
        #Create the database
        engine = create_engine('sqlite:///csv_test.db')
        Base.metadata.create_all(engine)
    
        #Create the session
        session = sessionmaker()
        session.configure(bind=engine)
        s = session()
    
        try:
            file_name = "t.csv" #sample CSV file used:  http://www.google.com/finance/historical?q=NYSE%3AT&ei=W4ikVam8LYWjmAGjhoHACw&output=csv
            data = Load_Data(file_name) 
    
            for i in data:
                record = Price_History(**{
                    'date' : datetime.strptime(i[0], '%d-%b-%y').date(),
                    'opn' : i[1],
                    'hi' : i[2],
                    'lo' : i[3],
                    'close' : i[4],
                    'vol' : i[5]
                })
                s.add(record) #Add all the records
    
            s.commit() #Attempt to commit all the records
        except:
            s.rollback() #Rollback the changes on error
        finally:
            s.close() #Close the connection
        print "Time elapsed: " + str(time() - t) + " s." #0.091s
    

    (注意:这不一定是执行此操作的“最佳”方式,但我认为这种格式对于初学者来说非常易读;它也非常快:插入 251 条记录只需 0.091 秒!)

    我认为,如果您逐行浏览它,您会发现使用起来是多么轻而易举。注意缺少 SQL 语句——万岁!我还冒昧地使用 numpy 在两行中加载 CSV 内容,但如果您愿意,也可以不使用它。

    如果您想与传统的做法进行比较,这里有一个完整的示例供参考:

    import sqlite3
    import time
    from numpy import genfromtxt
    
    def dict_factory(cursor, row):
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d
    
    
    def Create_DB(db):      
        #Create DB and format it as needed
        with sqlite3.connect(db) as conn:
            conn.row_factory = dict_factory
            conn.text_factory = str
    
            cursor = conn.cursor()
    
            cursor.execute("CREATE TABLE [Price_History] ([id] INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL UNIQUE, [date] DATE, [opn] FLOAT, [hi] FLOAT, [lo] FLOAT, [close] FLOAT, [vol] INTEGER);")
    
    
    def Add_Record(db, data):
        #Insert record into table
        with sqlite3.connect(db) as conn:
            conn.row_factory = dict_factory
            conn.text_factory = str
    
            cursor = conn.cursor()
    
            cursor.execute("INSERT INTO Price_History({cols}) VALUES({vals});".format(cols = str(data.keys()).strip('[]'), 
                        vals=str([data[i] for i in data]).strip('[]')
                        ))
    
    
    def Load_Data(file_name):
        data = genfromtxt(file_name, delimiter=',', skiprows=1, converters={0: lambda s: str(s)})
        return data.tolist()
    
    
    if __name__ == "__main__":
        t = time.time() 
    
        db = 'csv_test_sql.db' #Database filename 
        file_name = "t.csv" #sample CSV file used:  http://www.google.com/finance/historical?q=NYSE%3AT&ei=W4ikVam8LYWjmAGjhoHACw&output=csv
    
        data = Load_Data(file_name) #Get data from CSV
    
        Create_DB(db) #Create DB
    
        #For every record, format and insert to table
        for i in data:
            record = {
                    'date' : i[0],
                    'opn' : i[1],
                    'hi' : i[2],
                    'lo' : i[3],
                    'close' : i[4],
                    'vol' : i[5]
                }
            Add_Record(db, record)
    
        print "Time elapsed: " + str(time.time() - t) + " s." #3.604s
    

    (注意:即使是“旧”方式,这绝不是最好的方式,但它非常易读,并且是 SQLAlchemy 方式与“旧”方式的“一对一”翻译方式。)

    注意SQL语句:一个是创建表,另一个是插入记录。另外,请注意,维护长 SQL 字符串与简单的类属性添加相比要麻烦一些。到目前为止喜欢 SQLAlchemy?

    当然,至于您的外键查询。 SQLAlchemy 也有能力做到这一点。下面是一个外键赋值的类属性示例(假设 ForeignKey 类也已从 sqlalchemy 模块导入):

    class Asset_Analysis(Base):
        #Tell SQLAlchemy what the table name is and if there's any table-specific arguments it should know about
        __tablename__ = 'Asset_Analysis'
        __table_args__ = {'sqlite_autoincrement': True}
        #tell SQLAlchemy the name of column and its attributes:
        id = Column(Integer, primary_key=True, nullable=False) 
        fid = Column(Integer, ForeignKey('Price_History.id'))
    

    将“fid”列作为 Price_History 的 id 列的外键。

    希望有帮助!

    【讨论】:

    • 我会用旧的方式处理 sql。
    • 这是有用的代码,但如果数据文件包含在示例中会很有帮助。那么它就真正独立了。
    • 我还没有检查为什么会这样,但是genfromtxt 返回错误:genfromtxt() got an unexpected keyword argument 'skiprows'。 Numpy 是 1.12.1-3 (Debian 9.0)。
    • Faheem,一个示例 CSV 文件 URL 包含在其中一个 cmets 中; try 语句的第一行。下载它,将 if 放在与此脚本相同的目录中,然后运行它。
    • 这很酷,但不是说数据会先读入Python,再插入DB吗?仅使用数据库的批量加载程序并完全跳过数据必须通过 Python 流动的部分不是更快吗?我相信 ARA1307 的答案解决了这个问题,但是是特定于数据库的。如果有一种内部使用特定于您的数据库品牌的批量加载命令的 SQLAlchemy 方法,那就太好了。
    【解决方案2】:

    如果您的 CSV 非常大,使用 INSERTS 是非常无效的。您应该使用批量加载机制,该机制因基础而异。例如。在 PostgreSQL 中你应该使用“COPY FROM”方法:

    with open(csv_file_path, 'r') as f:    
        conn = create_engine('postgresql+psycopg2://...').raw_connection()
        cursor = conn.cursor()
        cmd = 'COPY tbl_name(col1, col2, col3) FROM STDIN WITH (FORMAT CSV, HEADER FALSE)'
        cursor.copy_expert(cmd, f)
        conn.commit()
    

    【讨论】:

    • 对于任何严重的事情,您确实希望直接使用 psycopg 中的copy_fromcopy_expert。该解决方案可以一次插入数百万行。
    • 有什么方法可以在不导入一些庞大的库的情况下实现这一点?
    • @Sajuuk:这个解决方案实际上只能使用 psycopg2(包含在 sqlalchemy 中),从而大大减少了它的依赖关系(不需要依赖所有的 sqlalchemy,因为我们在这里并没有真正使用它)。这个想法是直接用psycopg的psycopg2.connect("...")替换第一条语句的sqlalchemy.create_engine("...").raw_connection()
    • 我使用 SQLAlchemy ORM(更多的是出于授权而非偏好),但这确实应该是公认的答案,至少对于 Postgres 而言。我有一个超过 1 百万行的 CSV 文件,使用具有 the best performance here 的 sqlalchemy-core insert 策略需要 9 分钟(并且需要 1.3 GB 内存)。使用 COPY ... FROM ... 策略和 StringIO 缓冲区需要 10 秒,并且只有 250 MB 内存。
    【解决方案3】:

    我遇到了完全相同的问题,但矛盾的是,我发现在 pandas 中使用两步过程更容易:

    import pandas as pd
    with open(csv_file_path, 'r') as file:
        data_df = pd.read_csv(file)
    data_df.to_sql('tbl_name', con=engine, index=True, index_label='id', if_exists='replace')
    

    请注意,我的方法类似于 this one,但不知何故,Google 将我发送到此线程,所以我想我会分享。

    【讨论】:

    • 如果您有一个非常大的文件需要上传到数据库中怎么办?
    • 对于中型文件,您可以使用create_engine(..., fast_executemany=True) 设置sqlalchemy 引擎,这也将加速pandas 的to_sql
    【解决方案4】:

    要使用 sqlalchemy 将相对较小的 CSV 文件导入数据库,可以使用engine.execute(my_table.insert(), list_of_row_dicts),如"Executing Multiple Statements" section of the sqlalchemy tutorial 中的详细描述。

    这有时被称为“executemany”调用风格,因为它会产生executemany DBAPI call。 DB 驱动程序可能会执行单个多值 INSERT .. VALUES (..), (..), (..) 语句,从而减少到 DB 的往返次数并加快执行速度:

    根据sqlalchemy's FAQ,这是在不使用特定于数据库的批量加载方法(例如 Postgres 中的 COPY FROM、MySQL 中的 LOAD DATA LOCAL INFILE 等)的情况下获得的最快速度。特别是它比使用普通 ORM 更快(如@Manuel J. Diaz 的回答)、bulk_save_objectsbulk_insert_mappings

    import csv
    from sqlalchemy import create_engine, Table, Column, Integer, MetaData
    
    engine = create_engine('sqlite:///sqlalchemy.db', echo=True)
    
    metadata = MetaData()
    # Define the table with sqlalchemy:
    my_table = Table('MyTable', metadata,
        Column('foo', Integer),
        Column('bar', Integer),
    )
    metadata.create_all(engine)
    insert_query = my_table.insert()
    
    # Or read the definition from the DB:
    # metadata.reflect(engine, only=['MyTable'])
    # my_table = Table('MyTable', metadata, autoload=True, autoload_with=engine)
    # insert_query = my_table.insert()
    
    # Or hardcode the SQL query:
    # insert_query = "INSERT INTO MyTable (foo, bar) VALUES (:foo, :bar)"
    
    with open('test.csv', 'r', encoding="utf-8") as csvfile:
        csv_reader = csv.reader(csvfile, delimiter=',')
        engine.execute(
            insert_query,
            [{"foo": row[0], "bar": row[1]} 
                for row in csv_reader]
        )
    

    【讨论】:

    • +1 表示“这是在没有特定数据库的情况下获得的最快速度......” - 尽管如果有人 确实 可以选择使用特定的批量加​​载(COPY FROM等)然后他们probably should
    【解决方案5】:

    带有逗号和标题名称的 CSV 文件到 PostrgeSQL

    1. 我正在使用 csv Python 阅读器。 CSV 数据除以逗号 (,)
    2. 然后将其转换为 Pandas DataFrame。列的名称与 csv 文件中的名称相同。
    3. 结束最后一个,DataFrame 到 sql,引擎作为与 DB 的连接。 if_exists='替换/追加'
    import csv
    import pandas as pd
    from sqlalchemy import create_engine
    
    # Create engine to connect with DB
    try:
        engine = create_engine(
            'postgresql://username:password@localhost:5432/name_of_base')
    except:
        print("Can't create 'engine")
    
    # Get data from CSV file to DataFrame(Pandas)
    with open('test.csv', newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        columns = [
            'moment',
            'isin',
            'name'
        ]
        df = pd.DataFrame(data=reader, columns=columns)
    
    # Standart method of Pandas to deliver data from DataFrame to PastgresQL
    try:
        with engine.begin() as connection:
            df.to_sql('name_of_table', con=connection, index_label='id', if_exists='replace')
            print('Done, ok!')
    except:
        print('Something went wrong!')
    

    【讨论】:

      猜你喜欢
      • 2014-10-07
      • 2016-04-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-10-02
      • 2015-03-21
      • 1970-01-01
      相关资源
      最近更新 更多