【问题标题】:ZeroMQ (cppzmq) subscriber skips first messageZeroMQ (cppzmq) 订阅者跳过第一条消息
【发布时间】:2017-08-17 16:11:19
【问题描述】:

我正在尝试将ZMQCPPZMQ C++ 包装器一起使用,因为它似乎是C++ Bindings 中建议的包装器。

客户端/服务器 (REQ/REP) 似乎工作正常。 当试图实现一对发布/订阅程序时,看起来第一条消息在订阅者中丢失了。为什么?

publisher.cpp:

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/thread.hpp>
#include <boost/format.hpp>
#include <zmq.hpp>
#include <string>
#include <iostream>

int main()
{
    zmq::context_t context(1);
    zmq::socket_t publisher(context, ZMQ_PUB);
    publisher.bind("tcp://*:5555");

    for(int n = 0; n < 3; n++) {
        zmq::message_t env1(1);
        memcpy(env1.data(), "A", 1);
        std::string msg1_str = (boost::format("Hello-%i") % (n + 1)).str();
        zmq::message_t msg1(msg1_str.size());
        memcpy(msg1.data(), msg1_str.c_str(), msg1_str.size());
        std::cout << "Sending '" << msg1_str << "' on topic A" << std::endl;
        publisher.send(env1, ZMQ_SNDMORE);
        publisher.send(msg1);

        zmq::message_t env2(1);
        memcpy(env2.data(), "B", 1);
        std::string msg2_str = (boost::format("World-%i") % (n + 1)).str();
        zmq::message_t msg2(msg2_str.size());
        memcpy(msg2.data(), msg2_str.c_str(), msg2_str.size());
        std::cout << "Sending '" << msg2_str << "' on topic B" << std::endl;
        publisher.send(env2, ZMQ_SNDMORE);
        publisher.send(msg2);

        boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
    }
    return 0;
}

订阅者.cpp:

#include <zmq.hpp>
#include <string>
#include <iostream>

int main()
{
    zmq::context_t context(1);
    zmq::socket_t subscriber(context, ZMQ_SUB);
    subscriber.connect("tcp://localhost:5555");
    subscriber.setsockopt(ZMQ_SUBSCRIBE, "B", 1);

    while(true)
    {
        zmq::message_t env;
        subscriber.recv(&env);
        std::string env_str = std::string(static_cast<char*>(env.data()), env.size());
        std::cout << "Received envelope '" << env_str << "'" << std::endl;

        zmq::message_t msg;
        subscriber.recv(&msg);
        std::string msg_str = std::string(static_cast<char*>(msg.data()), msg.size());
        std::cout << "Received '" << msg_str << "'" << std::endl;
    }
    return 0;
}

程序输出:

$ ./publisher
Sending 'Hello-1' on topic A
Sending 'World-1' on topic B
Sending 'Hello-2' on topic A
Sending 'World-2' on topic B
Sending 'Hello-3' on topic A
Sending 'World-3' on topic B

$ ./subscriber
Received envelope 'B'
Received 'World-2'
Received envelope 'B'
Received 'World-3'

(注意:订阅者在执行发布者之前执行)

额外问题:顺便说一句,是我的印象还是这个 C++ 包装器的级别很低?我看不到对 std::string 的直接支持,而且传输简单字符串的代码看起来很冗长。

