【发布时间】:2025-11-30 06:45:01
【问题描述】:
这是我遇到的问题:我使用的是 Python 2.7,并且我有一个在线程中运行的代码,该线程具有一个关键区域,当时只有一个线程应该执行。该代码目前没有互斥机制,所以我想询问我可以为我的特定用例使用什么,这涉及“删除”“排队”函数。我尝试使用以下最小工作示例来模拟这种行为:
useThreading=False # True
if useThreading: from threading import Thread, Lock
else: from multiprocessing import Process, Lock
mymutex = Lock()
import time
tstart = None
def processData(data):
#~ mymutex.acquire()
try:
print('thread {0} [{1:.5f}] Do some stuff'.format(data, time.time()-tstart))
time.sleep(0.5)
print('thread {0} [{1:.5f}] 1000'.format(data, time.time()-tstart))
time.sleep(0.5)
print('thread {0} [{1:.5f}] done'.format(data, time.time()-tstart))
finally:
#~ mymutex.release()
pass
# main:
tstart = time.time()
for ix in xrange(0,3):
if useThreading: t = Thread(target = processData, args = (ix,))
else: t = Process(target = processData, args = (ix,))
t.start()
time.sleep(0.001)
现在,如果你运行这段代码,你会得到这样的打印输出:
thread 0 [0.00173] Do some stuff
thread 1 [0.00403] Do some stuff
thread 2 [0.00642] Do some stuff
thread 0 [0.50261] 1000
thread 1 [0.50487] 1000
thread 2 [0.50728] 1000
thread 0 [1.00330] done
thread 1 [1.00556] done
thread 2 [1.00793] done
也就是说,三个线程很快就会一个接一个地“排队”(大约是 2-3 毫秒之后)。实际上,它们没有排队,它们只是在彼此之后 2-3 毫秒后开始并行执行。
现在,如果我启用 mymutex.acquire()/.release() 命令,我会得到预期的结果:
thread 0 [0.00174] Do some stuff
thread 0 [0.50263] 1000
thread 0 [1.00327] done
thread 1 [1.00350] Do some stuff
thread 1 [1.50462] 1000
thread 1 [2.00531] done
thread 2 [2.00547] Do some stuff
thread 2 [2.50638] 1000
thread 2 [3.00706] done
基本上,现在有了锁定,线程不会并行运行,而是由于锁定,它们一个接一个地运行——只要一个线程在工作,其他线程就会阻塞在.acquire()。但这也不是我想要实现的。
我想要实现的是:假设当.acquire() 首次由线程函数触发时,它会在队列中注册一个函数的 id(比如指向它的指针)。之后,行为与 Lock 基本相同 - 当一个线程工作时,其他线程在 .acquire() 处阻塞。当第一个线程完成后,它进入finally: 块——在这里,我想查看队列中有多少线程在等待;然后我想删除/删除最后一个线程的所有等待线程除了 - 最后,我要.release()锁;这意味着在此之后,队列中的最后一个线程将在接下来执行。我想,我想写类似下面的伪代码:
...
finally:
if (len(mymutex.queue) > 2): # more than this instance plus one other waiting:
while (len(mymutex.queue) > 2):
mymutex.queue.pop(1) # leave alone [0]=this instance, remove next element
# at this point, there should be only queue[0]=this instance, and queue[1]= what was the last thread queued previously
mymutex.release() # once we releace, queue[0] should be gone, and the next in the queue should acquire the mutex/lock..
pass
...
这样,我希望得到这样的打印输出:
thread 0 [0.00174] Do some stuff
thread 0 [0.50263] 1000
thread 0 [1.00327] done
# here upon lock release, thread 1 would be deleted - and the last one in the queue, thread 2, would acquire the lock next:
thread 2 [1.00350] Do some stuff
thread 2 [1.50462] 1000
thread 2 [2.00531] done
在 Python 中实现此目的最直接的方法是什么?
【问题讨论】:
-
我遇到了同样的问题,为了解决这个问题,我使用redis 来同步线程并让最后一个或第一个线程执行一些关键代码,在你的情况下它可以作为一个队列,为此您需要知道每个线程的线程数,然后将其发布到 redis。
-
感谢@KobiK - 以前从未听说过
redis;我正在查看redis.io/commands 和redis.io/clients,它看起来并不容易,至少就在这种情况下使用它而言。我会尝试寻找某种教程...再次感谢 - 干杯! -
用
redis.lock实现一个Lock对象并创建一个用于关键代码的包装器并不难,在实现时在每个锁定请求中使用publish,并检查数字如果它是你的最大线程数,那么你可以使用队列选项释放它。 -
感谢@KobiK 的指点 - 正是我需要缩小搜索范围
:)干杯!
标签: python multithreading mutex critical-section