这是您问题第一部分的解决方案 - 如何并行运行进程,以便每个进程等待前一个进程完成以开始处理任务。我没有在这里解决消息传递方面的问题,因为这对我来说似乎有点模糊,并且可以根据问题陈述以不同的方式实现。在这个例子中,我们创建并运行了三个工人,它们通过简单的时间延迟来模拟执行。代码 sn-ps 应保存到可以从命令行运行的单个文件中。
我们首先导入所需的模块:
#!/usr/bin/env python3
import time
from multiprocessing import Process, Event
并实现WorkerQueue 类。这个类使工人保持正确的顺序,并负责启动和终止他们。工作人员之间的通信是使用事件来实现的。每个工人都有 other_ready 和 ready Event 字段,相应地指示先前工人和当前工人的完成状态。注意,如果队列中只有一个worker,它的other_ready和ready是一样的。
class WorkerQueue(object):
def __init__(self):
self._workers = []
def add_worker(self, worker):
if self._workers:
worker.other_ready = self._workers[-1].ready
self._workers[0].other_ready = worker.ready
else:
worker.other_ready = worker.ready
self._workers.append(worker)
def start_workers(self):
if not self._workers:
return
self._workers[0].other_ready.set()
for w in self._workers:
w.start()
def stop_workers(self):
for w in self._workers:
w.join()
然后,我们通过继承Process 类来实现worker 本身。注意,也可以使用threading 代替multiprocessing。在这种情况下,唯一改变的是 Worker 父类,Thread 而不是 Process。
class Worker(Process):
def __init__(self, delay, name=None):
super().__init__(name=name)
self.delay = delay
self.other_ready = Event()
self.other_ready.set()
self.ready = Event()
self.stop = Event()
def run(self):
while not self.stop.is_set():
try:
self.other_ready.wait()
t = time.strftime('%H:%M:%S')
print('Started:', self.name, t, flush=True)
time.sleep(self.delay)
t = time.strftime('%H:%M:%S')
print('Finished:', self.name, t, flush=True)
except:
break
self.other_ready.clear()
self.ready.set()
def join(self, timeout=None):
self.stop.set()
super().join(timeout)
在这里,您看到,每个工作人员在开始执行命令之前等待前一个工作人员准备好。默认情况下,设置了 other_ready,这样我们就不会在队列中有单个工作人员的情况下遇到死锁。
最后,我们实现了一个main 函数,在该函数中我们定义了worker,将它们添加到worker 队列中并启动它们。
def main():
first = Worker(delay=1, name='first')
second = Worker(delay=3, name='second')
third = Worker(delay=2, name='third')
queue = WorkerQueue()
for w in (first, second, third):
queue.add_worker(w)
queue.start_workers()
try:
# The main infinite loop, do something useful:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
queue.stop_workers()
不要忘记在文件末尾添加以下行:
if __name__ == '__main__':
main()
现在,可以将其保存到一个文件中,例如 proc_queue.py,您可以从命令行运行该文件以查看结果:
$ python3 proc_queue.py
Started: first 16:04:09
Finished: first 16:04:10
Started: second 16:04:10
Finished: second 16:04:13
Started: third 16:04:13
Finished: third 16:04:15
Started: first 16:04:15
Finished: first 16:04:16
Started: second 16:04:16
Finished: second 16:04:19
Started: third 16:04:19
Finished: third 16:04:21
^C
这可能有点过于复杂,但这是我能想到的唯一解决方案。如果您知道更好的方法,我很乐意了解它:)