【问题标题】:ZMQ causes main thread to freeze (or something similar..?)ZMQ 导致主线程冻结(或类似的东西..?)
【发布时间】:2018-03-07 14:34:24
【问题描述】:

我有以下一段 c++ 代码,它打开一个 ZMQ-subscriber-socket 并在无限循环中接收消息。

listener.cc:(代码应该可以工作,编译:g++ -lzmq listener.cc

#include <iostream>
#include <zmq.hpp>

class Listener {
public:
    Listener() {
        std::cout << "constructor call" << std::endl;

        // Not working:
//        zmq::context_t context(1);
//        sck = new zmq::socket_t(context, ZMQ_SUB);
//        sck->connect("tcp://127.0.0.1:9999");
//        sck->setsockopt( ZMQ_SUBSCRIBE, "", 0);

        std::cout << "constructor end" << std::endl;
    }

    void run() {
        // Ok:
        zmq::context_t context(1);
        sck = new zmq::socket_t(context, ZMQ_SUB);
        sck->connect("tcp://127.0.0.1:9999");
        sck->setsockopt(ZMQ_SUBSCRIBE, "", 0);

        while (1) { // Receive messages, not interesting:
            std::cout << "listening..." << std::endl;
            zmq::message_t message;
            sck->recv(&message);
            std::cout << "received something" << std::endl;
        }
    }
    zmq::socket_t *sck;
};

int main(int argc, char *argv[]) {
    Listener listener;
    std::cout << "foo" << std::endl;
    listener.run();
    return 0;
}

到目前为止,代码按预期工作:

$ g++ -lzmq listener.cc
$ ./a.out 
constructor call
constructor end
foo
listening...

但是,我想将 zmq-context/socket 的初始化移动到类的构造函数中(被注释掉的部分)。但是代码根本不会从构造函数调用中返回,构造函数中的所有语句都被执行,但是main 的第二行没有被执行,程序被卡住了。输出是:

$ g++ -lzmq listener.cc
$ ./a.out 
constructor call
constructor end

我唯一想到的是主线程由于某种原因停止执行。谁能解释一下并提供解决方案?

干杯

【问题讨论】:

    标签: c++ multithreading zeromq


    【解决方案1】:

    谁能解释这一点并提供解决方案?两者都是...

    ZeroMQ 使用每个Context( nIOthreads = 1 ) 实例作为一个非常强大的引擎,并且必须小心谨慎,以免资源管理程序感到惊讶(因为阻塞/冻结就是这样一种情况)。

    如果有一些积极使用的套接字实例(在 Context()-instance 的后台管理),可能存在这样一种情况,即在进入析构函数处理阶段之前并非所有传输都已完成,或者如果手动执行类似步骤,则尝试.close() 这样的套接字实例和/或.term() 上下文实例。

    人们已经多次陷入这种情况,无论是否有意。

    ZeroMQ 原生 API 文档在这个主题上非常清楚,并警告存在风险,即尚未完成的低级事务可能会让代码无限等待外部(远程代理操作)事件,该事件永远不会出现。这种无意识的代码看起来像一个冻结/挂起的故障,但只是由于一个人未能意识到这种风险并且没有采取适当的预防措施而陷入这种确定性的情况。

    虽然较新的 API 版本更改了一些默认设置,但我建议所有用户明确设置安全配置,即使较新的默认设置可能避免手动执行此操作的需要。然而,这种做法有助于提高人们对在适当的 设计实践中应该考虑哪些类型的碰撞的认识。


    解决方案?总是.setsockopt( ZMQ_LINGER, 0 );

    zmq_term() 将阻塞直到满足以下条件:

    在上下文中打开的所有套接字都已用zmq_close() 关闭。
    对于上下文中的每个套接字,应用程序使用zmq_send() 发送的所有消息都已物理传输到网络对等方,或者使用 ZMQ_LINGER 套接字选项设置的套接字的停留期已过期。

    如上所述,这是每个套接字实例化的经验法则。

    class Listener {
       // zmq::context_t aClassLocalCONTEXT;              // MAY GET SET LOCAL CTX BY VALUE
       // zmq::socket_t  aClassLocalSOCKET;               // MAY GET SET LOCAL SCK BY VALUE EITHER
          zmq::socket_t  *sck;
    
    public:
        Listener() {
            std::cout << "constructor call" << std::endl;
    
         // zmq::context_t context(1);                    // not a best practice here
         // ---------------------------------------------------
         // sck = new zmq::socket_t( aClassLocalCONTEXT, ZMQ_SUB );
            sck = new zmq::socket_t( context, ZMQ_SUB );
            sck->setsockopt( ZMQ_LINGER, 0 );             // ALWAYS, best before .bind()/.connect()
            sck->connect(   "tcp://127.0.0.1:9999" );
            sck->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
         // ----------------------------------------------// IF SETUP BY AN INSTANTIATION CALL INTERFACE
         // aClassLocalSOCKET->setsockopt( ZMQ_LINGER, 0 );
         // aClassLocalSOCKET->connect(    ... );
         // aClassLocalSOCKET->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
         // ---------------------------------------------------
            std::cout << "constructor end" << std::endl;
        }
       ~Listener() {
            sck->close();                                 // A GOOD PRACTICE
         // ----------------------------------------------// IF SETUP BY AN INSTANTIATION CALL INTERFACE
         // aClassLocalSOCKET->close();
        }
        void run() {            
            while (1) {                                   // recv()-messages, not interesting:
                std::cout << "listening..." << std::endl;
                zmq::message_t message;
                sck->recv(&message);
                std::cout << "received something" << std::endl;
                zmq::zmq_msg_close(&message);             // A GOOD PLACE TO DISCARD A NEVER MORE USED RESOURCE
            }
        }
    };
    
    
    int main(int argc, char *argv[]) {
        zmq::context_t context(1);                        // GLOBAL CTX
        Listener listener;
        std::cout << "foo" << std::endl;
        listener.run();
        return 0;
    }
    

    高效的资源可降低间接成本

    资源的智能处理很重要,因为每次实例化和销毁都会在[TIME]-domain 和[SPACE]-domain 中承担成本(内存分配/取消分配成本,同样及时)而且这些都不便宜。

    另外一个人应该遵循 ZeroMQ Zen of Zero - 不要共享任何东西(嗯,有时共享 Context()-instance 是一种方式,但是......如果你认真对待 设计,最好阅读Pieter HINTJENS 的书《Code Connected: Volume 1》,绝对值得花时间和精力)。

    【讨论】:

    • 感谢您的精彩解释(还有@Drew)。在我的应用程序中,我只需要 Listener 类中的 ZMQ 功能(此代码是精心制作的最小示例),因此全局 context 没有意义。但我设法使context 成为类成员并在构造函数中使用new 对其进行实例化——现在它可以正常工作了。
    【解决方案2】:

    从构造函数中取出zmq::context_t context(1);

    应该在全局或类似的地方初始化。通常,您只需要其中之一。

    您正在冻结,因为您试图删除构造函数本地的zmq::context_t,而仍在使用它的套接字仍然存在。

    【讨论】:

      猜你喜欢
      • 2011-05-10
      • 1970-01-01
      • 2013-02-17
      • 1970-01-01
      • 1970-01-01
      • 2018-10-06
      • 2010-09-20
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多