【问题标题】:Why the client cann't receive message from server with NetMQ framework?为什么客户端无法使用 NetMQ 框架从服务器接收消息?
【发布时间】:2023-04-09 23:30:02
【问题描述】:

最近,我使用 NetMQ 在服务器和客户端之间发送或接收消息。 服务器代码如:

    void Main()
    {
      CreatePullAndPushSocket();
      Task.Factory.StartNew(()=> {
            while (true)
            {
                Thread.Sleep(1);
                if (Pull != null)
                {
                    var message = Pull.ReceiveFrameString();
                }
            }
        });
    }
    PullSocket Pull;
    PushSocket Push;
    private void CreatePullAndPushSocket()
    {
        Pull = new PullSocket("tcp://ip1:port1");
        Push = new PushSocket("tcp://ip2:port2");
    }
    public void SendMessageToClient(string message)
    {
        if (Push != null)
        {
            Push.SendFrame(message);
        }
    }

客户端代码如下:

   void Main()
    { 
      new Thread(()=> {
            while (true)
            {
                Thread.Sleep(1);
                if (Pull != null)
                {
                    var message = Pull.ReceiveFrameString();
                }
            }
        }).Start();
    }
    PullSocket Pull;
    PushSocket Push;
    private void CreatePullAndPushSocket()
    {
        Pull = new PullSocket("tcp://ip2:port2");
        Push = new PushSocket("tcp://ip1:port1");
    }
    public void SendMessageToClient(string message)
    {
        if (Push != null)
        {
            Push.SendFrame(message);
        }
    }

当我运行两个应用程序时,一个是服务器应用程序,另一个是客户端应用程序。

  • 1:Client向Server发送消息
  • 2:服务器可以接收到客户端的消息
  • 3:服务器向客户端发送另一条消息
  • 4:客户端收不到消息!!!

好奇怪,我已经按照https://netmq.readthedocs.io/en/latest/push-pull/的指导!

【问题讨论】:

    标签: c# message-queue zeromq netmq


    【解决方案1】:

    在 NetMQ 中非常重要的是线程模型。真正重要的是,您不应该在多个线程中使用套接字。因此,如果 Thread#1 创建了套接字,那么它应该使用它。如果您想通过使用相同的套接字从其他线程(比如说线程#2)发送消息,请忘记这一点。您应该以某种方式将消息从 Thread#2 发送到 Thread#1,然后它应该通过套接字将其发送到客户端。

    所以基本上 CreatePullAndPushSocket 是错误的,可能会发生奇怪的事情。您正在一个线程中创建套接字并在另一个线程中使用。完全错了。

    另一件事是你的 Thread.Sleep。您不应该使用 Thread.Sleep,因为您的线程正在睡眠 1 秒,然后检查一次套接字,而不是睡眠并检查一次。 NetMQ 有函数 TryReceive,有超时。因此它可以检查套接字 1 秒并退出以检查您是否调用了取消/停止或其他方式。或者更好的是有一个轮询器,它会一直监听套接字,并允许我们从其他线程调用停止。

    让我们看看这段代码:

    private Poller _poller;
    
    public void Start()
    {
      Task.Factory.StartNew(()=> {
         using(var pullSocket = new PullSocket("tcp://ip1:port1"))
         using(_poller = new Poller())
         {
            pullSocket.ReceiveReady += (object sender, NetMQSocketEventArgsnetMqSocketEventArgs) =>
            {
                var message = netMqSocketEventArgs.Socket.ReceiveMultipartMessage();
            }
            _poller.Add(pullSocket);
            _poller.Run();
         }
        });
    }
    public void Stop()
    {
       _poller?.Stop();
    }
    

    或者如果您想在 while 循环中使用不带轮询器的代码:

    私有只读 CancellationTokenSource _cts;

    public void Start()
    {
       _cts = new CancellationTokenSource();
       var token = _cts.Token;
    
      Task.Factory.StartNew(()=> {
         using(var pullSocket = new PullSocket("tcp://ip1:port1"))
         {
            while (cancellationToken.IsCancellationRequested == false)
            {
                NetMQMessage message = new NetMQMessage();
                bool success = workerSocket.TryReceiveMultipartMessage(TimeSpan.FromSeconds(5), ref message);
    
                if (success == false)
                {
                    continue;
                }
    
                //if You reach this line, than You have a message
            }
         }
        },
        token,
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default);
    }
    public void Stop()
    {
        _cts.Cancel();
        _cts.Token.WaitHandle.WaitOne();//if You want wait until service will stop
    }
    

    所以回到你的问题,你应该只在你创建它的线程内部使用套接字。好的是在 using 语句中总是使用 socket,最后总是释放它。

    我看不到 SendMessageToClient 方法的用法,但我认为您是从某个按钮或其他东西调用它。如果从该线程调用了套接字的构造函数,则可以这样做。如果你能告诉我你在哪里调用这个方法,我可以谈谈这个。

    【讨论】:

      猜你喜欢
      • 2012-12-03
      • 1970-01-01
      • 2019-04-01
      • 2013-03-28
      • 1970-01-01
      • 2012-04-07
      • 1970-01-01
      • 1970-01-01
      • 2015-02-04
      相关资源
      最近更新 更多