【发布时间】:2015-07-31 23:25:17
【问题描述】:
我想使用 pandas df.to_sql() 函数附加到现有表。
我设置了if_exists='append',但我的表有主键。
在尝试对现有表使用append 时,我想做与insert ignore 等效的操作,这样我就可以避免重复输入错误。
熊猫可以做到这一点,还是我需要写一个明确的查询?
【问题讨论】:
我想使用 pandas df.to_sql() 函数附加到现有表。
我设置了if_exists='append',但我的表有主键。
在尝试对现有表使用append 时,我想做与insert ignore 等效的操作,这样我就可以避免重复输入错误。
熊猫可以做到这一点,还是我需要写一个明确的查询?
【问题讨论】:
很遗憾,没有指定“INSERT IGNORE”的选项。这就是我绕过该限制的方法,将不重复的行插入到该数据库中(数据框名称为 df)
for i in range(len(df)):
try:
df.iloc[i:i+1].to_sql(name="Table_Name",if_exists='append',con = Engine)
except IntegrityError:
pass #or any other action
【讨论】:
if_exists='append'作为参数
from sqlalchemy import exc 并将异常更改为:except exc.IntegrityError as e:。就像@miro 说的那样,它确实减慢了这个过程。
created_at 和 updated_at 等列是自动填充的。这种方法行不通!
请注意"if_exists='append'" 与表的存在有关,如果表 不存在该怎么办。
if_exists 与表的内容无关。
在此处查看文档:https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_sql.html
if_exists : {‘fail’, ‘replace’, ‘append’},默认‘fail’ 失败:如果表存在,什么也不做。 replace:如果表存在,则删除它,重新创建它,然后插入数据。 append:如果表存在,则插入数据。如果不存在则创建。
【讨论】:
Pandas 目前没有选择,但这里是the Github issue。如果您也需要此功能,请为它投票。
【讨论】:
您可以使用to_sql 的method 参数来做到这一点:
from sqlalchemy.dialects.mysql import insert
def insert_on_duplicate(table, conn, keys, data_iter):
insert_stmt = insert(table.table).values(list(data_iter))
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(insert_stmt.inserted)
conn.execute(on_duplicate_key_stmt)
df.to_sql('trades', dbConnection, if_exists='append', chunksize=4096, method=insert_on_duplicate)
对于旧版本的 sqlalchemy,您需要将 dict 传递给 on_duplicate_key_update。即on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(dict(insert_stmt.inserted))
【讨论】:
on_duplicate_key_update 不接受 ColumnCollection,我必须创建一个 dict。
Pandas 不支持编辑 .to_sql 方法的实际 SQL 语法,因此您可能不走运。有一些实验性的编程解决方法(例如,使用CALCHIPAN 将 Dataframe 读取到 SQLAlchemy 对象并使用 SQLAlchemy 进行事务),但最好将 DataFrame 写入 CSV 并使用显式 MySQL 函数加载它。
CALCHIPAN 回购:https://bitbucket.org/zzzeek/calchipan/
【讨论】:
在我的例子中,我试图在一个空表中插入新数据,但是有些行是重复的,这里几乎是同样的问题,我“可能”考虑获取现有数据并与我得到的新数据合并并继续进行,但这不是最优的,可能只适用于小数据,而不是大表。
由于 pandas 目前没有为这种情况提供任何处理,我一直在寻找合适的解决方法,所以我自己做了,不确定这是否适合你,但我决定控制我的数据首先是我的数据,而不是等待它是否有效,所以我所做的是在调用.to_sql 之前删除重复项,所以如果发生任何错误,我会更多地了解我的数据并确保我知道发生了什么:
import pandas as pd
def write_to_table(table_name, data):
df = pd.DataFrame(data)
# Sort by price, so we remove the duplicates after keeping the lowest only
data.sort(key=lambda row: row['price'])
df.drop_duplicates(subset=['id_key'], keep='first', inplace=True)
#
df.to_sql(table_name, engine, index=False, if_exists='append', schema='public')
所以在我的情况下,我想保持行的最低价格(顺便说一句,我为 data 传递了一个 dict 数组),为此,我先进行了排序,没有必要,但这是一个示例我的意思是控制我想要保留的数据。
我希望这会对与我的情况几乎相同的人有所帮助。
【讨论】:
上面的 for 循环方法显着减慢了速度。您可以将一个方法参数传递给 panda.to_sql 以帮助实现 sql 查询的自定义
https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html#pandas.DataFrame.to_sql
下面的代码应该适用于 postgres,如果与主键“unique_code”发生冲突,则什么也不做。更改数据库的插入方言。
def insert_do_nothing_on_conflicts(sqltable, conn, keys, data_iter):
"""
Execute SQL statement inserting data
Parameters
----------
sqltable : pandas.io.sql.SQLTable
conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
keys : list of str
Column names
data_iter : Iterable that iterates the values to be inserted
"""
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import table, column
columns=[]
for c in keys:
columns.append(column(c))
if sqltable.schema:
table_name = '{}.{}'.format(sqltable.schema, sqltable.name)
else:
table_name = sqltable.name
mytable = table(table_name, *columns)
insert_stmt = insert(mytable).values(list(data_iter))
do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=['unique_code'])
conn.execute(do_nothing_stmt)
pd.to_sql('mytable', con=sql_engine, method=insert_do_nothing_on_conflicts)
【讨论】: