您可以通过组合多个事件来做到这一点。您需要使用的具体事件取决于您的特定应用程序,但基本思想是这样的:
- [
InstanceEvents.load] 加载实例时,记下该实例已加载且稍后未添加到会话的事实(如果实例已加载,我们只想保存初始状态)
- [
AttributeEvents.set/append/remove] 当一个属性改变时,记下它被改变的事实,如果有必要,它是从什么改变的(如果你不需要初始状态,前两个步骤是可选的)
- [
SessionEvents.before_flush] 发生刷新时,记下实际保存了哪些实例
- [
SessionEvents.before_commit] 在提交完成之前,记下实例的当前状态(因为在提交之后您可能无法再访问它)
- [
SessionEvents.after_commit] 提交完成后,触发自定义事件处理程序并清除您保存的实例
一个有趣的挑战是事件的顺序。如果您执行session.commit() 而不执行session.flush(),您会注意到before_commit 事件在before_flush 事件之前触发,这与您在session.commit() 之前执行session.flush() 的情况不同。解决方案是在您的before_commit 调用中调用session.flush() 以强制排序。这可能不是 100% 洁净的,但它在生产中对我有用。
这是一个(简单的)事件顺序图:
begin
load
(save initial state)
set attribute
...
flush
set attribute
...
flush
...
(save modified state)
commit
(fire off "object saved and changed" event)
完整示例
from itertools import chain
from weakref import WeakKeyDictionary, WeakSet
from sqlalchemy import Column, String, Integer, create_engine
from sqlalchemy import event
from sqlalchemy.orm import sessionmaker, object_session
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
engine = create_engine("sqlite://")
Session = sessionmaker(bind=engine)
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
birthday = Column(String)
@event.listens_for(User.birthday, "set", active_history=True)
def _record_initial_state(target, value, old, initiator):
session = object_session(target)
if session is None:
return
if target not in session.info.get("loaded_instances", set()):
return
initial_state = session.info.setdefault("initial_state", WeakKeyDictionary())
# this is where you save the entire object's state, not necessarily just the birthday attribute
initial_state.setdefault(target, old)
@event.listens_for(User, "load")
def _record_loaded_instances_on_load(target, context):
session = object_session(target)
loaded_instances = session.info.setdefault("loaded_instances", WeakSet())
loaded_instances.add(target)
@event.listens_for(Session, "before_flush")
def track_instances_before_flush(session, context, instances):
modified_instances = session.info.setdefault("modified_instances", WeakSet())
for obj in chain(session.new, session.dirty):
if session.is_modified(obj) and isinstance(obj, User):
modified_instances.add(obj)
@event.listens_for(Session, "before_commit")
def set_pending_changes_before_commit(session):
session.flush() # IMPORTANT
initial_state = session.info.get("initial_state", {})
modified_instances = session.info.get("modified_instances", set())
del session.info["modified_instances"]
pending_changes = session.info["pending_changes"] = []
for obj in modified_instances:
initial = initial_state.get(obj)
current = obj.birthday
pending_changes.append({
"initial": initial,
"current": current,
})
initial_state[obj] = current
@event.listens_for(Session, "after_commit")
def after_commit(session):
pending_changes = session.info.get("pending_changes", {})
del session.info["pending_changes"]
for changes in pending_changes:
print(changes) # this is where you would fire your custom event
loaded_instances = session.info["loaded_instances"] = WeakSet()
for v in session.identity_map.values():
if isinstance(v, User):
loaded_instances.add(v)
def main():
engine = create_engine("sqlite://", echo=False)
Base.metadata.create_all(bind=engine)
session = Session(bind=engine)
user = User(birthday="foo")
session.add(user)
user.birthday = "bar"
session.flush()
user.birthday = "baz"
session.commit() # prints: {"initial": None, "current": "baz"}
user.birthday = "foobar"
session.commit() # prints: {"initial": "baz", "current": "foobar"}
session.close()
if __name__ == "__main__":
main()
如您所见,它有点复杂,而且不太符合人体工程学。如果将它集成到 ORM 中会更好,但我也理解不这样做可能是有原因的。