【问题标题】:pyzmq socket.recv() with NOBLOCK flag behaviour具有 NOBLOCK 标志行为的 pyzmq socket.recv()
【发布时间】:2019-08-16 18:19:21
【问题描述】:

我使用 pyzmq 创建了简单的客户端/服务器。

我不确定的一件事是.recv() 没有收到消息,即使它是从服务器发送的。它只是忽略它并抛出一个我觉得很奇怪的错误。

Client.py

try:
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:2222")
    print("Sending request")
    socket.send(b"send the message")
    message = socket.recv(flags=zmq.NOBLOCK)
    print("Received reply %s " % message)
except Exception as e:
    print(str(e))

Server.py

 context = zmq.Context()
 socket = context.socket(zmq.REP)
 socket.bind("tcp://*:2222")
 while True:
     message = socket.recv()
     socket.send(b"Ack")

我认为客户端应该收到 Ack 并打印它而不是抛出异常。

文件说,

使用flags=NOBLOCK,如果没有消息到达,则会引发ZMQError

很明显,服务器在收到消息后会立即响应“Ack”。

错误信息是,

资源暂时不可用

【问题讨论】:

    标签: python zeromq pyzmq


    【解决方案1】:

    请记住,在并发环境中,不能保证独立进程的执行顺序。即使您立即响应server.py 中的消息,在您调用socket.recv 之前响应可能不会到达接收套接字。当您调用socket.send 时,消息需要通过网络发送到您的服务器,服务器需要创建消息并做出响应,然后消息需要通过网络返回到您的客户端代码。通过网络发送消息的时间会很长,您在socket.send 之后立即调用socket.recv

    所以事实上,当您调用message = socket.recv(flags=zmq.NOBLOCK) 时,客户端socket 还没有收到来自服务器的Ack,并且由于您使用的是NOBLOCK,因此会抛出一个错误,因为没有收到任何消息socket.

    NOBLOCK 在这种情况下可能不合适。您可以通过在sendrecv 之间添加sleep 调用来对此进行试验,以表明等待服务器响应的时间延迟确实是问题所在,但这对于您的客户端代码来说不是一个好的解决方案。 -术语。

    如果你想在等待一段时间后退出,你应该改用socket.poll

    event = socket.poll(timeout=3000)  # wait 3 seconds
    if event == 0:
        # timeout reached before any events were queued
        pass
    else:
        # events queued within our time limit
        msg = socket.recv()
    

    Pyzmq Doc for socket.poll()

    【讨论】:

    • 那么在什么情况下这个“使用标志=NOBLOCK,如果没有消息到达,这会引发 ZMQError”适用。实时网络中总是会有一些延迟,所以我不太确定这个标志有什么用
    • 如果您想提供超时,请查看我关于 usingpoll 的更新
    • @azundo 恕我直言,您是如何得出一个结论,即调用socket.recv( zmq.NOBLOCK ) 方法“在这种情况下可能不合适” ? 在使用基于 ZeroMQ 的分布式计算 10 多年后,我第一次听到这样的意见。如上所述,您对这种观点的论据是什么?您是否重新测试了 MCVE 并亲自复制了 O/P 发布的所有声称的异常?是什么促使您发表意见,以及应采取哪些步骤(如果有)来实现此场景的适当解决方案?深感惊讶
    【解决方案2】:

    Q说服务器没有启动,在这种情况下,客户端中的recv() 将永远被阻止,这是我不想要的。

    ZeroMQ 是一个出色的框架,用于在分布式系统中进行智能信令/消息传递

    让我们绘制一个主要非阻塞操作方式的演示,其中包含一些关于如何在进程终止之前获取和优雅释放资源的灵感。

    也许阅读一下ZeroMQ hierarchy in less than a five seconds 中的主要概念差异也会有所帮助。

    Server.py

     aContext     = zmq.Context()
     aLightHouse  =    aContext.socket( zmq.PUB )
     aRepSocket   =    aContext.socket( zmq.REP )
     aRepSocket.setsockopt(             zmq.LINGER,   0 )
     aRepSocket.setsockopt(             zmq.COMPLETE, 1 )
     aRepSocket.bind(                  "tcp://*:2222" )
     aLightHouse.bind(                 "tcp://*:3333" )
     aLightHouse.setsockopt(            zmq.LINGER,   0 )
     aLightHouse.setsockopt(            zmq.CONFLATE, 1 )
     aLightHouse_counter = 0
     #------------------------------------------------------------
     print( "INF: Server InS: ZeroMQ({0:}) going RTO:".format( zmq.zmq_version() )  )
     #------------------------------------------------------------
     while True:
        try:
            aLightHouse_counter += 1
            aLightHouse.send( "INF: server-RTO blink {0:}".format( repr( aLightHouse_counter ) ),
                               zmq.NOBLOCK
                               )
            if ( 0 < aRepSocket.poll( 0, zmq.POLLIN ) ):
                try:
                    message = aRepSocket.recv(         zmq.NOBLOCK ); print( "INF: .recv()ed {0:}".format( message ) )
                    pass;     aRepSocket.send( b"Ack", zmq.NOBLOCK ); print( "INF: .sent() ACK" )
                except:
                    # handle EXC: based on ...
                    print(  "EXC: reported as Errno == {0:}".format( zmq.zmq_errno() ) )
            else:
                # NOP / Sleep / do other system work-units to get processed during the infinite-loop
        except:
            # handle EXC:
            print(  "EXC: will break ... and terminate OoS ..." )
            break
    #------------------------------------------------------------
    print( "INF: will soft-SIG Server going-OoS..." )
    aLightHouse.send(   "INF: server goes OoS ... " )
    #------------------------------------------------------------
    print( "INF: will .close() and .term() resources on clean & graceful exit..." )
    Sleep( 0.987654321 )
    aRepSocket.unbind(  "tcp://*:2222" )
    aRepSocket.close()
    aLightHouse.unbind( "tcp://*:3333" )
    aLightHouse.close()
    aContext.term()
    #------------------------------------------------------------
    print( "INF: over and out" )
    

    Client.py

    try:
      aContext   = zmq.Context()
      aReqSocket =    aContext.socket( zmq.REQ )
      aBeeper    =    aContext.socket( zmq.SUB )
      aReqSocket.setsockopt(           zmq.LINGER,   0 )
      aReqSocket.setsockopt(           zmq.COMPLETE, 1 )
      aReqSocket.connect(             "tcp://localhost:2222" )
      aBeeper.connect(                "tcp://localhost:3333" )
      aBeeper.setsockopt(              zmq.SUBSCRIBE, "" )
      aBeeper.setsockopt(              zmq.CONFLATE, 1 )
      #------------------------------------------------------------
      print( "INF: Client InS: ZeroMQ({0:}) going RTO.".format( zmq.zmq_version() )  )
      #------------------------------------------------------------
      try:
          while True:
               if ( 0 == aBeeper.poll( 1234 ) ):
                    print( "INF: Server OoS or no beep visible within a LoS for the last 1234 [ms] ... " )
               else:
                    print( "INF: Server InS-beep[{0:}]".format( aBeeper.recv( zmq.NOBLOCK ) ) )
                    try:
                         print( "INF: Going to sending a request" )
                         aReqSocket.send( b"send the message", zmq.NOBLOCK )
                         print( "INF: Sent. Going to poll for a response to arrive..." )
                         while ( 0 == aReqSocket.poll( 123, zmq.POLLIN ) ):
                               print( "INF:  .poll( 123 ) = 0, will wait longer ... " )
                         message = socket.recv( flags = zmq.NOBLOCK )
                         print( "INF: Received a reply %s " % message )
                    
                     except Exception as e:
                         print( "EXC: {0:}".format( str( e ) ) )
                         print( "INF: ZeroMQ Errno == {0:}".format( zmq.zmq_errno() ) )
                         print( "INF: will break and terminate" )
                         break
      except Exception as e:
          print( "EXC: {0:}".format( str( e ) ) )
      finally:
          #------------------------------------------------------------
          print( "INF: will .close() and .term() resources on clean & graceful exit..." )
          aBeeper.close()
          aReqSocket.close()
          aContext.term()
          #------------------------------------------------------------
          print( "INF: over and out" )
    

    【讨论】:

      【解决方案3】:

      您正在使用非阻塞模式,这意味着它会引发错误来通知您,该消息无法处理任何事情,您应该稍后再试,但如果您使用阻塞模式,它会阻塞直到对等方连接。

      这个答案来自here

      基本上,如果您删除flags=zmq.NOBLOCK,它将起作用。

      更新

      如果你想使用非阻塞模式,你应该看看this

      【讨论】:

      • 是的,如果我删除 flags=zmq.NOBLOCK,只要服务器启动并运行,它就会工作。说在这种情况下服务器没有启动,客户端中的 recv() 将永远被阻止,这是我不想要的。但同时我想看看它是否收到
      • 我不确定你到底想要什么,但如果你使用阻塞模式,你可以在发送消息之前检查服务器脚本是否正在运行,以避免永远阻塞消息。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-01-10
      • 1970-01-01
      • 1970-01-01
      • 2017-03-06
      • 1970-01-01
      相关资源
      最近更新 更多