【问题标题】:SQLAlchemy bulk update strategiesSQLAlchemy 批量更新策略
【发布时间】:2017-06-11 17:44:15
【问题描述】:

我目前正在使用 SQLAlchemy(在 GAE 上,连接到 Google 的云 MySQL)编写一个 Web 应用程序 (Flask),并且需要对表进行批量更新。简而言之,完成了许多计算,导致需要在 1000 个对象上更新单个值。目前我正在事务中完成所有操作,但最终,刷新/提交需要很长时间。

该表在id 上有一个索引,所有这些都在单个事务中执行。所以我相信我已经避免了常见的错误,但仍然很慢。

INFO     2017-01-26 00:45:46,412 log.py:109] UPDATE wallet SET balance=%(balance)s WHERE wallet.id = %(wallet_id)s
2017-01-26 00:45:46,418 INFO sqlalchemy.engine.base.Engine ({'wallet_id': u'3c291a05-e2ed-11e6-9b55-19626d8c7624', 'balance': 1.8711760000000002}, {'wallet_id': u'3c352035-e2ed-11e6-a64c-19626d8c7624', 'balance': 1.5875759999999999}, {'wallet_id': u'3c52c047-e2ed-11e6-a903-19626d8c7624', 'balance': 1.441656}

据我了解,实际上没有办法在 SQL 中进行批量更新,上面的语句最终是多个 UPDATE 语句被发送到服务器。

我尝试过使用Session.bulk_update_mappings(),但这似乎并没有真正做任何事情:(不知道为什么,但更新从未真正发生过。我看不到实际使用此方法的任何示例(包括性能套件)所以不确定它是否打算使用。

One technique I've seen discussed 正在对另一个表进行批量插入,然后进行 UPDATE JOIN。我已经对其进行了测试,如下所示,它似乎要快得多。

wallets = db_session.query(Wallet).all()
ledgers = [ Ledger(id=w.id, amount=w._balance) for w in wallets ]
db_session.bulk_save_objects(ledgers)
db_session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
db_session.execute('TRUNCATE ledger')

但现在的问题是如何构建我的代码。我正在使用 ORM,我需要以某种方式不“弄脏”原始的 Wallet 对象,以免它们以旧方式提交。我可以只创建这些Ledger 对象并保留它们的列表,然后在批量操作结束时手动插入它们。但这几乎闻起来像是在复制 ORM 机制的一些工作。

有没有更聪明的方法来做到这一点?到目前为止,我的大脑正在下降,例如:

class Wallet(Base):
    ...
    _balance = Column(Float)
    ...

@property
def balance(self):
    # first check if we have a ledger of the same id
    # and return the amount in that, otherwise...
    return self._balance

@balance.setter
def balance(self, amount):
    l = Ledger(id=self.id, amount=amount)
    # add l to a list somewhere then process later

# At the end of the transaction, do a bulk insert of Ledgers
# and then do an UPDATE JOIN and TRUNCATE

正如我所说,这一切似乎都在与我(可能)拥有的工具作斗争。有没有更好的方法来处理这个?我可以利用 ORM 机制来执行此操作吗?还是有更好的方法来进行批量更新?

编辑:或者活动和会议可能有什么巧妙之处?也许 before_flush?

编辑 2:所以我尝试利用事件机制,现在有了这个:

@event.listens_for(SignallingSession, 'before_flush')
def before_flush(session, flush_context, instances):
    ledgers = []

    if session.dirty:
        for elem in session.dirty:
            if ( session.is_modified(elem, include_collections=False) ):
                if isinstance(elem, Wallet):
                    session.expunge(elem)
                    ledgers.append(Ledger(id=elem.id, amount=elem.balance))

    if ledgers:
        session.bulk_save_objects(ledgers)
        session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
        session.execute('TRUNCATE ledger')

这对我来说似乎很老套和邪恶,但似乎工作正常。有什么陷阱或更好的方法吗?

-马特

【问题讨论】:

    标签: mysql performance orm sqlalchemy flask-sqlalchemy


    【解决方案1】:

    您实际上是在绕过 ORM 以优化性能。因此,不要对您“复制 ORM 正在做的工作”感到惊讶,因为这正是您需要做的。

    除非你有很多地方需要像这样进行批量更新,否则我建议不要使用神奇的事件方法;简单地编写显式查询要简单得多。

    我建议使用 SQLAlchemy Core 而不是 ORM 进行更新:

    ledger = Table("ledger", db.metadata,
        Column("wallet_id", Integer, primary_key=True),
        Column("new_balance", Float),
        prefixes=["TEMPORARY"],
    )
    
    
    wallets = db_session.query(Wallet).all()
    
    # figure out new balances
    balance_map = {}
    for w in wallets:
        balance_map[w.id] = calculate_new_balance(w)
    
    # create temp table with balances we need to update
    ledger.create(bind=db.session.get_bind())
    
    # insert update data
    db.session.execute(ledger.insert().values([{"wallet_id": k, "new_balance": v}
                                               for k, v in balance_map.items()])
    
    # perform update
    db.session.execute(Wallet.__table__
                             .update()
                             .values(balance=ledger.c.new_balance)
                             .where(Wallet.__table__.c.id == ledger.c.wallet_id))
    
    # drop temp table
    ledger.drop(bind=db.session.get_bind())
    
    # commit changes
    db.session.commit()
    

    【讨论】:

    • 是的,我觉得我在这里与 ORM 的斗争有点过头了。问题是我已经在大多数情况下使用 ORM,因此需要进行相当多的重写才能以这样一种方式获得它,以便我可以执行上述形式的某些操作。在我的用例中calculate_new_balance() 将取决于先前计算的值(此用例是通过网络传播资金),因此我最终将不得不同时查看模型和分类帐以确定哪个有正确的值。但是谢谢,这确实让我深思!
    【解决方案2】:

    通常需要频繁更新数千行是糟糕的架构设计。不说了……

    计划 A:编写生成的 ORM 代码

    START TRANSACTION;
    UPDATE wallet SET balance = ... WHERE id = ...;
    UPDATE wallet SET balance = ... WHERE id = ...;
    UPDATE wallet SET balance = ... WHERE id = ...;
    ...
    COMMIT;
    

    Plan B:编写生成的 ORM 代码

    CREATE TEMPORARY TABLE ToDo (
        id ...,
        new_balance ...
    );
    INSERT INTO ToDo -- either one row at a time, or a bulk insert
    UPDATE wallet
        JOIN ToDo USING(id)
        SET wallet.balance = ToDo.new_balance;  -- bulk update
    

    (检查语法;测试等)

    【讨论】:

    • 谢谢 :) 我知道这可能是糟糕的架构设计,但事实上我需要在该程序的每次迭代中以某种方式更新 1000 行。 A计划是我最初的计划,但太慢了。计划 B 现在是我最终拥有的,基于上面 univerio 的代码和before_flush 侦听器的组合。好消息是禁用监听器并且我有默认行为(计划 A)并启用它我得到优化更新(计划 B)
    • 关于 B 计划(您的 Ledger)的注意事项——如果有人可以在更新完成和您的 TRUNATE 之间写信给 Ledger,您可能会丢失一个条目。如果您可以将您的代码翻译成 SQL,我可以详细说明。
    猜你喜欢
    • 1970-01-01
    • 2011-11-06
    • 2019-08-17
    • 2013-05-19
    • 2016-11-29
    • 2013-10-14
    • 2015-05-20
    • 2014-02-17
    • 1970-01-01
    相关资源
    最近更新 更多