【问题标题】:INSERT or UPDATE bulk data from dataframe/CSV to PostgreSQL database将数据框/CSV 中的批量数据插入或更新到 PostgreSQL 数据库
【发布时间】:2020-07-18 17:06:57
【问题描述】:

要求:插入新数据并更新现有数据批量(行数> 1000)从dataframe/CSV(任何套件)并将其保存在 PostgreSQL 数据库中。

表:TEST_TABLE

CREATE TABLE TEST_TABLE (
itemid varchar(100)  NOT NULL PRIMARY KEY,
title varchar(255),
street varchar(10),
pincode VARCHAR(100));

INSERT: ['756252', 'tom title', 'APC Road', '598733' ], 
        ['75623', 'dick title', 'Bush Road', '598787' ], 
        ['756211', 'harry title', 'Obama Street', '598733' ]

数据框内容:

data = [['756252', 'tom new title', 'Unknown Road', 'pin changed' ], 
        ['75623', 'dick new title', 'Bush Road changed', '598787 also changed' ], 
        ['756211', 'harry title', 'Obama Street', '598733'],
        ['7562876', 'new1 data title', 'A Street', '598730'],
        ['7562345', 'new2 data title', 'B Street', '598731'],
        ['7562534', 'new3 data title', 'C Street', '598732'],
        ['7562089', 'new4 data title', 'D Street', '598733']] 

df = pd.DataFrame(data, columns = ['itemid', 'title', 'street', 'pincode']) 

我想 UPDATE 具有相同 itemid 的记录和 INSERT 新记录。数据会很大(从数据框创建的 CSV 文件超过 50MB)。

使用的编程语言:Python

数据库:PostgreSQL

【问题讨论】:

  • 创建一个临时表,使用 COPY 移动数据,执行 INSERT ... ON CONFLICT 从临时表到实际表。
  • 对如何处理批量数据有什么建议吗?如果在循环中一次完成每一行,这将是非常昂贵的操作

标签: python sql pandas postgresql sqlalchemy


【解决方案1】:

在这种特殊情况下,最好降到 DB-API 级别,因为您需要一些即使 SQLAlchemy Core 也不会直接公开的工具,例如copy_expert()。这可以使用raw_connection() 来完成。如果您的源数据是 CSV 文件,那么在这种情况下您根本不需要 pandas。首先创建一个临时临时表,将数据复制到临时表,然后插入到目标表中并进行冲突处理:

conn = engine.raw_connection()

try:
    with conn.cursor() as cur:
        cur.execute("""CREATE TEMPORARY TABLE TEST_STAGING ( LIKE TEST_TABLE )
                       ON COMMIT DROP""")

        with open("your_source.csv") as data:
            cur.copy_expert("""COPY TEST_STAGING ( itemid, title, street, pincode )
                               FROM STDIN WITH CSV""", data)

        cur.execute("""INSERT INTO TEST_TABLE ( itemid, title, street, pincode )
                       SELECT itemid, title, street, pincode
                       FROM TEST_STAGING
                       ON CONFLICT ( itemid )
                       DO UPDATE SET title = EXCLUDED.title
                                   , street = EXCLUDED.street
                                   , pincode = EXCLUDED.pincode""")

except:
    conn.rollback()
    raise

else:
    conn.commit()

finally:
    conn.close()

另一方面,如果您的源数据是DataFrame,您仍然可以通过passing a function as method= to to_sql() 使用COPY。该函数甚至可以隐藏所有上述逻辑:

import csv

from io import StringIO
from psycopg2 import sql

def psql_upsert_copy(table, conn, keys, data_iter):
    dbapi_conn = conn.connection

    buf = StringIO()
    writer = csv.writer(buf)
    writer.writerows(data_iter)
    buf.seek(0)

    if table.schema:
        table_name = sql.SQL("{}.{}").format(
            sql.Identifier(table.schema), sql.Identifier(table.name))
    else:
        table_name = sql.Identifier(table.name)

    tmp_table_name = sql.Identifier(table.name + "_staging")
    columns = sql.SQL(", ").join(map(sql.Identifier, keys))

    with dbapi_conn.cursor() as cur:
        # Create the staging table
        stmt = "CREATE TEMPORARY TABLE {} ( LIKE {} ) ON COMMIT DROP"
        stmt = sql.SQL(stmt).format(tmp_table_name, table_name)
        cur.execute(stmt)

        # Populate the staging table
        stmt = "COPY {} ( {} ) FROM STDIN WITH CSV"
        stmt = sql.SQL(stmt).format(tmp_table_name, columns)
        cur.copy_expert(stmt, buf)

        # Upsert from the staging table to the destination. First find
        # out what the primary key columns are.
        stmt = """
               SELECT kcu.column_name
               FROM information_schema.table_constraints tco
               JOIN information_schema.key_column_usage kcu 
               ON kcu.constraint_name = tco.constraint_name
               AND kcu.constraint_schema = tco.constraint_schema
               WHERE tco.constraint_type = 'PRIMARY KEY'
               AND tco.table_name = %s
               """
        args = (table.name,)

        if table.schema:
            stmt += "AND tco.table_schema = %s"
            args += (table.schema,)

        cur.execute(stmt, args)
        pk_columns = {row[0] for row in cur.fetchall()}
        # Separate "data" columns from (primary) key columns
        data_columns = [k for k in keys if k not in pk_columns]
        # Build conflict_target
        pk_columns = sql.SQL(", ").join(map(sql.Identifier, pk_columns))

        set_ = sql.SQL(", ").join([
            sql.SQL("{} = EXCLUDED.{}").format(k, k)
            for k in map(sql.Identifier, data_columns)])

        stmt = """
               INSERT INTO {} ( {} )
               SELECT {}
               FROM {}
               ON CONFLICT ( {} )
               DO UPDATE SET {}
               """

        stmt = sql.SQL(stmt).format(
            table_name, columns, columns, tmp_table_name, pk_columns, set_)
        cur.execute(stmt)

然后您将插入新的DataFrame 使用

df.to_sql("test_table", engine,
          method=psql_upsert_copy,
          index=False,
          if_exists="append")

在这台带有本地数据库的机器上使用这种方法更新插入约 1,000,000 行大约需要 16 秒。

【讨论】:

  • 什么 ON CONFLICT , DO UPDATE SET, EXCLUDED.做?那么这将使用 TEST_STAGING 表中的值更新 TEST_TABLE 上的任何重复的 id 行吗?谢谢,干得好。
  • 在评论区解释它们会很有挑战性,所以我会推荐你​​到postgresql.org/docs/current/sql-insert.html,但你已经掌握了要点。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-25
  • 1970-01-01
  • 2016-01-26
  • 1970-01-01
  • 2014-08-30
  • 1970-01-01
相关资源
最近更新 更多