【问题标题】:Communication between multiple processes with zmq使用 zmq 在多个进程之间进行通信
【发布时间】:2023-03-07 21:52:01
【问题描述】:

我有 n 个拥有自己的本地数据和操作的进程,我希望每个进程都将其本地数据的“快照”发送到其余正在运行的节点进程。
到目前为止,我的代码如下所示:
def node1():
    Process(target=sync_1).start()
    sleep(4)
    data = {'node': 1, 'data': 'node 1 data'}

    context_b = zmq.Context()
    socket_b = context_b.socket(zmq.PUB)

    connnected = False
    try:
        socket_b.bind("tcp://*:%s" % 5560)
        connnected = True
    except Exception as e:
        print(e)
    if connnected:
        topic = "101"
        try:
            socket_b.send_string(topic + ' ' + json.dumps(data))
        except Exception as e:
                print(e)
    socket_b.close()
    context_b.term()

def node2():
    Process(target=sync_2).start()

def sync_1():
    context_c = zmq.Context()
    socket_c = context_c.socket(zmq.SUB)
    _port = 5560
    try:
        socket_c.connect("tcp://localhost:%s" % _port)
    except Exception as e:
        print(e)

    topicfilter = "101"
    socket_c.setsockopt_string(zmq.SUBSCRIBE, topicfilter, encoding='utf-8')

     try:
         raw = socket_c.recv().decode("utf-8")
         json0 = raw.find('{')
         topic = raw[0:json0].strip()
         msg = json.loads(raw[json0:])
         print("[SYNC 1] received {}-{}]".format(topic, msg))
     except Exception as e:
         print(e)

def sync_2():
    context_c = zmq.Context()
    socket_c = context_c.socket(zmq.SUB)
    _port = 5560

    try:
        socket_c.connect("tcp://localhost:%s" % _port)
    except Exception as e:
        print(e)

    topicfilter = "101"
    socket_c.setsockopt_string(zmq.SUBSCRIBE, topicfilter, encoding='utf-8')

    try:
        raw = socket_c.recv().decode("utf-8")
        json0 = raw.find('{')
        topic = raw[0:json0].strip()
        msg = json.loads(raw[json0:])
        print("[SYNC 2] received {}-{}]".format(topic, msg))
    except Exception as e:
        print(e)

if __name__ == '__main__':
    Process(target=node1).start()
    Process(target=node2).start()

每个节点都有一个在后台运行的“侦听器”进程(同步功能),以便接收每个节点的数据并相应地使用它,并且当所有子套接字都连接到一个节点时它工作正常(节点1在这种情况)但我希望每个节点都向所有侦听器发送数据,所以我不确定如何实现,因为侦听器进程可以连接到一个端口。

另外,每次有更新时,节点都必须发送本地数据快照,所以这不可能是一次性的通信,因此我想到让侦听器进程一直主动等待更新。

我相信图表可能对解决这个问题有用:

有一种更简单的方法可以解决此问题,因此我们将不胜感激!

【问题讨论】:

    标签: python-multiprocessing zeromq publish-subscribe pyzmq python-sockets


    【解决方案1】:

    更新: 解决方案是使用 XPUB-XSUB 模式。
    通过使用这种模式,我创建了一个代理线程,让我可以做我想做的事。
    我能找到的最有用的 Python 示例是 this

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-01-09
      • 2011-10-16
      • 1970-01-01
      • 2015-08-04
      • 2013-04-28
      • 1970-01-01
      相关资源
      最近更新 更多