【发布时间】:2015-04-23 17:48:43
【问题描述】:
阅读 ZeroMQ 文档后,当我发现这三个套接字组合时,我有点迷茫。它们是:
- 经销商到路由器
- 经销商到经销商
- 路由器到路由器
我了解 DEALER 和 ROUTER 是同步 REQ/REP 通信的替代品,因此它们变为异步并且可以连接多个节点。我不明白的是,DEALER 如何可以替代 DEALER 到 DEALER 中的 REQ 和 REP(以及 ROUTER 到 ROUTER 中的路由器)。
我一直在寻找一种模式,该模式允许任意数量的客户端将作业提交给任意数量的工作人员处理(通过负载平衡)这些工作并将响应(和中间结果)返回给客户端(异步,但发送多条消息背部)。客户可能还需要能够提前终止工作。我觉得documentation 在这方面有点轻(我不是专家,可能错过了相关部分)。
我很乐意自己解决细节,但每次我认为我找到了合适的模式时,我都会发现另一个可能同样合适的模式(例如这三种模式在我看来同样合适:http://zguide.zeromq.org/page:all#ROUTER-Broker-and-REQ-Workers , http://zguide.zeromq.org/page:all#ROUTER-Broker-and-DEALER-Workers, http://zguide.zeromq.org/page:all#A-Load-Balancing-Message-Broker)。
任何关于结构(哪个套接字与哪个组件通信)的建议都值得赞赏。
更新
这是我目前想出的:
import multiprocessing
import zmq
import time
router_url_b = 'tcp://*:5560'
router_url = 'tcp://localhost:5560'
dealer_url_b = 'tcp://*:5561'
dealer_url = 'tcp://localhost:5561'
def broker():
context = zmq.Context()
router = context.socket(zmq.ROUTER)
router.bind(router_url_b)
dealer = context.socket(zmq.DEALER)
dealer.bind(dealer_url_b)
poll = zmq.Poller()
poll.register(router, zmq.POLLIN)
poll.register(dealer, zmq.POLLIN)
while True:
poll_result = dict(poll.poll())
if router in poll_result:
ident, msg = router.recv_multipart()
print 'router: ident=%s, msg=%s' % (ident, msg)
# print 'router received "%s" and ident %s' % (msg, ident)
dealer.send_multipart([ident, msg])
# dealer.send(msg)
if dealer in poll_result:
ident, msg = dealer.recv_multipart()
print 'dealer: ident=%s, msg=%s' % (ident, msg)
router.send_multipart([ident, msg])
def client(client_id):
context = zmq.Context()
req = context.socket(zmq.DEALER)
# setting identity doesn't seem to make a difference
req.setsockopt(zmq.IDENTITY, b"%s" % client_id)
req.connect(router_url)
req.send('work %d' % client_id)
while True:
msg = req.recv()
print 'client %d received response: %s' % (client_id, msg)
def worker(worker_id):
context = zmq.Context()
# to allow asynchronous sending of responses.
rep = context.socket(zmq.ROUTER)
# not sure if this is required...
# rep.setsockopt(zmq.IDENTITY, b"%s" % (10+worker_id))
rep.connect(dealer_url)
while True:
msg = rep.recv_multipart()
ident, msg = msg[:-1], msg[-1]
print 'worker %d received: "%s", ident="%s"' % (worker_id, msg, ident)
# do some work...
time.sleep(10)
rep.send_multipart(ident + ['result A from worker %d (%s)' % (worker_id, msg)])
# do more work...
time.sleep(10)
rep.send_multipart(ident + ['result B from worker %d (%s)' % (worker_id, msg)])
print 'finished worker', worker_id
def main():
print 'creating workers'
for i in xrange(2):
p = multiprocessing.Process(target=worker, args=(i, ))
p.daemon = True
p.start()
print 'creating clients'
for i in xrange(5):
p = multiprocessing.Process(target=client, args=(i, ))
p.daemon = True
p.start()
broker()
if __name__ == '__main__':
main()
它似乎工作得很好。唯一缺少的是一旦工人开始处理工作,从客户端到工人的通信。我想最好的办法是创建某种新的控制通道(发布/订阅),以便在需要时终止工作人员。
还有几个问题:
- 这个模型有什么明显的弱点吗?
-
IDENTITY有什么用处?我是否设置这些值似乎并不重要(无论是在客户端还是在工作人员中)。 - 工人收到的第一条消息是:
worker 1 received: "work 3", ident="['\x00\x80\x00A\xa7', '3']" worker 0 received: "work 4", ident="['\x00\x80\x00A\xa7', '4']"为什么两个工人的第一个ident项目相同?我理解路由器工作的方式是分配它跟踪的随机身份。这是如何工作的(它似乎在一个小规模的例子中工作)?
【问题讨论】:
-
你好 orange -- 两个问题:Q1: 你有没有花几天时间在 Pieter HINTJENS 的优秀而有益的书籍 Code Connected ,第 1 卷(不是关于代码 sn-ps,而是关于关于非阻塞、可扩展的正式通信模式的概念性思考——确实值得花费更长的时间,而不是几天)? Q2: 你确定你在 .connect()-s 之后的 .bind() 场景符合 Pieter 的设置建议吗?预期的消息传递环境?让我们更新,谢谢。