【问题标题】:ZMQ latency with PUB-SUB (slow subscriber)PUB-SUB 的 ZMQ 延迟(慢订阅者)
【发布时间】:2021-01-18 05:22:46
【问题描述】:

我发现了很多关于类似主题的问题,但它们并没有帮助我解决我的问题。

使用:

  • Linux Ubuntu 14.04
  • python 3.4
  • zmq : 4.0.4 // pyZMQ 14.3.1

TL;DR

即使在设置了 HWM 之后,ZMQ SUB 套接字中的接收者队列也会无限增长。当订阅者比发布者慢时会发生这种情况。 我能做些什么来防止它?

背景

我在人机交互领域工作。我们有一个庞大的代码库来控制鼠标光标之类的东西。我想在几个模块中“打破它”,与 ZMQ 通信。 它必须具有尽可能小的延迟,但丢弃(丢失)消息并不那么重要。

另一个有趣的方面是可以在节点之间添加“间谍”。因此 PUB/SUB 插座似乎是最合适的。

类似这样的:

+----------+                +-----------+                 +------------+
|          | PUB            |           |  PUB            |            |
|  Input   | +----+------>  |  Filter   |  +----+------>  |   Output   |
|          |      |     SUB |           |       |     SUB |            |
+----------+      v         +-----------+       v         +------------+
               +-----+                       +-----+                   
               |Spy 1|                       |Spy 2|                   
               +-----+                       +-----+       

问题

一切正常,除了我们添加了间谍。 如果我们添加一个间谍来执行“繁重的工作”,例如使用 matplotlib 进行实时可视化,我们会注意到绘图中的延迟增加。 IE:在上图中,过滤和输出很快,没有看到延迟,但是在 Spy 2 上,运行 20 分钟后延迟可以达到 10 分钟(!!)

看起来接收器上的队列无限增长。 我们调查了 ZMQ 的高水位 (HWM) 功能,将其设置为低以丢弃旧消息,但没有任何改变。

最小代码

架构:

+------------+                +-------------+
|            |  PUB           |             |
|   sender   | -------------> |  receiver   |
|            |             SUB|             |
+------------+                +-------------+

接收器是一个慢速接收器(在第一张图中充当间谍)

代码:

发件人.py

import time
import zmq

ctx = zmq.Context()