【问题讨论】:

    标签: c++ zeromq


    【解决方案1】:

    the ZeroMQ Guide找到答案:

    关于 PUB-SUB 套接字,还有一件事需要了解:您 不知道订阅者何时开始收到消息。甚至 如果您启动订阅者,请稍等片刻,然后启动发布者, 订阅者总是会错过发布者发送的第一条消息 发送。这是因为当订阅者连接到发布者时 (需要一小段时间但非零时间的事情),发布者可以 已经在发送消息了。

    这种“慢木工”症状经常影响到足够多的人,以至于我们 将详细解释它。记住 ZeroMQ 是异步的 I/O,即在后台。假设您有两个节点在执行此操作,在 这个顺序:

    订阅者连接到端点并接收和计算消息。 Publisher 绑定到一个端点并立即发送 1,000 条消息。 那么订阅者很可能不会收到任何东西。你会 闪烁,检查您是否设置了正确的过滤器并重试,然后 订阅者仍然不会收到任何东西。

    建立一个 TCP 连接涉及到和从握手,需要 几毫秒,具体取决于您的网络和跃点数 同行之间。在那个时候,ZeroMQ 可以发送很多消息。为了缘故 论据假设建立连接需要 5 毫秒,并且 同一个链接每秒可以处理 100 万条消息。在 5 毫秒内 订阅者正在连接到发布者,它需要 发布者只需 1 毫秒即可发送这 1K 条消息。

    Chapter 2 - Sockets and Patterns 我们将解释如何同步一个 发布者和订阅者,这样您就不会开始发布数据 直到订阅者真正连接并准备好。有一个 简单而愚蠢的延迟发布者的方法,就是睡觉。别 但是,在实际应用程序中执行此操作,因为它非常脆弱 以及不优雅和缓慢。用睡眠向自己证明什么是 发生,然后等待Chapter 2 - Sockets and Patterns 看到 如何正确地做到这一点。

    同步的替代方法是简单地假设 发布的数据流是无限的,没有开始也没有结束。一 还假设订阅者不关心之前发生的事情 它开始了。这就是我们构建天气客户端示例的方式。

    所以客户端订阅了它选择的邮政编码并收集了 100 该邮政编码的更新。这意味着大约一千万次更新 服务器,如果邮政编码是随机分布的。你可以启动 客户端,然后是服务器,客户端将继续工作。你可以 尽可能频繁地停止和重新启动服务器,客户端将 继续工作。当客户端收集了它的一百个更新时,它 计算平均值,打印出来,然后退出。

    【讨论】:

    【解决方案2】:

    奖励答案:

    ZeroMQ 专为高性能消息传递/信令而设计,因此具有一些设计准则,核心部分围绕这些准则进行开发。

    零复制和零共享是比较知名的,零(几乎)延迟可能(有点)具有挑衅性,而零保修可能是一种,你最不想听到的.

    是的,ZeroMQ 不努力提供任何明确的保证(当然,由于分布式系统领域中常见的许多原因),但但它为您提供此类保证 - 任何消息要么以原子方式传递(即完整、无错误) - 要么根本不传递(因此,与检测和丢弃任何残次和/或损坏的消息有效负载相关联,确实无需支付任何额外费用) .

    所以可能宁愿忘记担心任何未交付的数据包,以及如果这些数据已交付等等等等。您只需尽可能多地获得,其余的不受您的影响(“迟到者”案例可以被视为一个边界,其中(如果)一个人能够为“慢加入者”强制执行更多时间,那么这种可观察到的差异都不会改变代码设计,所以宁愿尝试设计分布式系统对(主要)可能的未传递信号/消息具有鲁棒性)。

    API?包装器...

    如果对这种详细程度感兴趣,建议阅读 API,因为一些 v2.x,以便更好地实现所有想法,这些想法是为了争取最大性能(零拷贝动机集)消息准备步骤、消息的高级 API 调用、将被重新发送的消息、内存泄漏预防、用于增加 IO 吞吐量/减少延迟/相对优先级等的高级 IO-thread-Pool 映射)。

    在此之后,人们可能会检查任何相应的非本地语言绑定(包装器)在跨移植编程环境中的这些初始设计工作的反映程度(或差劲程度)。

    大多数此类工作都遇到了麻烦,无法在用户编程舒适度、目标编程环境表达性约束和最大限度地减少内存泄漏或 API 绑定/包装器质量受损的问题之间找到合理的平衡。

    请注意,设计非本地语言绑定是一些最具挑战性的任务之一。因此,我们应该容忍那些决定进入这一领域的勇敢团队(有时未能在不降低性能和/或原始意图清晰的情况下反映所有原生 API 优势——不用补充,许多原生 API 功能可能甚至被排除在无法从环境中访问,无法在这种非本地语言表达范围内提供无缝集成的环境中,因此在评估 API 绑定/包装器时要小心(并且原始的本地 API 总是有助于获得ZeroMQ 原始权力的根源)-无论如何-在大多数极端情况下,可能会尝试在关键部分内联)。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多