【问题标题】:Using Python RLocks across multiple independent processes跨多个独立进程使用 Python RLock
【发布时间】:2016-10-07 20:51:18
【问题描述】:

我正在开发一个使用 Celery 来安排一些长期任务的 Django 项目。 Django 和 Celery 都在完全独立的进程中运行,并且需要一种方法来协调对数据库的访问。我想使用 Python 的 multiprocessing.RLock 类(或等效类),因为我需要锁是可重入的。

我的问题是,如何为单独的进程提供对 RLock 的访问权限?

我发现的两个最佳解决方案(posix_ipc modulefcntl)仅限于基于 Unix 的系统,我们希望避免局限于此。

是否有跨平台的方式来共享进程之间的锁,而无需共同的祖先进程?

【问题讨论】:

  • 这不是一个直接的答案,但除非您需要具有强顺序性的锁定,否则您可能需要查看诸如0MQ 之类的消息传递系统。一个出色的消息传递系统,几乎可以在任何东西上运行,并且可以绑定几乎任何语言。
  • +1 for 0MQ 以便在以各种语言编写的进程之间进行通信,并且延迟很大。我不太习惯 Celery 以及它可能已经涉及(或约束)的内容,但也许您也可以考虑使用 redis,围绕这种功能已经有一些 python 绑定(pypi.python.org/pypi/python-redis-lockgithub.com/glasslion/redlock,@ 987654327@等)
  • 你确实意识到这个“没有共同祖先进程”的要求意味着你不能使用multiprocessing,对吧?
  • 彼得,mgc,感谢您的建议!我需要仔细看看这些是否可行。 @路易斯,是的。这就是问题的症结所在。我正在寻找一种在没有父进程的情况下获得 RLock 行为的方法。
  • 使用 mgc 建议的 redis 锁,或者你也可以使用 django 缓存。

标签: python django celery python-multiprocessing


【解决方案1】:

我最终使用 RabbitMQ 作为一种创建分布式锁的方法。有关如何执行此操作的详细信息,请参阅 RabbitMQ 的博客:https://www.rabbitmq.com/blog/2014/02/19/distributed-semaphores-with-rabbitmq/

简而言之,您为锁创建一个 RabbitMQ 队列并向其发送一条消息。要获取锁,请在队列上运行basic_get(非阻塞)或basic_consume(阻塞)。这会从队列中删除消息,防止其他线程获取锁。一旦你的工作完成,发送一个否定的 ack 将导致 RabbitMQ 重新排队消息,允许下一个线程继续。

不幸的是,这不允许重入锁。

上面引用的链接提供了如何执行此操作的 Java 代码。弄清楚如何将其翻译成 Python/Pika 已经很烦人了,我想我应该在这里发布一些示例代码。

生成锁:

import pika

with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection:
    channel = connection.channel()
    channel.queue_declare(queue="LockQueue")
    channel.basic_publish(exchange='', routing_key='LockQueue', body='Lock')
    channel.close()

获取锁:

import pika
import time

def callback(ch, method, properties, body):
    print("Got lock")

    for i in range(5, 0, -1):
        print("Tick {}".format(i))
        time.sleep(1)

    print("Releasing lock")
    ch.basic_nack(delivery_tag=method.delivery_tag)
    ch.close()  # Close the channel to continue on with normal processing. Without this, `callback` will continue to request the lock.

with pika.BlockingConnection(pika.ConnectionParameters('localhost')) as connection:
    channel = connection.channel()

    channel.queue_declare(queue='LockQueue')
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='LockQueue')

    print("Waiting for lock")
    channel.start_consuming()
    print("Task completed")

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2012-07-20
    • 1970-01-01
    • 2015-12-20
    • 2011-11-03
    • 2014-03-07
    • 2020-12-29
    • 2011-08-22
    • 1970-01-01
    相关资源
    最近更新 更多