sender = ctx.socket(zmq.PUB)
sender.setsockopt(zmq.SNDBUF, 256)
sender.set_hwm(10)
sender.bind('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(sender.get_hwm()) ## 10

i = 0
while True:
    mess = "{} {}".format(i, time.time())
    sender.send_string(mess)
    print("Send : {}".format(mess))
    i+= 1

receiver.py:

import time
import zmq

ctx = zmq.Context()
front_end = ctx.socket(zmq.SUB)

front_end.set_hwm(1)
front_end.setsockopt(zmq.RCVBUF, 8)

front_end.setsockopt_string(zmq.SUBSCRIBE, '')
front_end.connect('tcp://127.0.0.1:1500')

print(zmq.zmq_version()) ## 4.0.4
print(zmq.__version__) ## 14.3.1
print(front_end.get_hwm()) ## 1

while True:
    mess = front_end.recv_string()
    i, t = mess.split(" ")
    mess = "{} {}".format(i, time.time() - float(t))
    print("received : {}".format(mess))
    time.sleep(1)  # slow

我认为这不是 ZMQ Pub/Sub 的正常行为。 我尝试在接收器、订阅器和两者中都设置 HWM,但没有任何改变。

我错过了什么?

Edit :

我认为我在解释我的问题时并不清楚。我做了一个移动鼠标光标的实现。输入是在 ZMQ 中以 200Hz 发送的鼠标光标位置(带有 .sleep( 1.0 / 200 ) ),完成了一些处理并更新了鼠标光标位置(在我的最小示例中我没有这个睡眠)。

一切都很顺利,即使我发射了间谍。尽管如此,间谍的延迟却越来越长(因为处理速度慢)。延迟不会出现在光标中,位于“管道”的末尾。

我认为问题出在订阅者排队消息缓慢。

在我的示例中,如果我们杀死发送者并让接收者活着,消息将继续显示,直到显示所有(?)提交的消息。

间谍正在绘制光标位置以提供一些反馈,有这样的延迟仍然很不方便......我只是想得到最后发送的消息,这就是我试图降低HWM的原因。

【问题讨论】:

  • ZMQ HWM 不是与特定消息计数相关的确切数字。它更多的是关于 ZMQ 将为您的队列分配多少内存的一般建议。关键是,很难调试 HWM 的特定问题,但可以公平地说,人们遇到的大多数问题 HWM 只是不了解它的不确切性质。您是否确认 ZMQ 专用于您的队列的内存量是问题的一部分?我觉得你的问题的根源更有可能不是你的应用程序接收消息的速度,而是它对它们做了什么。

标签: python performance sockets zeromq


【解决方案1】:

缺少更好的实时设计/验证

ZeroMQ 是一个强大的消息传递层。

也就是说,检查它在原始while True:killer-loop 中每秒真正发送多少消息

测量它。设计基于事实,而不是感觉。

事实很重要。

start_CLK = time.time()                                    # .SET _CLK
time.sleep( 0.001)                                         # .NOP avoid DIV/0!
i = 0                                                      # .SET CTR
while True:                                                # .LOOP
    sender.send_string( "{} {}".format( i, time.time() ) ) # .SND ZMQ-PUB 
    print i / ( time.time() - start_CLK )                  # .GUI perf [msg/sec]
    i+= 1                                                  # .INC CTR

ZeroMQ 尽最大努力在计划中填充雪崩。

而且它在这方面做得很好。

您的 [Filter] + [Spy1] + [Output] + [Spy2 ] 管道处理,端到端,

  • 更快,包括。 .send() + .recv_string() 开销都比 [Input]-sender

  • 成为主要阻塞病态的元素,导致内部 PUB/SUB 队列增长、增长、增长

这个队列链问题可以通过另一种架构设计来解决。

需要重新思考的事情:

  1. sub-sample [Filter].send() 节奏(交错因子取决于您控制的实时过程的稳定性问题--是 1 毫秒(顺便说一句,O/S 计时​​器分辨率,因此 COTS O/S 计时​​器控制无法进行量子物理实验:o)),双向语音流 10 毫秒,TV/GUI 流 50 毫秒,300键盘事件流等的毫秒)

  2. online v/s offline 后处理/可视化(你注意到matplotlib 处理很重,你通常承受大约 800 - 1600 - 3600毫秒开销,即使在简单的 2D 图形上也是如此 - 测量它,然后再决定 PUB/SUB-proc1>-PUB/SUB-proc2> 处理架构(您已经注意到,spy2> 会导致 proc2>-PUB 馈送和发送开销增加问题)。

  3. 线程数与执行它们的 localhost 内核数 -- 从 localhost ip 可以看出,所有进程都驻留在同一个 localhost 上。加上每个使用的 ZMQ.Context 添加一个线程,加上 查看 Python GIL 锁定开销(如果所有线程都已实例化) 来自同一个 Python 解释器...阻塞增长。阻挡很痛。 更好的分布式架构可以提高这些性能 方面。但是,请先查看 [1] 和 [2]

n.b. 将 20 分钟的处理管道延迟(实时系统 TimeDOMAIN skew )称为延迟是很委婉的说法

【讨论】:

  • @ice3 您的问题编辑有所帮助。正如这里的同事已经指出的那样,ZMQ-SUB 具有传入的队列缓冲区(+ inproc:// 传输类甚至在 PUB 和 SUB 线程之间共同分配和共享这些宝贵的 memory.IO 资源(由于MUTEX Stick-slick-blocking 在 ZMQ 模式的两端的范围))。这使得消息得到 SUB.recv_string() 仍然传递(来自 Q )。让 [SPY2]-SUB 组件在 [Filter]-PUB 之后“链接”对 [Filter] 有额外的不利影响,因为它因此有更多的内存必须排队等待缓慢的 [SPY2]-SUB...
  • 如 [2] matplotlib 中所述,即使配备了一些智能重入计算重用策略,也很难使其刷新/重新配置它自己GUI in 200Hz 上的内部数据结构和后处理 .show()。一些测井数据+子采样在这里进行可行的处理
  • @ice3 来自您的编辑附加组件:您似乎仍然颠倒了逻辑。 不应归咎于 ZMQ-SUB 排队。 [SPYn] 进程在处理数字时很慢,因此它不能以与 PUB 端 .send_string() 一样的速度从 SUB 组件中 .recv_string() 添加一个又一个又一个以 200Hz 的节奏采样,而 matplotlib 仍然计算和后处理-尚未完成它的“旧”-previous -refresh-cycle 以将其带入 GUI...照片。 [[ 测量绘图: timeit.Timer( stmt='''...''', setup = ''' ...''' ).timeit( number = 200 ) ]]
  • 我刚刚在间谍之前尝试了重新采样,它奏效了!谢谢!
  • @ice3 作为一个微不足道的改进,您可以减少一些 [usec] 来消除对 TCP/IP 堆栈设置/分解的需求,因为您的线程驻留在同一个 localhost (可以改用 ipc:// 传输类,它没有 SAR 开销)
【解决方案2】:

来自http://zguide.zeromq.org/page:all#toc50

当您的套接字达到其 HWM 时,它将根据套接字类型阻止或丢弃数据。如果 PUB 和 ROUTER 套接字到达它们的 HWM,它们将丢弃数据,而其他套接字类型将阻塞。在inproc传输上,发送方和接收方共享相同的缓冲区,所以真正的HWM是双方设置的HWM之和。

所以 SUB 套接字并不会真正丢弃旧消息。您可以使用路由器做一些诡计来实现丢弃订阅者,或者考虑一种可以满足慢速元素的设计。 Zero 的好处之一是您的许多核心代码可以保持不变,并且您可能会绕过处理套接字的包装器。

【讨论】:

    猜你喜欢
    • 2023-04-02
    • 1970-01-01
    • 1970-01-01
    • 2018-11-13
    • 1970-01-01
    • 2020-09-26
    • 1970-01-01
    • 2018-05-13
    相关资源
    最近更新 更多