【问题标题】:ZMQ client-worker communication patternZMQ 客户端-工作者通信模式
【发布时间】:2015-04-23 17:48:43
【问题描述】:

阅读 ZeroMQ 文档后,当我发现这三个套接字组合时,我有点迷茫。它们是:

  • 经销商到路由器
  • 经销商到经销商
  • 路由器到路由器

我了解 DEALER 和 ROUTER 是同步 REQ/REP 通信的替代品,因此它们变为异步并且可以连接多个节点。我不明白的是,DEALER 如何可以替代 DEALER 到 DEALER 中的 REQ 和 REP(以及 ROUTER 到 ROUTER 中的路由器)。

我一直在寻找一种模式,该模式允许任意数量的客户端将作业提交给任意数量的工作人员处理(通过负载平衡)这些工作并将响应(和中间结果)返回给客户端(异步,但发送多条消息背部)。客户可能还需要能够提前终止工作。我觉得documentation 在这方面有点轻(我不是专家,可能错过了相关部分)。

我很乐意自己解决细节,但每次我认为我找到了合适的模式时,我都会发现另一个可能同样合适的模式(例如这三种模式在我看来同样合适:http://zguide.zeromq.org/page:all#ROUTER-Broker-and-REQ-Workers , http://zguide.zeromq.org/page:all#ROUTER-Broker-and-DEALER-Workers, http://zguide.zeromq.org/page:all#A-Load-Balancing-Message-Broker)。

任何关于结构(哪个套接字与哪个组件通信)的建议都值得赞赏。

更新

这是我目前想出的:

import multiprocessing
import zmq
import time

router_url_b = 'tcp://*:5560'
router_url = 'tcp://localhost:5560'

dealer_url_b = 'tcp://*:5561'
dealer_url = 'tcp://localhost:5561'


def broker():
    context = zmq.Context()
    router = context.socket(zmq.ROUTER)
    router.bind(router_url_b)

    dealer = context.socket(zmq.DEALER)
    dealer.bind(dealer_url_b)

    poll = zmq.Poller()
    poll.register(router, zmq.POLLIN)
    poll.register(dealer, zmq.POLLIN)

    while True:
        poll_result = dict(poll.poll())
        if router in poll_result:
            ident, msg = router.recv_multipart()
            print 'router: ident=%s, msg=%s' % (ident, msg)
            # print 'router received "%s" and ident %s' % (msg, ident)
            dealer.send_multipart([ident, msg])
            # dealer.send(msg)
        if dealer in poll_result:
            ident, msg = dealer.recv_multipart()
            print 'dealer: ident=%s, msg=%s' % (ident, msg)
            router.send_multipart([ident, msg])


def client(client_id):
    context = zmq.Context()
    req = context.socket(zmq.DEALER)
    # setting identity doesn't seem to make a difference
    req.setsockopt(zmq.IDENTITY, b"%s" % client_id)
    req.connect(router_url)

    req.send('work %d' % client_id)
    while True:
        msg = req.recv()
        print 'client %d received response: %s' % (client_id, msg)


def worker(worker_id):
    context = zmq.Context()
    # to allow asynchronous sending of responses.
    rep = context.socket(zmq.ROUTER)
    # not sure if this is required...
    # rep.setsockopt(zmq.IDENTITY, b"%s" % (10+worker_id))
    rep.connect(dealer_url)

    while True:
        msg = rep.recv_multipart()
        ident, msg = msg[:-1], msg[-1]
        print 'worker %d received: "%s", ident="%s"' % (worker_id, msg, ident)
        # do some work...
        time.sleep(10)
        rep.send_multipart(ident + ['result A from worker %d (%s)' % (worker_id, msg)])
        # do more work...
        time.sleep(10)
        rep.send_multipart(ident + ['result B from worker %d (%s)' % (worker_id, msg)])
    print 'finished worker', worker_id


def main():

    print 'creating workers'
    for i in xrange(2):
        p = multiprocessing.Process(target=worker, args=(i, ))
        p.daemon = True
        p.start()

    print 'creating clients'
    for i in xrange(5):
        p = multiprocessing.Process(target=client, args=(i, ))
        p.daemon = True
        p.start()

    broker()


if __name__ == '__main__':
    main()

它似乎工作得很好。唯一缺少的是一旦工人开始处理工作,从客户端到工人的通信。我想最好的办法是创建某种新的控制通道(发布/订阅),以便在需要时终止工作人员。

还有几个问题:

  • 这个模型有什么明显的弱点吗?
  • IDENTITY 有什么用处?我是否设置这些值似乎并不重要(无论是在客户端还是在工作人员中)。
  • 工人收到的第一条消息是: worker 1 received: "work 3", ident="['\x00\x80\x00A\xa7', '3']" worker 0 received: "work 4", ident="['\x00\x80\x00A\xa7', '4']" 为什么两个工人的第一个 ident 项目相同?我理解路由器工作的方式是分配它跟踪的随机身份。这是如何工作的(它似乎在一个小规模的例子中工作)?

【问题讨论】:

  • 你好 orange -- 两个问题:Q1: 你有没有花几天时间在 Pieter HINTJENS 的优秀而有益的书籍 Code Connected ,第 1 卷(不是关于代码 sn-ps,而是关于关于非阻塞、可扩展的正式通信模式的概念性思考——确实值得花费更长的时间,而不是几天)? Q2: 你确定你在 .connect()-s 之后的 .bind() 场景符合 Pieter 的设置建议吗?预期的消息传递环境?让我们更新,谢谢。

标签: sockets messaging zeromq


【解决方案1】:

除了我的更新,我发现worker可以使用DEALER连接到服务器的后端。模式和解释可以在here找到。

客户端使用 DEALER 套接字,服务器作为 ROUTER 在前端接收请求(asyn + 许多客户端),使用 DEALER 套接字(asyn)将它们代理给工作人员(后端),工作人员监听服务器的后端在 DEALER 套接字上(异步,不需要路由,尽管 ROUTER 也可以工作)。

如果工人是严格同步的,我们会使用 REP,但是因为我们 想要发送多个回复,我们需要一个异步套接字。我们不想 路由回复;他们总是去发送的单个服务器线程 我们的请求。

进一步的修改是用zmq.proxy(router, dealer)broker() 中的while True 循环)替换路由器/经销商消息的隐式调度。

更新

显然,这种模式使用了 ZMQ 的标准循环路由。自定义任务分配可以通过 ROUTER 到 ROUTER 模式来实现。在这种情况下,客户端从发送请求开始,工作人员从发送就绪消息开始。代理管理一个就绪工作人员列表,如果没有可用的工作人员,则关闭轮询新客户端消息(因此使用 ZMQ 的内部消息缓冲区)。

【讨论】:

  • 在您的实现中,您是否需要分别在客户端和工作人员上使用 ZMQ_REQZMQ_REP?或者只是DEALER-to-ROUTER
  • REQ 和 REP 用于一对一的阻塞通信模式,此处未使用。
猜你喜欢
  • 1970-01-01
  • 2013-05-18
  • 1970-01-01
  • 1970-01-01
  • 2021-01-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多