【问题标题】:SQLAlchemy - Multithreaded Persistent Object Creation, how to merge back into single session to avoid state conflict?SQLAlchemy - 多线程持久对象创建,如何合并回单个会话以避免状态冲突?
【发布时间】:2014-11-27 19:04:17
【问题描述】:

由于需要处理,我想要以多线程方式生成数以万计(可能是数百)数千个持久对象。

虽然对象的创建发生在单独的线程中(使用 Flask-SQLAlchemy 扩展 btw 和作用域会话),但将生成的对象写入数据库的调用发生在生成完成后的 1 个位置。

我认为,问题在于正在创建的对象是几个现有关系的一部分——从而触发了身份映射的自动添加,尽管它们是在单独的并发线程中创建的,并且在任何线程中都没有显式会话.

我希望将生成的对象包含在一个列表中,然后将整个列表(使用单个会话对象)写入数据库。这会导致如下错误:

AssertionError: A conflicting state is already present in the identity map for key (<class 'app.ModelObject'>, (1L,))

因此我相信身份映射已经被填充,因为当我尝试使用并发代码的外部全局会话添加和提交时,会触发断言错误。

最后的细节是任何会话对象(范围或其他,因为我不完全理解在多线程情况下自动添加到身份映射的工作原理)我找不到方法/不要知道如何获得对它们的引用,这样即使我想为每个进程处理一个单独的会话,我也可以。

非常感谢任何建议。我(还)没有发布代码的唯一原因是因为很难从我的应用程序中立即抽象出一个工作示例。如果有人真的需要看到它,我会发布。

【问题讨论】:

  • 我对 flask-sqlalchemy 的理解是它不是基于线程本地而是基于每个请求分配会话。但是我真的不明白您是如何在... Web 请求的上下文中运行多线程任务的?这完全取决于您的会话创建/使用模式,这里不清楚。
  • 让我澄清一下。此特定代码是从 Flask 请求上下文之外的 CLI 运行的。我担心的是模型类仍在使用由 Flask-SQLAlchemy 创建的声明性基础,并且由于生成的对象是子对象,因此在创建它们并与其父对象关联时它们会自动添加到身份映射中(已经持久性)CLI 脚本从未传递任何显式会话引用,这就是为什么我不确定如何处理正在发生的冲突。我只能假设每个线程都在动态创建一个。
  • 大约一个小时后我将抽象并上传一些示例代码
  • 您好,感谢您对这个问题的关注。最终,是你的文档的彻底性帮助我想出了一个解决方案:)
  • 分离状态假设实际上是我想在这里澄清的缺失的细节。因为对象与已经持久化(在活动会话中)的父对象相关联,所以子对象(在子进程中)是真正分离的,还是因为关联而暂时/挂起?

标签: python sqlalchemy flask-sqlalchemy


【解决方案1】:

每个会话都是线程本地的;换句话说,每个线程都有一个单独的会话。如果您决定将某些实例传递给另一个线程,它们将与会话“分离”。在接收线程中使用db.session.add_all(objects) 将它们全部放回。

由于某种原因,您似乎正在不同线程中创建具有相同标识(主键列)的对象,然后尝试将它们都发送到数据库。一种选择是解决发生这种情况的原因,从而保证身份是唯一的。你也可以试试mergingmerged_object = db.session.merge(other_object, load=False).

编辑:zzzeek 的评论让我了解了可能发生的其他事情:

使用 Flask-SQLAlchemy,会话与应用上下文相关联。由于这是线程本地的,因此产生一个新线程将使上下文无效;线程中不会有数据库会话。所有实例都在那里分离,并且无法正确跟踪关系。一种解决方案是将app 传递给每个线程并在with app.app_context(): 块内执行所有操作。在块内,首先使用db.session.add 用传递的实例填充本地会话。之后您仍应合并到主任务中以确保一致性。

【讨论】:

    【解决方案2】:

    我只是想用一些伪代码来澄清问题和解决方案,以防将来有人遇到这个问题/想要这样做。

    class ObjA(object):
        obj_c = relationship('ObjC', backref='obj_c')
    
    class ObjB(object):
        obj_c = relationship('ObjC', backref='obj_c')
    
    class ObjC(object):
        obj_a_id = Column(Integer, ForeignKey('obj_a.id'))
        obj_b_id = Column(Integer, ForeignKey('obj_b.id'))
    
        def __init__(self, obj_a, obj_b):
            self.obj_a = obj_a
            self.obj_b = obj_b
    
    
    def make_a_bunch_of_c(obj_a, list_of_b=None):
        return [ObjC(obj_a, obj_b) for obj_b in list_of_b]
    
    def parallel_generate():
       list_of_a = session.query(ObjA).all() # assume there are 1000 of these
       list_of_b = session.query(ObjB).all() # and 30 of these
    
       fxn = functools.partial(make_a_bunch_of_c, list_of_b=list_of_b)
       pool = multiprocessing.Pool(10)
       all_the_things = pool.map(fxn, list_of_a)
       return all_the_things
    

    现在让我们停在这里一秒钟。最初的问题是尝试添加 ObjC 的列表导致原始问题中的错误消息:

    session.add_all(all_the_things)
    
    AssertionError: A conflicting state is already present in the identity map for key [...]
    

    注意:错误发生在添加阶段,提交尝试甚至从未发生,因为断言发生在提交前。据我所知。

    解决方案:

    all_the_things = parallel_generate()
    for thing in all_the_things:
        session.merge(thing)
    session.commit()
    

    在处理自动添加的对象(通过关系级联)时会话利用率的细节仍然超出我的能力范围,我无法解释最初发生冲突的原因。我所知道的是,使用合并函数将导致 SQLAlchemy 将跨 10 个不同进程创建的所有子对象排序到主进程中的单个会话中。

    如果有人遇到这种情况,我会很好奇为什么。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-11-27
      • 1970-01-01
      • 1970-01-01
      • 2011-02-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多