【问题标题】:inter thread comunication using ZeroMQ messages [closed]使用 ZeroMQ 消息的线程间通信 [关闭]
【发布时间】:2011-04-06 23:08:18
【问题描述】:

我正在尝试使用zeroMQ 作为在多个线程之间实现消息传递系统的一种方式。我尝试了下面的代码,但它不起作用;具体来说,每个线程中对zmq_recv 的调用不会等待/阻止任何消息的执行。

你能帮我看看这段代码吗?

我正在使用 Linux 操作系统和 gcc

最好的问候

AFG

    static void *
    worker_routine (void *context) {
        // Socket to talk to dispatcher
        void *receiver = zmq_socket (context, ZMQ_REP);
        zmq_connect (receiver, "inproc://workers");
        while (1) {

            zmq_msg_t request;
            zmq_msg_init( &request );
            zmq_recv( receiver, &request, 0 );
            printf ("Received request\n");
            // Do some 'work'
            usleep (1000);
            // Send reply back to client
            zmq_send (receiver, &request, 0);
        }
        zmq_close (receiver);
        return NULL;
    }

    int main (void) {

    void *context = zmq_init (1);
    void *clients = zmq_socket (context, ZMQ_REP);
    zmq_bind (clients, "inproc://workers");

    int thread_nbr;
    for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, context);
    }

    zmq_close (clients);
    zmq_term (context);
    return 0;
    }

【问题讨论】:

  • 我再次阅读 ZeroMQ 指南。有谁知道我是否必须为我的目的创建一个像 QUEUE 这样的 zmq_device ?我还注意到有使用“ipc”作为协议的示例。我一直认为对于 MT 我必须使用“inproc”。任何人都知道这是否会产生影响?

标签: c zeromq


【解决方案1】:

两个套接字都是 REP。你想要的是 REQ + REP。

【讨论】:

    【解决方案2】:

    在创建线程后,您将立即关闭套接字和 ZeroMQ。他们可能没有时间达到阻塞状态,如果他们这样做了,一旦你破坏 zmq 上下文,它们就会失败。来自zmq_term man page

    上下文终止按以下步骤执行:

    当前在上下文中打开的套接字上正在进行的任何阻塞操作都应立即返回,并带有 ETERM 错误代码。

    【讨论】:

    • 我可能需要添加也尝试添加一点“睡眠”;无论如何,我仍然不知道我所期望的行为是否符合该设置..请参阅问题下方的我的 cmets。 PS。感谢您的帮助!
    【解决方案3】:

    首先,正如@sustrik 所说,您需要使用REQREP,主线程和工作线程都不能是REP

    其次,您需要在主线程中提供某种阻塞循环:

    int main (int argc, char **argv)
    {
        void *context = zmq_init (1);
        void *clients = zmq_socket (context, ZMQ_REP); // use ZMQ_REQ on the clients
        zmq_bind (clients, "inproc://workers");
    
        int thread_nbr;
        for (thread_nbr = 0; thread_nbr < 5; thread_nbr++) {
            pthread_t worker;
            pthread_create (&worker, NULL, worker_routine, context);
        }
    
        while (TRUE)
        {
            // worker thread connected asking for work
            zmq_msg_t request;
            zmq_msg_init (&request);
            zmq_recv (clients, &request, 0);
            zmq_msg_close (&request);
    
            // do whatever you need to do with the clients' request here
    
            // send work to clients
            zmq_msg_t reply;
            zmq_msg_init_data (&reply, "Reply", 5, NULL, NULL);
            zmq_send (clients, &reply, 0);
            zmq_msg_close (&reply);
        }
    
        zmq_close (clients);
        zmq_term (context);
        return 0;
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-08-28
      • 1970-01-01
      • 2020-09-18
      • 2014-04-06
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多