【问题标题】:Fast insertion of pandas DataFrame into Postgres DB using psycopg2使用 psycopg2 将 pandas DataFrame 快速插入 Postgres DB
【发布时间】:2026-02-11 06:50:01
【问题描述】:

我正在尝试以最有效的方式(使用 Python 2.7)将 pandas DataFrame 插入 Postgresql DB (9.1)。
使用“cursor.execute_many”真的很慢,“DataFrame.to_csv(buffer,...)”和“copy_from”也是如此。
我发现一个已经很多了!网络上更快的解决方案(http://eatthedots.blogspot.de/2008/08/faking-read-support-for-psycopgs.html),我适应了它与熊猫一起工作。
我的代码可以在下面找到。
我的问题是这个相关问题的方法(使用“从标准输入复制二进制”)是否可以很容易地转移到使用 DataFrames,如果这会更快。
Use binary COPY table FROM with psycopg2
不幸的是,我的 Python 技能不足以理解这种方法的实现。
这是我的方法:


import psycopg2
import connectDB # this is simply a module that returns a connection to the db
from datetime import datetime

class ReadFaker:
    """
    This could be extended to include the index column optionally. Right now the index
    is not inserted
    """
    def __init__(self, data):
        self.iter = data.itertuples()

    def readline(self, size=None):
        try:
            line = self.iter.next()[1:]  # element 0 is the index
            row = '\t'.join(x.encode('utf8') if isinstance(x, unicode) else str(x) for x in line) + '\n'
        # in my case all strings in line are unicode objects.
        except StopIteration:
            return ''
        else:
            return row

    read = readline

def insert(df, table, con=None, columns = None):

    time1 = datetime.now()
    close_con = False
    if not con:
        try:
            con = connectDB.getCon()   ###dbLoader returns a connection with my settings
            close_con = True
        except psycopg2.Error, e:
            print e.pgerror
            print e.pgcode
            return "failed"
    inserted_rows = df.shape[0]
    data = ReadFaker(df)

    try:
        curs = con.cursor()
        print 'inserting %s entries into %s ...' % (inserted_rows, table)
        if columns is not None:
            curs.copy_from(data, table, null='nan', columns=[col for col in columns])
        else:
            curs.copy_from(data, table, null='nan')
        con.commit()
        curs.close()
        if close_con:
            con.close()
    except psycopg2.Error, e:
        print e.pgerror
        print e.pgcode
        con.rollback()
        if close_con:
            con.close()
        return "failed"

    time2 = datetime.now()
    print time2 - time1
    return inserted_rows

【问题讨论】:

  • 任何更新哪个答案产生了最佳性能?
  • 没有一个答案真正回答了我的问题。
  • 检查这个问题:*.com/questions/41875817/…。有相当完整的答案。

标签: python postgresql psycopg2 pandas


【解决方案1】:

Pandas 数据帧现在有一个 .to_sql 方法。尚不支持 Postgresql,但有一个看起来可以工作的补丁。请参阅问题herehere

【讨论】:

  • 这无济于事,因为它只是使用了一种速度明显较慢的标准方法。
  • .to_sql 也很慢。我正在使用 python 2.7
【解决方案2】:

我没有测试过性能,但也许你可以使用这样的东西:

  1. 遍历 DataFrame 的行,生成代表行的字符串(见下文)
  2. 在流​​中转换这个可迭代对象,例如使用Python: Convert an iterable to a stream?
  3. 最后在这个流上使用 psycopg 的copy_from

要有效地生成 DataFrame 的行,请使用以下内容:

    def r(df):
            for idx, row in df.iterrows():
                    yield ','.join(map(str, row))

【讨论】:

  • 这也流式传输 CSV,这是我上面的班级正在做的事情。我的问题是在创建流时二进制表示是否会产生更快的结果。