【问题标题】:across process boundary in scoped_session在 scoped_session 中跨进程边界
【发布时间】:2017-11-24 11:09:34
【问题描述】:

我正在使用 SQLAlchemy 和多处理。我也使用 scoped_session 避免共享同一个会话,但我发现了一个错误及其解决方案,但我不明白为什么会发生这种情况。

你可以在下面看到我的代码:

db.py

engine = create_engine(connection_string)

Session = sessionmaker(bind=engine)
DBSession = scoped_session(Session)

script.py

from multiprocessing import Pool, current_process
from db import DBSession

def process_feed(test):
    session = DBSession()
    print(current_process().name, session)

def run():
    session = DBSession()
    pool = Pool()
    print(current_process().name, session)
    pool.map_async(process_feed, [1, 2]).get()

if __name__ == "__main__":
    run()

当我运行script.py 时,输出为:

MainProcess <sqlalchemy.orm.session.Session object at 0xb707b14c>
ForkPoolWorker-1 <sqlalchemy.orm.session.Session object at 0xb707b14c>
ForkPoolWorker-2 <sqlalchemy.orm.session.Session object at 0xb707b14c>

请注意,会话对象在主进程及其工作人员(子进程)中是相同的0xb707b14c

但是如果我改变前两行 run() 的顺序:

def run():
    pool = Pool() # <--- Now pool is instanced in the first line
    session = DBSession()  # <--- Now session is instanced in the second line
    print(current_process().name, session)
    pool.map_async(process_feed, [1, 2]).get()

我再次运行script.py,输出为:

MainProcess <sqlalchemy.orm.session.Session object at 0xb66907cc>
ForkPoolWorker-1 <sqlalchemy.orm.session.Session object at 0xb669046c>
ForkPoolWorker-2 <sqlalchemy.orm.session.Session object at 0xb66905ec>

现在会话实例不同了。

【问题讨论】:

    标签: python session sqlalchemy python-multiprocessing


    【解决方案1】:

    要了解为什么会发生这种情况,您需要了解 scoped_sessionPool 的实际作用。 scoped_session 保留会话注册表,以便发生以下情况

    • 第一次调用 DBSession 时,它会在注册表中为您创建一个 Session 对象
    • 随后,如果满足必要条件(即同一线程,会话尚未关闭),它不会创建新的Session 对象,而是将之前创建的Session 对象返回给您

    当您创建 Pool 时,它会在 __init__ 方法中创建工作人员。 (请注意,在__init__ 中启动工作进程没有任何基本意义。同样有效的实现可以等到第一次需要工作人员后再启动它们,这将在您的示例中表现出不同的行为。)当这种情况发生时(在 Unix 上),父进程 forks 每个工作进程本身,这涉及操作系统将当前运行进程的内存复制到一个新进程中,因此您将在完全相同的位置获得完全相同的对象。

    将这两者放在一起,在第一个示例中,您在分叉之前创建一个Session,它在创建Pool 期间被复制到所有工作进程,从而产生相同的标识,而在第二个示例中您将 Session 对象的创建延迟到工作进程启动后,导致身份不同。

    请务必注意,虽然 Session 对象共享相同的 id,但它们不是相同的对象,因为如果您更改任何关于 Session 的内容父进程,它们不会反映在子进程中。由于分叉,它们恰好都共享相同的内存地址。 然而,像连接这样的操作系统级资源是共享的,所以如果您在Pool() 之前在session 上运行过查询,则会在连接池并随后分叉到子进程中。如果您随后尝试在子进程中执行查询,您将遇到奇怪的错误,因为您的进程正在通过相同的确切连接相互干扰!

    以上内容对于 Windows 没有实际意义,因为 Windows 没有 fork()

    【讨论】:

    • 谢谢!你的解释很清楚。但我以为叉子在这里做的:pool.map_async(process_feed, [1, 2]).get()
    • @Overflow012 是的,这是一个同样有效的假设,因为文档没有以某种方式规定它,但是,作为实施细节,它发生在 Pool.__init__ 中的 here
    【解决方案2】:

    TCP 连接表示为文件描述符,它通常跨进程边界工作,这意味着这将导致代表两个或多个完全独立的 Python 解释器状态并发访问文件描述符。

    https://docs.sqlalchemy.org/en/13/core/pooling.html#using-connection-pools-with-multiprocessing

    【讨论】:

      猜你喜欢
      • 2011-06-09
      • 2012-12-26
      • 2012-10-14
      • 1970-01-01
      • 2012-06-02
      • 1970-01-01
      • 2015-09-06
      • 2013-12-12
      • 1970-01-01
      相关资源
      最近更新 更多