【问题标题】:pyzmq simultaneous send to multiple clientspyzmq 同时发送到多个客户端
【发布时间】:2019-09-05 02:50:26
【问题描述】:

我有一个 Raspberry Pi 客户端和 4 个 Raspberry Pi 服务器。我希望客户端同时向所有 4 个服务器发送字符串消息以捕获图像。现在我正在按顺序使用类似下面的东西。

socket.send(capture)
socket1.send(capture)
socket2.send(capture)
socket3.send(capture)

更改为发布/订阅模型会改善客户端接收消息的距离吗?我希望 4 个客户端在 5 毫秒或更短的时间内收到捕获消息。

【问题讨论】:

  • 您可以使用 PUB/SUB 模式。您的客户端将绑定到 IP:PORT 并且您的服务器将连接到客户端 IP:PORT 并为过滤器设置一个空字符串(请查看文档)然后客户端将发布一个字符串。此消息同时发送到所有订阅的服务器。一旦服务器收到消息,他们就可以开始捕获过程。

标签: python networking raspberry-pi pyzmq


【解决方案1】:

欢迎来到零之禅:

虽然我们获得零保修,但我们可能会在适当的方向上采取一些步骤。如果是 ZeroMQ 新手,请在深入了解更多细节之前随意阅读一些 posts here 和至少“ZeroMQ Principles in less than Five Seconds”:

客户代码概念模板:

   import zmq;                 print( zmq.zmq_version() ) # INF:

   aCtx = zmq.Context( 4 )                                # request 4-I/O-threads
   aPUB = aCtx.socket( zmq.PUB )                          # PUB-instance
   aPUB.setsockopt(    zmq.LINGER,   0 )                  # avoid deadlock on close
   aPUB.setsockopt(    zmq.SNDBUF, 3 * PayLoadSIZE )      # FullHD ~ 6 MB, 4K ~ ...
   aPUB.setsockopt(    zmq.SNDHWM, aNumOfPicsInQUEUE )    # 1, ~3? ~10?, !1000 ...
   aPUB.setsockopt(    zmq.IMMEDIATE, 1 )                 # ignore L1/L2-incomplete(s)
   aPUB.setsockopt(    zmq.CONFLATE, 1 )                  # do not re-send "old"
   aPUB.bind( <transport-class>:<port#> )                 # tcp:? udp-multicast?
   #-----------------------------------------------------------------------------[RTO]
   # may like to set aPayLOAD = gzip.compress( dill.dumps( capture ), compressionLEVEL )
   #                 yields reduced sizes of the serialised <capture> data
   #                 at costs of about ~30~60 [ms] on either side
   #                 which may lower the network traffic and .SNDBUF-sizing issues
   #----------------------------------------------------------------------
   while <any reason>:
         try:
              aPUB.send( aPayLOAD, zmq.NOBLOCK )
         except:
              # handle as per errno ...
         finally:
              pass
   #----------------------------------------------------------------------
   aPUB.close()
   aCtx.term()

服务器代码概念模板:

   import zmq;                 print( zmq.zmq_version() ) # INF:

   aCtx = zmq.Context()                                   # request 4-I/O-threads
   aSUB = aCtx.socket( zmq.SUB )                          # SUB-instance
   aSUB.setsockopt(    zmq.LINGER,   0 )                  # avoid deadlock on close
   aSUB.setsockopt(    zmq.RCVBUF, 3 * PayLoadSIZE )      # FullHD ~ 6 MB, 4K ~ ...
   aSUB.setsockopt(    zmq.RCVHWM, aNumOfPicsInQUEUE )    # 1, ~3? ~10?, !1000 ...
   aSUB.setsockopt(    zmq.IMMEDIATE, 1 )                 # ignore L1/L2-incomplete(s)
   aSUB.setsockopt(    zmq.CONFLATE, 1 )                  # do not re-recv "old"
   aSUB.setsockopt(    zmq.SUBSCRIBE, "" )                # do subscribe to whatever comes
   aSUB.connect( <transport-class>:<port#> )              # tcp:? udp-multicast?
   #-----------------------------------------------------------------------------[RTO]
   while <any reason>:
         try:
              if ( aSUB.poll( zmq.POLLIN, 0 ) == 0 ):
                 # nothing in the receiving Queue ready-to-.recv()
                 # sleep()
                 # do some system work etc
              else:
                 aPayLOAD = aSUB.recv( zmq.NOBLOCK )
                 #--------------------------------------------------------
                 # decompress / deserialise the original object
                 # capture = dill.loads( gzip.decompress( aPayLOAD ) )
                 #--------------------------------------------------------
                 # PROCESS THE DATA :
                 # ...
         except: 
              # handle as per errno ...
         finally:
              pass
   #----------------------------------------------------------------------
   aSUB.close()
   aCtx.term()

【讨论】:

  • zmq.COMPLETE 选项有什么作用?
  • @BenyaminJafari Mea Culpa - 代码输入得很匆忙,而且这个想法比我的打字技能还要快 - 使用的正确选项设置是 zmq.IMMEDIATE 模块常量,这可以防止将消息有效负载放入队列中以进行 L1/L2 不完整连接(ZeroMQ ZMTP/RFC 文档清楚地说明了状态完整连接管理的丰富程度),从而合理地避免了任何延迟步骤,这些步骤没有意义,并且可能会使达到&lt;= 5ms 交付目标变得复杂。再次原谅我的错误。
猜你喜欢
  • 1970-01-01
  • 2011-04-09
  • 2021-12-04
  • 1970-01-01
  • 1970-01-01
  • 2018-04-10
  • 1970-01-01
  • 1970-01-01
  • 2019-06-01
相关资源
最近更新 更多