【发布时间】:2019-05-17 19:17:03
【问题描述】:
我有一个进程A不断发布消息,进程B和C订阅主题,获取进程A中发布者发布的最新消息。
所以,我将zmq.CONFLATE 设置为发布者和订阅者。但是,我发现一个订阅者无法接收消息。
def publisher(sleep_time=1.0, port="5556"):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.bind("tcp://*:%s" % port)
print ("Running publisher on port: ", port)
while True:
localtime = time.asctime( time.localtime(time.time()))
string = "Message published time: {}".format(localtime)
socket.send_string("{}".format(string))
time.sleep(sleep_time)
def subscriber(name="sub", sleep_time=1, ports="5556"):
print ("Subscriber Name: {}, Sleep Time: {}, Port: {}".format(name, sleep_time, ports))
context = zmq.Context()
print ("Connecting to publisher with ports %s" % ports)
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.CONFLATE, 1)
socket.setsockopt_string(zmq.SUBSCRIBE, "")
socket.connect ("tcp://localhost:%s" % ports)
while True:
message = socket.recv()
localtime = time.asctime( time.localtime(time.time()))
print ("\nSubscriber [{}]\n[RECV]: {} at [TIME]: {}".format(name, message, localtime))
time.sleep(sleep_time)
if __name__ == "__main__":
Process(target=publisher).start()
Process(target=subscriber, args=("SUB1", 1.2, )).start()
Process(target=subscriber, args=("SUB2", 1.1, )).start()
我尝试在发布者中取消设置socket.setsockopt(zmq.CONFLATE, 1),这似乎解决了问题。进程 B 和 C 中的两个订阅者都可以收到消息,并且这些消息似乎是最新的。
我试图找出为什么将发布者设置为 CONFLATE 会导致我遇到的问题。我找不到有关它的信息。有谁知道是什么导致了这种行为?
另外我想知道,在一个发布者对多个订阅者的情况下,正确的代码设置是什么,让订阅者总能得到最新的消息?
【问题讨论】:
标签: python zeromq python-multiprocessing pyzmq interprocess