【问题标题】:How to connect queues to a ZeroMQ PUB/SUB如何将队列连接到 ZeroMQ PUB/SUB
【发布时间】:2016-06-25 22:57:39
【问题描述】:

考虑以下几点:

  • 一组 3 个逻辑服务:S1S2S3
  • 每个服务的两个实例都在运行,所以我们有以下进程:S1P1S1P2S2P1S2P2S3P1S3P2
  • ZeroMQ 代理在单个进程中运行,所有服务进程均可访问

一个逻辑服务,比如S1,发布一条消息M1,逻辑服务S2S3感兴趣。每个逻辑服务只有一个进程必须接收M1,假设S2P1S3P2

我尝试了以下方法,但没有成功:

  • 代理线程 1 正在运行 XSUB/XPUB 代理
  • 代理线程 2 正在运行 ROUTER/DEALER 代理,其中 ROUTER 连接到 XPUB 套接字并订阅了所有内容(对于逻辑 S1
  • 代理线程 3 正在运行 ROUTER/DEALER 代理,其中 ROUTER 连接到 XPUB 套接字并订阅了所有内容(对于逻辑 S2
  • 代理线程 4 正在运行一个 ROUTER/DEALER 代理,其中 ROUTER 连接到 XPUB 套接字并订阅了所有内容(对于逻辑 S3
  • 每个逻辑服务进程都在运行一个连接到代理DEALER 套接字的REP 套接字线程

我认为XSUB/XPUB 代理会给我发布/订阅语义,而ROUTER/DEALER 代理会在REP 套接字之间为XSUB/XPUB 代理发送的消息引入竞争。

如何结合ZeroMQ 套接字来完成此任务?

更新1

我知道“没有成功”没有帮助,我尝试了不同的配置并得到了不同的错误。我尝试的最新配置如下:

(XSUB proxy=> XPUB) => (SUB copyLoop=> REQ) => (ROUTER proxy=> DEALER) => REP

copyLoop 是这样的:

public void start() {
    context = ZMQ.context(1);

    subSocket = context.socket(ZMQ.SUB);
    subSocket.connect(subSocketUrl);
    subSocket.subscribe("".getBytes());

    reqSocket = context.socket(ZMQ.REQ);
    reqSocket.connect(reqSocketUrl);

    while (!Thread.currentThread().isInterrupted()) {
        final Message msg = receiveNextMessage();
        resendMessage(msg);
    }
}

private Message receiveNextMessage() {
    final String header = subSocket.recvStr();
    final String entity = subSocket.recvStr();

    return new Message(header, entity);
}

private void resendMessage(Message msg) {
    reqSocket.sendMore(msg.getKey());
    reqSocket.send(msg.getData(), 0);
}

我得到的异常如下:

java.lang.IllegalStateException: Cannot send another request
    at zmq.Req.xsend(Req.java:51) ~[jeromq-0.3.4.jar:na]
    at zmq.SocketBase.send(SocketBase.java:613) ~[jeromq-0.3.4.jar:na]
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:1206) ~[jeromq-0.3.4.jar:na]
    at org.zeromq.ZMQ$Socket.sendMore(ZMQ.java:1189) ~[jeromq-0.3.4.jar:na]
    at com.xyz.messaging.zeromq.SubReqProxyConnector.resendMessage(SubReqProxyConnector.java:47) ~[classes/:na]
    at com.xyz.messaging.zeromq.SubReqProxyConnector.start(SubReqProxyConnector.java:35) ~[classes/:na]

我正在运行 JeroMQ 0.3.4、Oracle Java 8 JVM 和 Windows 7。

【问题讨论】:

  • 您可能想知道 ZeroMQ 是无代理的设计
  • 当你说“没有成功”时——你遇到了什么问题?我会在答案中建议一个替代(但类似)架构,但根据您遇到问题的位置,它可能没有帮助。
  • @Jason 你说的不太清楚是绝对正确的——这是因为我尝试了不同的组合并得到了不同的例外。我已更新问题以反映最新配置。
  • @user3666197 是的,我知道这一点,我正在尝试确定是否可以使用 ZeroMQ 构建一个准系统代理,以避免在客户的环境中安装像 RabbitMQ 这样的新组件。
  • 我不会称 ZMQ 无代理,我会说 ZMQ 是一个比代理低级别的消息传递库。您可以根据消息传递要求设计拓扑,如果需要代理,则构建一个,如果不需要,则不需要。

标签: zeromq jeromq


【解决方案1】:

您的ROUTER 连接似乎增加了一些复杂性 - 您应该能够执行直接连接到您的发布者的所有操作。

您当前遇到的错误是REQ 套接字具有严格的消息排序模式-您不允许send() 连续两次,您必须发送/接收/发送/接收/等(同样,REP 套接字必须接收/发送/接收/发送/等)。从它的外观来看,您只是在REQ 套接字上执行发送/发送/发送/等操作,而从未收到任何响应。如果您不关心同伴的响应,那么您必须接收并丢弃它或使用DEALER(或ROUTER,但DEALER 在您当前的图表中更有意义)。

我已经创建了一个图表,说明我将如何在下面完成此架构 - 使用您的基本流程结构。

Broker T1         Broker T2                Broker T3                Broker T4
(PUB*)------>(*SUB)[--](DEALER*)   -->(*SUB)[--](DEALER*)   -->(*SUB)[--](DEALER*)
       |_____________________||____|                  ||    |                  ||
       |_____________________||_______________________||____|                  ||
                             ||                       ||                       ||
     ========================||     ==================||            ===========||=
   ||             ||              ||              ||              ||              ||
   ||             ||              ||              ||              ||              ||
   ||             ||              ||              ||              ||              ||
(REP*)         (REP*)          (REP*)          (REP*)          (REP*)          (REP*)
 S1P1           S1P2            S2P1            S2P2            S3P1            S3P2

所以,主要区别在于我放弃了您的 (SUB copyLoop=> REQ) 步骤。是选择 XPUB/XSUB 还是 PUB/SUB 取决于您,但除非您目前想使用 XPUB/XSUB 的额外功能,否则我倾向于开始更简单。

显然,此图不涉及信息如何进入您的代理,您当前在该处显示一个 XSUB 套接字 - 这超出了您迄今为止提供的信息的范围,大概您能够接收信息到你的经纪人已经成功了,所以我不会处理。

我假设您专用于每个服务的代理线程正在明智地选择是否将消息发送到他们的服务?如果是这样,那么您选择让他们订阅所有内容应该可以正常工作,否则可能需要更智能的订阅设置。

如果您在服务进程上使用REP 套接字,则服务进程必须接收该消息并异步处理它,从不将有关该消息的任何详细信息回传给代理.然后,它必须以确认(如“RECEIVED”)响应每条消息,以便遵循REP 套接字的严格接收/发送/接收/发送模式。

如果您想要关于服务如何处理发送回代理的消息的任何其他类型的通信,REP 不再是适合您的服务进程的套接字类型,DEALER 可能不再是正确的套接字为您的经纪人键入。如果您想要某种形式的负载平衡以便发送到下一个打开的服务进程,您需要使用ROUTER/REQ 并让每个服务指示其可用性并让代理保留消息直到下一个服务进程说它可以通过发回结果来获得。如果您需要某种其他类型的消息处理,则必须指出这是什么,以便可以提出合适的架构。

【讨论】:

  • 非常感谢您的详细回答,我显然不明白 REQ 在发送另一条消息之前等待回复的事实。我已经实现了图表,虽然不再有错误,但消息没有到达目的地。 SUB/DEALER 代理订阅了所有内容。您能否确认 SUB/PUB 和 SUB/DEALER 代理都在运行 ZMQ.proxy?
  • 使用 ZMQ 代理可能是首选方法。你知道消息卡在哪里吗?能走多远?
  • 我不知道,追踪它的最佳方法是什么?
  • ZMQ.proxy() 有一个捕获套接字参数,我试试用这个。
  • 查看您的测试程序后,您似乎可能成为a slow joiner的受害者 - 您的“事件发布者”线程连接发布者,然后立即 发送一条消息 - 在您尝试发送第一条消息时,订阅者可能尚未完成订阅设置。 PUB 套接字不等待对等方完成连接,如果没有订阅者,它们只会丢弃消息。我建议您在发送消息之前稍等片刻,或者循环发送一堆消息。
【解决方案2】:

显然我把一些元素搞混了:

  • 无论您将其用作客户端套接字 (Socket.connect) 还是服务器端套接字 (Socket.bind),套接字都具有相同的 API
  • 无论类型如何,套接字都具有相同的 API(例如,不应在 PUSH 套接字上调用 Socket.subscribe
  • 某些套接字类型需要发送/接收响应循环(例如REQ/REP
  • 沟通模式中的一些细微差别(PUSH/PULLROUTER/DEALER
  • 调试 ZeroMQ 设置的困难(不可能?)

非常感谢 Jason 非常详细的回答(以及很棒的图表!),为我指明了正确的方向。

我最终得到了以下设计:

  • 代理线程 1 正在 bind(localhost:6000)bind(localhost:6001) 上运行扇出 XSUB/XPUB 代理
  • 代理线程 2 在connect(localhost:6001)bind(localhost:6002) 上运行一个队列SUB/PUSH 代理;代理线程 3 和 4 使用类似的设计,但绑定端口号不同
  • 消息生产者使用connect(localhost:6000) 上的PUB 套接字连接到代理
  • 消息使用者使用connect(localhost:6002) 上的PULL 套接字连接到代理队列代理

在这个特定于服务的队列机制之上,我能够添加一个类似的特定于服务的扇出机制,而相当简单:

  • 代理线程在connect(localhost:6001)bind(localhost:6003) 上运行SUB/PUB 代理
  • 消息生产者仍然使用connect(localhost:6000) 上的PUB 套接字连接到代理
  • 消息消费者使用connect(localhost:6003) 上的SUB 套接字连接到代理扇出代理

这是一次有趣的旅程。

【讨论】:

    猜你喜欢
    • 2014-11-24
    • 1970-01-01
    • 1970-01-01
    • 2020-04-03
    • 2019-07-23
    • 2015-04-21
    • 2013-06-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多