【发布时间】:2017-06-11 17:44:15
【问题描述】:
我目前正在使用 SQLAlchemy(在 GAE 上,连接到 Google 的云 MySQL)编写一个 Web 应用程序 (Flask),并且需要对表进行批量更新。简而言之,完成了许多计算,导致需要在 1000 个对象上更新单个值。目前我正在事务中完成所有操作,但最终,刷新/提交需要很长时间。
该表在id 上有一个索引,所有这些都在单个事务中执行。所以我相信我已经避免了常见的错误,但仍然很慢。
INFO 2017-01-26 00:45:46,412 log.py:109] UPDATE wallet SET balance=%(balance)s WHERE wallet.id = %(wallet_id)s
2017-01-26 00:45:46,418 INFO sqlalchemy.engine.base.Engine ({'wallet_id': u'3c291a05-e2ed-11e6-9b55-19626d8c7624', 'balance': 1.8711760000000002}, {'wallet_id': u'3c352035-e2ed-11e6-a64c-19626d8c7624', 'balance': 1.5875759999999999}, {'wallet_id': u'3c52c047-e2ed-11e6-a903-19626d8c7624', 'balance': 1.441656}
据我了解,实际上没有办法在 SQL 中进行批量更新,上面的语句最终是多个 UPDATE 语句被发送到服务器。
我尝试过使用Session.bulk_update_mappings(),但这似乎并没有真正做任何事情:(不知道为什么,但更新从未真正发生过。我看不到实际使用此方法的任何示例(包括性能套件)所以不确定它是否打算使用。
One technique I've seen discussed 正在对另一个表进行批量插入,然后进行 UPDATE JOIN。我已经对其进行了测试,如下所示,它似乎要快得多。
wallets = db_session.query(Wallet).all()
ledgers = [ Ledger(id=w.id, amount=w._balance) for w in wallets ]
db_session.bulk_save_objects(ledgers)
db_session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
db_session.execute('TRUNCATE ledger')
但现在的问题是如何构建我的代码。我正在使用 ORM,我需要以某种方式不“弄脏”原始的 Wallet 对象,以免它们以旧方式提交。我可以只创建这些Ledger 对象并保留它们的列表,然后在批量操作结束时手动插入它们。但这几乎闻起来像是在复制 ORM 机制的一些工作。
有没有更聪明的方法来做到这一点?到目前为止,我的大脑正在下降,例如:
class Wallet(Base):
...
_balance = Column(Float)
...
@property
def balance(self):
# first check if we have a ledger of the same id
# and return the amount in that, otherwise...
return self._balance
@balance.setter
def balance(self, amount):
l = Ledger(id=self.id, amount=amount)
# add l to a list somewhere then process later
# At the end of the transaction, do a bulk insert of Ledgers
# and then do an UPDATE JOIN and TRUNCATE
正如我所说,这一切似乎都在与我(可能)拥有的工具作斗争。有没有更好的方法来处理这个?我可以利用 ORM 机制来执行此操作吗?还是有更好的方法来进行批量更新?
编辑:或者活动和会议可能有什么巧妙之处?也许 before_flush?
编辑 2:所以我尝试利用事件机制,现在有了这个:
@event.listens_for(SignallingSession, 'before_flush')
def before_flush(session, flush_context, instances):
ledgers = []
if session.dirty:
for elem in session.dirty:
if ( session.is_modified(elem, include_collections=False) ):
if isinstance(elem, Wallet):
session.expunge(elem)
ledgers.append(Ledger(id=elem.id, amount=elem.balance))
if ledgers:
session.bulk_save_objects(ledgers)
session.execute('UPDATE wallet w JOIN ledger l on w.id = l.id SET w.balance = l.amount')
session.execute('TRUNCATE ledger')
这对我来说似乎很老套和邪恶,但似乎工作正常。有什么陷阱或更好的方法吗?
-马特
【问题讨论】:
标签: mysql performance orm sqlalchemy flask-sqlalchemy