【问题标题】:How to ensure that a block of code can be executed only by one request at a time in Python?如何确保在 Python 中一次只能由一个请求执行一段代码?
【发布时间】:2021-07-23 21:50:26
【问题描述】:

我可能遇到了一个典型的比赛条件,需要建议如何解决这个问题。

VpnProfileTable = sqlalchemy.Table(
    "vpn_profile",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("profile", Text(), nullable=False),
    Column("server_id", ForeignKey("server.id"), nullable=False, index=True),
    Column("device_id", ForeignKey("device.id"), nullable=True, index=True),
)

我有一个可以一次分配给设备的 VPN 配置文件表。一个配置文件不应分配给多个设备。

  1. 通过此处的查询,我确保只返回没有设备 ID 的未使用配置文件。
  2. 然后我获取所有配置文件并随机选择一个。然后我更新数据库表以指示所选配置文件现在已分配给设备,不应提供给其他人。

但是,我相信在获取和更新记录之间会发生竞争情况,有时我最终会让两个用户获得相同的个人资料。

async def get_next_vpn_profile(
    self, server_id: str, device_id: str
) -> Optional[str]:
    query = (
        VpnProfileTable.select()
        .where(VpnProfileTable.c.server_id == server_id)
        .where(VpnProfileTable.c.device_id == None)
    )
    async with engine.begin() as conn:
            records = (await conn.execute(query)).fetchall()
            profiles = []
            if records and len(records) > 0:
                profiles = [VpnProfile.parse_obj(i) for i in records]
            if profiles:
                profile: VpnProfile = random.choice(profiles)
                query = (
                    VpnProfileTable.update()
                    .where(VpnProfileTable.c.id == profile.id)
                    .values(device_id=device_id)
                )
                await conn.execute(query)
                return profile.profile
            else:
                return None

我该怎么做才能使这段代码在所有传入请求中只能提供给一个请求以避免这种情况? (应用程序在 Gunicorn/Uvicorn 上运行)?或者,还有更好的方法?我正在考虑单例/信号量,但无法理解它。

更新

正如 Marat 在 cmets 中提到的,我认为最好的方法是在数据库级别使用锁定。我正在使用 Postgres,所以我不确定在 with_for_update() 中是否需要 nowait=True

        async with engine.begin() as conn:
            query = (
                VpnProfileTable.select()
                .where(
                    VpnProfileTable.c.device_id == None,
                )
                .with_for_update()
            )
            record = (await conn.execute(query)).first()
            if record:
                query = (
                    VpnProfileTable.update()
                    .where(VpnProfileTable.c.id == record.id)
                    .values(device_id=device_id)
                )
                await conn.execute(query)
                await conn.commit()

总而言之,我相信这会获得第一个没有任何 device_id 的可用 vpn 配置文件,将其锁定,以便其他进程在这里等待,直到可以再次读取该行。

然后在同一个事务中,我将获得的 vpn 配置文件设置为给定的 device_id 并提交更改。 (不确定我是否需要提交,是否已经有 with engine.begin() 声明。它应该自动发生。

我想不出一种方法来为这种情况编写单元测试,所以我希望有人可以验证这一点。 .with_for_update() 是否足以让其他进程在尝试运行相同的 select 语句时等待?

因为如果他们等待,他们将不会得到相同的行,因为它已经分配给另一个 device_id,这正是我需要的。

【问题讨论】:

  • 是否总是有一个进程执行该操作?除了任何本地独占代码执行之外,还可以考虑利用悲观或乐观的数据库并发性。
  • 您应该使用 select 进行更新操作,锁定所需的行。线程的答案将不起作用,因为 prod 中的任何 python 应用程序都在多个进程上运行。线程锁是每个进程的。
  • @user2864740 不,涉及多个工人。利用 Gunicorn 和 Uvicorn 同时并行运行应用程序。另外我正在使用 SqlAlchemy Core,不涉及任何会话。
  • @Houman 我认为你不应该跳过锁定,等待锁定。代码看起来没问题并进行了锁定。

标签: python python-asyncio fastapi


【解决方案1】:

将您的代码封装在互斥锁中。在 Python 中,这可以使用 multiprocessing.Lock 来完成;例如:

from multiprocessing import Lock
mutex = Lock()

async def get_next_vpn_profile(
    self, server_id: str, device_id: str
) -> Optional[str]:
    ...

    with mutex:
        async with engine.begin() as conn:
        ...

如果使用多个进程,通常互斥锁不起作用,但this answer 澄清说,Gunicorn 工作人员可以在使用来自multiprocessing 的锁时共享锁。

【讨论】:

  • 最好在finally: 块中释放,这样即使抛出异常也会释放。
  • @CharlesDuffy 好点,更新了我的答案
  • 这仅在访问被保护在单个进程中时才足够。即使目前情况如此,也值得一提。
  • 锁也是上下文管理器,因此您可以使用with mutex: ... 来简化此操作。
  • 看起来确实可能是这种情况:stackoverflow.com/questions/18213619 我将编辑我的答案以改用multiprocessing.Lock,但也许这个问题应该标记为重复。
猜你喜欢
  • 2019-05-11
  • 1970-01-01
  • 2023-02-11
  • 1970-01-01
  • 1970-01-01
  • 2015-06-16
  • 2015-08-13
  • 1970-01-01
  • 2011-09-01
相关资源
最近更新 更多