【问题标题】:SQLAlchemy, Psycopg2 and Postgresql COPYSQLAlchemy、Psycopg2 和 Postgresql 复制
【发布时间】:2012-10-18 23:31:51
【问题描述】:

看起来 Psycopg 有一个用于执行 COPY 的自定义命令:

psycopg2 COPY using cursor.copy_from() freezes with large inputs

有没有办法通过 SQLAlchemy 访问此功能?

【问题讨论】:

    标签: postgresql sqlalchemy psycopg2


    【解决方案1】:

    接受的答案是正确的,但如果您想要的不仅仅是 EoghanM 的评论,那么在将表格复制到 CSV 时,以下内容对我有用...

    from sqlalchemy import sessionmaker, create_engine
    
    eng = create_engine("postgresql://user:pwd@host:5432/db")
    ses = sessionmaker(bind=engine)
    
    dbcopy_f = open('/tmp/some_table_copy.csv','wb')
    
    copy_sql = 'COPY some_table TO STDOUT WITH CSV HEADER'
    
    fake_conn = eng.raw_connection()
    fake_cur = fake_conn.cursor()
    fake_cur.copy_expert(copy_sql, dbcopy_f)
    

    sessionmaker 不是必需的,但如果您习惯同时创建引擎和会话以使用 raw_connection,则需要将它们分开(除非有某种方法可以访问引擎通过我不知道的会话对象)。提供给copy_expert 的sql 字符串也不是唯一的方法,有一个基本的copy_to 函数,您可以将其与可以传递给普通COPY TO 查询的参数子集一起使用。该命令的整体性能对我来说似乎很快,复制出约 20000 行的表。

    http://initd.org/psycopg/docs/cursor.html#cursor.copy_to http://docs.sqlalchemy.org/en/latest/core/connections.html#sqlalchemy.engine.Engine.raw_connection

    【讨论】:

    • 这是一个很棒的发现。这将我保存数据的时间从 8 小时以上的一夜之间减少到 4 分钟左右。天哪!
    • 为我工作,但我必须在最后做fake_conn.commit()
    【解决方案2】:

    如果您的引擎配置有 psycopg2 连接字符串(这是默认值,因此 "postgresql://...""postgresql+psycopg2://..."),您可以使用 SQL Alchemy 会话创建 psycopg2 游标

    cursor = session.connection().connection.cursor()
    

    你可以用来执行

    cursor.copy_from(...)
    

    光标将在与您当前会话相同的事务中处于活动状态。如果发生commitrollback,任何进一步使用光标并抛出psycopg2.InterfaceError,您都必须创建一个新的。

    【讨论】:

    • 赞成展示如何使用传统会话实际获取光标。
    • 有没有办法用 asyncpg 方言做到这一点?驱动程序本身有一个connection.copy_from_table 方法,但在光标上不可用。如果我在其他答案中获得原始连接,它将不在同一个事务中。
    【解决方案3】:

    你可以使用:

    def to_sql(engine, df, table, if_exists='fail', sep='\t', encoding='utf8'):
        # Create Table
        df[:0].to_sql(table, engine, if_exists=if_exists)
    
        # Prepare data
        output = cStringIO.StringIO()
        df.to_csv(output, sep=sep, header=False, encoding=encoding)
        output.seek(0)
    
        # Insert data
        connection = engine.raw_connection()
        cursor = connection.cursor()
        cursor.copy_from(output, table, sep=sep, null='')
        connection.commit()
        cursor.close()
    

    我在 5 秒而不是 4 分钟内插入 200000 行

    【讨论】:

    • 您能否详细介绍一下df 对象是什么?
    • df 是一个熊猫数据框
    • 这是金子! (使用 pandas 和 sqlalchemy 和 postgres)
    • 我知道已经有一段时间了,但你能解释一下为什么你使用 pandas to_sql 和 sqlalchemy copy_from 吗? to_sql 不会创建表并将 df 的内容写入表吗?如果是这样,为什么还要再次插入数据?我正在使用它来批量插入数据,但目标表有一个序列 ID 作为它的索引,无论我尝试哪种方式,我似乎都无法在没有索引的情况下插入数据(没有出现错误说缺少列数据)
    • @JustinMoser 碰巧pandas.DataFrame 有一个to_sql 方法,但这里没有使用该方法。相反,to_csv 方法用于将df 存储在in-memory text streamoutput 中,然后使用raw_connection 将其传递给psycopg2
    【解决方案4】:

    It doesn't look like it.

    您可能只需要使用 psycopg2 来公开此功能并放弃 ORM 功能。我想我并没有真正看到 ORM 在这样的操作中的好处,因为它是直接的批量插入,并且处理单个对象(例如 ORM)并没有真正的意义。

    【讨论】:

    • super - 可以通过 engine.raw_connection() 进入 psycopg
    【解决方案5】:

    如果您从 SQLAlchemy 开始,您需要首先访问连接引擎(在某些 SQLAlchemy 对象上也称为属性名称 bind):

    engine = create_engine('postgresql+psycopg2://myuser:password@localhost/mydb')
    # or 
    engine = session.engine
    # or any other way you know to get to the engine
    

    您可以从引擎中隔离 psycopg2 连接:

    # get a psycopg2 connection
    connection = engine.connect().connection
    
    # get a cursor on that connection
    cursor = connection.cursor()
    

    这里有一些模板供 COPY 语句与 cursor.copy_expert() 一起使用,这是一个比 copy_from()copy_to() 更完整和灵活的选项,如下所示:https://www.psycopg.org/docs/cursor.html#cursor.copy_expert

    # to dump to a file
    dump_to = """
    COPY mytable 
    TO STDOUT
    WITH (
        FORMAT CSV,
        DELIMITER ',',
        HEADER
    );
    """
    
    # to copy from a file:
    copy_from = """
    COPY mytable 
    FROM STDIN
    WITH (
        FORMAT CSV,
        DELIMITER ',',
        HEADER
    );
    """
    

    查看上述选项的含义以及您的具体情况可能感兴趣的其他选项https://www.postgresql.org/docs/current/static/sql-copy.html

    重要提示:cursor.copy_expert() 文档的链接表示使用 STDOUT 写入文件并使用 STDIN 从文件复制。但是,如果您查看 PostgreSQL 手册中的语法,您会注意到您还可以直接在 COPY 语句中指定要写入或写入的文件。不要那样做,如果您不是以 root 身份运行(谁在开发过程中以 root 身份运行 Python?),您可能只是在浪费时间987654331@,应该没问题。

    # running the copy statement
    with open('/path/to/your/data/file.csv') as f:
         cursor.copy_expert(copy_from, file=f)
    
    # don't forget to commit the changes.
    connection.commit()
    

    【讨论】:

      【解决方案6】:

      您不需要下拉到 psycopg2,使用 raw_connection 或游标。

      像往常一样执行sql,你甚至可以用text()绑定参数:

      engine.execute(text('''copy some_table from :csv
                             delimiter ',' csv'''
                         ).execution_options(autocommit=True),
                     csv='/tmp/a.csv')
      

      如果this PR 将被接受,您可以放弃execution_options(autocommit=True)

      【讨论】:

      • 只是透露它是您的代码存储库,它似乎是
      • 不,链接为“this PR”的不是我的存储库,它是 sqlalchemy 的普通/官方存储库。 OTOH PR 显然是我的,这不是秘密:我在 stackoverflow 和 bitbucket 上使用相同的用户名。无论如何,我在研究该更改的示例时偶然发现了这个问题,而我所写的所有内容实际上都是正确的。我本可以避免链接 PR,但是(如果它被接受)那么这个答案将暗示一个过时的 sn-p,除非我或有人会记得在事后更新它
      • 在旧答案中添加注释,但 COPY 命名文件需要数据库超级用户权限,这可能不是很理想。
      猜你喜欢
      • 1970-01-01
      • 2011-07-21
      • 1970-01-01
      • 2014-07-27
      • 1970-01-01
      • 1970-01-01
      • 2012-01-25
      • 2015-04-19
      • 1970-01-01
      相关资源
      最近更新 更多