【发布时间】:2018-03-24 09:18:28
【问题描述】:
我正在尝试在 multiprocessing.Process 的领域内使用 pyzmq PUB/SUB 套接字原型:
我有一个订阅者:
import time
import collections
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b"")
socket.connect("tcp://localhost:5000")
nb_recv = 0
begin = time.time()
counter = collections.defaultdict(int)
while True:
msg = socket.recv_json()
print(msg)
还有两种不同的发布者实现。
有了这个,订阅者接收消息:
import zmq
from multiprocessing import Process
class Sender(object):
def __init__(self):
self._context = zmq.Context()
pass
def run(self):
self._socket = self._context.socket(zmq.PUB)
self._socket.bind("tcp://127.0.0.1:5000")
seq_num = 0
while True:
msg = { "sequence": seq_num }
self._socket.send_json(msg)
seq_num += 1
if __name__ == "__main__":
s = Sender()
p = Process(target=s.run)
p.start()
p.join()
但是有了这个(唯一的区别是 socket 的创建是在构造函数中,而不是在 run() 类中-方法),订阅者没有收到任何消息:
import zmq
from multiprocessing import Process
class Sender(object):
def __init__(self):
self._context = zmq.Context()
self._socket = self._context.socket(zmq.PUB) # <---------
pass
def run(self):
self._socket.bind("tcp://127.0.0.1:5000")
seq_num = 0
while True:
msg = { "sequence": seq_num }
self._socket.send_json(msg)
seq_num += 1
if __name__ == "__main__":
s = Sender()
p = Process(target=s.run)
p.start()
p.join()
当我用 threading.Thread 替换 multiprocessing.Process 时,两个类都运行良好,但我没有在文档中找到任何解释。
【问题讨论】:
标签: python python-3.x multiprocessing zeromq pyzmq