Q:说服务器没有启动,在这种情况下,客户端中的recv() 将永远被阻止,这是我不想要的。
ZeroMQ 是一个出色的框架,用于在分布式系统中进行智能信令/消息传递
让我们绘制一个主要非阻塞操作方式的演示,其中包含一些关于如何在进程终止之前获取和优雅释放资源的灵感。
也许阅读一下ZeroMQ hierarchy in less than a five seconds 中的主要概念差异也会有所帮助。
Server.py
aContext = zmq.Context()
aLightHouse = aContext.socket( zmq.PUB )
aRepSocket = aContext.socket( zmq.REP )
aRepSocket.setsockopt( zmq.LINGER, 0 )
aRepSocket.setsockopt( zmq.COMPLETE, 1 )
aRepSocket.bind( "tcp://*:2222" )
aLightHouse.bind( "tcp://*:3333" )
aLightHouse.setsockopt( zmq.LINGER, 0 )
aLightHouse.setsockopt( zmq.CONFLATE, 1 )
aLightHouse_counter = 0
#------------------------------------------------------------
print( "INF: Server InS: ZeroMQ({0:}) going RTO:".format( zmq.zmq_version() ) )
#------------------------------------------------------------
while True:
try:
aLightHouse_counter += 1
aLightHouse.send( "INF: server-RTO blink {0:}".format( repr( aLightHouse_counter ) ),
zmq.NOBLOCK
)
if ( 0 < aRepSocket.poll( 0, zmq.POLLIN ) ):
try:
message = aRepSocket.recv( zmq.NOBLOCK ); print( "INF: .recv()ed {0:}".format( message ) )
pass; aRepSocket.send( b"Ack", zmq.NOBLOCK ); print( "INF: .sent() ACK" )
except:
# handle EXC: based on ...
print( "EXC: reported as Errno == {0:}".format( zmq.zmq_errno() ) )
else:
# NOP / Sleep / do other system work-units to get processed during the infinite-loop
except:
# handle EXC:
print( "EXC: will break ... and terminate OoS ..." )
break
#------------------------------------------------------------
print( "INF: will soft-SIG Server going-OoS..." )
aLightHouse.send( "INF: server goes OoS ... " )
#------------------------------------------------------------
print( "INF: will .close() and .term() resources on clean & graceful exit..." )
Sleep( 0.987654321 )
aRepSocket.unbind( "tcp://*:2222" )
aRepSocket.close()
aLightHouse.unbind( "tcp://*:3333" )
aLightHouse.close()
aContext.term()
#------------------------------------------------------------
print( "INF: over and out" )
Client.py
try:
aContext = zmq.Context()
aReqSocket = aContext.socket( zmq.REQ )
aBeeper = aContext.socket( zmq.SUB )
aReqSocket.setsockopt( zmq.LINGER, 0 )
aReqSocket.setsockopt( zmq.COMPLETE, 1 )
aReqSocket.connect( "tcp://localhost:2222" )
aBeeper.connect( "tcp://localhost:3333" )
aBeeper.setsockopt( zmq.SUBSCRIBE, "" )
aBeeper.setsockopt( zmq.CONFLATE, 1 )
#------------------------------------------------------------
print( "INF: Client InS: ZeroMQ({0:}) going RTO.".format( zmq.zmq_version() ) )
#------------------------------------------------------------
try:
while True:
if ( 0 == aBeeper.poll( 1234 ) ):
print( "INF: Server OoS or no beep visible within a LoS for the last 1234 [ms] ... " )
else:
print( "INF: Server InS-beep[{0:}]".format( aBeeper.recv( zmq.NOBLOCK ) ) )
try:
print( "INF: Going to sending a request" )
aReqSocket.send( b"send the message", zmq.NOBLOCK )
print( "INF: Sent. Going to poll for a response to arrive..." )
while ( 0 == aReqSocket.poll( 123, zmq.POLLIN ) ):
print( "INF: .poll( 123 ) = 0, will wait longer ... " )
message = socket.recv( flags = zmq.NOBLOCK )
print( "INF: Received a reply %s " % message )
except Exception as e:
print( "EXC: {0:}".format( str( e ) ) )
print( "INF: ZeroMQ Errno == {0:}".format( zmq.zmq_errno() ) )
print( "INF: will break and terminate" )
break
except Exception as e:
print( "EXC: {0:}".format( str( e ) ) )
finally:
#------------------------------------------------------------
print( "INF: will .close() and .term() resources on clean & graceful exit..." )
aBeeper.close()
aReqSocket.close()
aContext.term()
#------------------------------------------------------------
print( "INF: over and out" )