【Question title】: Delete rows from SQL Server based on content in a dataframe
【Posted】: 2020-01-05 18:52:53
【Question description】:

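I have an inventory table in SQL Server called dbo.inventory that contains the columns Year, Month, Material, and Stock_quantity. Every day I receive a new inventory count as a CSV file and need to load it into the dbo.inventory table. However, if a Year and Month in the CSV file already exist in the database, I need to delete those records first, so that I do not load several inventory counts for the same month.

In SQL I would do it like this: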

Delete t1 
FROM dbo.inventory t1
JOIN csv t2 ON t1.Year = t2.Year and t1.Month = t2.Month

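I don't know how to do this in a Python script. I would like to avoid loading my CSV file into the data warehouse as a temporary table, and instead just delete the existing rows that match on Year and Month and then load the new ones.

In another setup I have used the following: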

delete_date = sales.Date.max()
connection = engine.connect()
connection.execute(f"""delete from sales where Date = '{delete_date}'""")
connection.close()

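That does not work here, because the input that determines what to delete is a dataframe which, if it is a correction of previously loaded data, could in theory contain several years and months.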

【Discussion】:

    Tags: python sql sql-server pandas sqlalchemy


    【Solution 1】:

    Pandas does not support deleting SQL rows based on specific conditions. You have to tell SQL Server which rows to delete:

    import sqlalchemy as sa
    
    engine = sa.create_engine('mssql+pyodbc://...')
    meta = sa.MetaData()
    
    # Map the Inventory table in your database to a SQLAlchemy object.
    # autoload_with alone triggers reflection; the older autoload=True flag was removed in SQLAlchemy 2.0.
    inventory = sa.Table('Inventory', meta, autoload_with=engine)
    
    # Build the WHERE clause of your DELETE statement from rows in the dataframe.
    # Equivalence in T-SQL
    #      WHERE (Year = ... AND Month = ...) OR (Year = ... AND Month = ...) OR (Year = ... AND Month = ...)
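    # Only the distinct (Year, Month) pairs are needed, so drop duplicate rows first.
    keys = df[['Year', 'Month']].drop_duplicates()
    cond = [sa.and_(inventory.c['Year'] == year, inventory.c['Month'] == month)
            for year, month in keys.itertuples(index=False, name=None)]
    cond = sa.or_(*cond)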
    
    # Define and execute the DELETE.
    # engine.begin() commits when the block exits without an error.
    delete = inventory.delete().where(cond)
    with engine.begin() as conn:
        conn.execute(delete)
    
    # Now you can insert the new data
    df.to_sql('Inventory', engine, if_exists='append', index=False)
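
To try the pattern end to end without a SQL Server instance, here is a minimal sketch that runs against an in-memory SQLite database as a stand-in. The table layout, the sample rows, and the helper name `delete_matching_periods` are assumptions made for illustration and are not part of the answer above; it also assumes SQLAlchemy 1.4 or later.

```python
import pandas as pd
import sqlalchemy as sa

# In-memory SQLite stands in for SQL Server here (assumption for the demo).
engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
inventory = sa.Table(
    "Inventory", meta,
    sa.Column("Year", sa.Integer),
    sa.Column("Month", sa.Integer),
    sa.Column("Material", sa.String(50)),
    sa.Column("Stock_quantity", sa.Integer),
)
meta.create_all(engine)


def delete_matching_periods(conn, table, frame):
    """Delete rows whose (Year, Month) pair occurs in `frame` (hypothetical helper)."""
    pairs = frame[["Year", "Month"]].drop_duplicates()
    if pairs.empty:
        return 0  # nothing to match, so nothing to delete
    cond = sa.or_(*[
        sa.and_(table.c["Year"] == int(y), table.c["Month"] == int(m))
        for y, m in pairs.itertuples(index=False, name=None)
    ])
    return conn.execute(table.delete().where(cond)).rowcount


# Rows already in the database (made-up sample data).
existing = pd.DataFrame({
    "Year": [2019, 2019, 2020],
    "Month": [11, 12, 1],
    "Material": ["A", "A", "A"],
    "Stock_quantity": [10, 20, 30],
})
# Rows from the new CSV file (made-up sample data).
new = pd.DataFrame({
    "Year": [2019, 2020, 2020],
    "Month": [12, 1, 1],
    "Material": ["A", "A", "B"],
    "Stock_quantity": [25, 35, 5],
})

existing.to_sql("Inventory", engine, if_exists="append", index=False)

with engine.begin() as conn:  # commits on success, rolls back on error
    deleted = delete_matching_periods(conn, inventory, new)

new.to_sql("Inventory", engine, if_exists="append", index=False)

with engine.connect() as conn:
    remaining = conn.execute(
        sa.select(sa.func.count()).select_from(inventory)
    ).scalar()
```

Here the two existing rows for 2019-12 and 2020-01 are removed before the three new rows are appended, while the 2019-11 row is left alone. With a very large number of distinct periods the OR chain grows accordingly, so this suits the "a handful of months per file" case described in the question.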
    

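    【Discussion】:

    • What if I only have one condition? Would you still use 'sa.and_'? Would you still use 'cond = sa.or_(*cond)'?
    • What is your condition, in SQL?
    • Where df.column.value = Inventory.column.value. This is a simpler problem than the one asked, but I can't find the right documentation.
    • Basically, if a column value in my dataframe is already in SQL, I need to replace that row with the row from my dataframe.
    • Try cond = inventory.c['col_name'].in_(df['col_name'])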
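
    A minimal sketch of that single-column case, using in-memory SQLite as a stand-in database. The table `items` and its contents are made up, and `col_name` is the placeholder from the comment. Passing a plain list of unique values rather than the Series itself is a cautious choice here, not something the comment requires.

```python
import pandas as pd
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")  # stand-in for the real database (assumption)
meta = sa.MetaData()
items = sa.Table(
    "items", meta,
    sa.Column("col_name", sa.String(20)),
    sa.Column("value", sa.Integer),
)
meta.create_all(engine)

# The dataframe whose key values decide which rows get deleted.
df = pd.DataFrame({"col_name": ["a", "b", "b"], "value": [10, 20, 30]})

with engine.begin() as conn:
    # Pre-existing rows: "a" overlaps with the dataframe, "c" does not.
    conn.execute(items.insert(), [
        {"col_name": "a", "value": 1},
        {"col_name": "c", "value": 3},
    ])
    # One condition only: no and_/or_ needed, IN covers every value in the column.
    keys = df["col_name"].drop_duplicates().tolist()
    conn.execute(items.delete().where(items.c["col_name"].in_(keys)))
    left = [row[0] for row in conn.execute(sa.select(items.c["col_name"]))]
```

    After the delete only the row with `col_name == "c"` is left, and the dataframe can then be appended with `to_sql(..., if_exists='append')` as in the answer.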
    【Solution 2】:

    I ended up doing this:

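    import pandas as pd
    from pandas.tseries.offsets import MonthEnd
    
    #Build a month-end posting date from the Year and Posting_period columns
    Inventory['Posting_date'] = pd.to_datetime(Inventory.Year.astype('str')+Inventory.Posting_period.astype('str'), format="%Y%m") + MonthEnd(1)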
    
    #Delete periods already existing in the database in order to avoid duplicates when reloading etc.
    delete_date_inv = Inventory.Posting_date.drop_duplicates()
    delete_date_inv = delete_date_inv.astype('str')
    delete_date_inv = delete_date_inv.to_list()
    
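    #Apply the deletion of the destination
    #The dates are formatted straight into the SQL text, which is acceptable only because they are generated above
    #exec_driver_sql (SQLAlchemy 1.4+) accepts a raw SQL string; engine.begin() commits when the block exits
    with engine.begin() as connection:
        connection.exec_driver_sql(f"""delete from Inventory where Posting_date in ({str(delete_date_inv)[1:-1]})""")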
    
    #Load to the database
    Inventory.to_sql('Inventory', schema = 'dbo', con=engine, if_exists='append', index = False, chunksize = 10000)
    

    I'm not sure whether the above is more efficient than the other answer, but it works :-)

    Thank you all for the comments.
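
    The same delete can also be written with bound parameters instead of formatting the list into the SQL string, which sidesteps quoting problems. A minimal sketch, assuming SQLAlchemy 1.4 or later, with in-memory SQLite and a made-up two-column table standing in for the real `Inventory` table:

```python
import pandas as pd
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")  # stand-in for SQL Server (assumption)

# Made-up data playing the role of the loaded CSV file.
frame = pd.DataFrame({
    "Posting_date": ["2019-12-31", "2020-01-31", "2020-01-31"],
    "Stock_quantity": [5, 6, 7],
})
dates = frame["Posting_date"].drop_duplicates().tolist()

# An "expanding" bind parameter renders one placeholder per list element.
stmt = sa.text("DELETE FROM Inventory WHERE Posting_date IN :dates").bindparams(
    sa.bindparam("dates", expanding=True)
)

with engine.begin() as conn:
    conn.execute(sa.text(
        "CREATE TABLE Inventory (Posting_date TEXT, Stock_quantity INTEGER)"
    ))
    conn.execute(sa.text(
        "INSERT INTO Inventory VALUES ('2019-11-30', 1), ('2019-12-31', 2)"
    ))
    deleted = conn.execute(stmt, {"dates": dates}).rowcount
    remaining = conn.execute(
        sa.text("SELECT Posting_date FROM Inventory")
    ).scalars().all()
```

    Only the 2019-12-31 row matches a date in the frame, so one row is deleted and the 2019-11-30 row stays.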


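      【Solution 3】:

      I think you have two good options.

      1) Work in pandas. Query the existing table with pandas.read_sql_table(), import the CSV file as a second DataFrame, and merge the old table with the new one to update it. Then write the updated DataFrame back, for example with df.to_sql(..., if_exists='replace'). Note that to_sql has no 'update' mode; the documented if_exists values include 'fail', 'replace', and 'append'.

      2) Use sqlalchemy and work in the database, especially if you want to preserve the schema or other constraints.

      Below is a short, generic example based on these two solutions. Other, more specific solutions are likely possible, but these are two starting points.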

      import sqlalchemy as sa
      import sqlalchemy.ext.declarative as sa_dec
      import sqlalchemy.orm as sa_orm
      import pandas as pd
      from sqlalchemy import update
      from sqlalchemy import and_
      
      #con = sqlite3.connect('hyp.db')
      #cur = con.cursor()
      
      # general pandas solution
      t1 = pd.DataFrame({'year': [1, 2, 3], 'month': [4, 5, 6], 'value': [2, 2, 2]})
      t2 = pd.DataFrame({'year': [1, 5, 3], 'month': [4, 9, 9], 'value': [1, 5, 10]})
      c = pd.merge(t1, t2, how='outer', on=['year', 'month'], suffixes=['', '_t2'])
      c.loc[c['value_t2'].notnull(), 'value'] = c.loc[c['value_t2'].notnull(), 'value_t2']
      c = c.drop('value_t2', axis=1)
      print(c)
      
      # pandas using update
      t1 = pd.DataFrame({'year': [1, 2, 3], 'month': [4, 5, 6], 'value': [2, 2, 2]})
      t2 = pd.DataFrame({'year': [1, 5, 3], 'month': [4, 9, 9], 'value': [1, 5, 10]})
      c = pd.merge(t1, t2, how='outer', on=['year', 'month'], suffixes=['', '_t2'])
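      # take value_t2 where present, else keep value; assigning the result back
      # avoids relying on an in-place update of the c['value'] selection
      c['value'] = c['value_t2'].combine_first(c['value'])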
      c = c.drop('value_t2', axis=1)
      print(c)
      
      # the c.to_sql(...)
      
      ##### sqlalchemy
      
      Name = 'try.db'
      Type = 'sqlite'
      Url = sa.engine.url.URL.create(Type, database=Name)  # URL objects are built with URL.create() in SQLAlchemy 1.4+
      Engine = sa.create_engine(Url)
      Base = sa_dec.declarative_base()
      Session = sa_orm.sessionmaker(bind=Engine)
      
      class Info(Base):
          __tablename__ = 'Inventory'
          id = sa.Column(sa.Integer, primary_key=True)
          __table_args__ = (sa.UniqueConstraint('Year', 'Month'),)
          Year = sa.Column(sa.String(250))
          Month = sa.Column(sa.String(250))
          Value = sa.Column(sa.Float)
      
      Base.metadata.create_all(Engine)
      
      # change values of year and month to test
      t = pd.DataFrame({'Year': [1, 2, 5], 'Month': ['Jun', 'July', 'Dec'], 'Value': [3, 3, 3]})
      
      
      # this isn't very efficient but it is here to give you a comprehensive example
      # where you have good control on what is happening
      for i, r in t.iterrows():
          newdata = Info()
          for col, val in r.items():
              setattr(newdata, col, val)
          session = Session()  # open sqlalchemy-sqlite session
          session.add(newdata)  # add Info instance to session to insert
          try:
              session.flush()  # test insert, to see if there is any error
          except sa.exc.IntegrityError:  # here catch unique constraint error if already in db
              print('already in')
              session.rollback()  # rollback to remove the blocked instance
              # update the existing row instead, matching on both Year and Month
              stmt = update(Info).where(and_(Info.Year == r['Year'], Info.Month == r['Month'])).values(Value=r['Value'])
              session.execute(stmt)
              session.commit()  # commit the update
          else:
              session.commit()  # commit changes to db
          finally:
              session.close()  # close session to keep clean, a new one is opened for the next row
      

      I tested both solutions and they seem to work, but further testing is needed.
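
      For completeness, option 1 can be sketched as a full round trip: read the table, drop the existing rows for every period the new file covers, and write the combined result back. This is only an illustration on in-memory SQLite with made-up data and column names. Keep in mind that `if_exists='replace'` drops and recreates the table, so types and constraints defined in the database are not preserved.

```python
import pandas as pd
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")  # stand-in database (assumption)

# Seed the table with made-up existing rows.
old = pd.DataFrame({"Year": [2019, 2019, 2020], "Month": [11, 12, 1],
                    "Stock_quantity": [10, 20, 30]})
old.to_sql("Inventory", engine, if_exists="replace", index=False)

# `new` plays the role of the freshly loaded CSV file.
new = pd.DataFrame({"Year": [2019, 2020], "Month": [12, 2],
                    "Stock_quantity": [25, 40]})

current = pd.read_sql_table("Inventory", engine)

# Keep only the existing rows whose (Year, Month) is absent from the new data.
keys = new[["Year", "Month"]].drop_duplicates()
marked = current.merge(keys, on=["Year", "Month"], how="left", indicator=True)
kept = marked.loc[marked["_merge"] == "left_only"].drop(columns="_merge")

# Combine the surviving old rows with the new ones and rewrite the table.
result = pd.concat([kept, new], ignore_index=True)
result.to_sql("Inventory", engine, if_exists="replace", index=False)

check = pd.read_sql_table("Inventory", engine).sort_values(["Year", "Month"])
```

      In this example the 2019-12 count is overwritten by the new value, 2020-02 is added, and the other two months are carried over unchanged. Because the whole table is read and rewritten, this fits small tables better than large ones.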
