【问题标题】:Why won't ZMQ drop messages?为什么 ZMQ 不会丢弃消息?
【发布时间】:2014-10-07 20:58:16
【问题描述】:

我有一个使用 PUB/SUB 设置从 ZeroMQ 发布者获取消息的应用程序。阅读器有时很慢,所以我在发送者和接收者上都设置了 HWM。我希望接收器从处理减速中恢复时会填满缓冲区并跳起来赶上。但我观察到的行为是它永远不会下降! ZeroMQ 似乎忽略了 HWM。我做错了吗?

这是一个最小的例子:

publisher.py

import zmq
import time

ctx = zmq.Context()
sock = ctx.socket(zmq.PUB)

sock.setsockopt(zmq.SNDHWM, 1)

sock.bind("tcp://*:5556")

i = 0

while True:
    sock.send(str(i))
    print i
    time.sleep(0.1)
    i += 1

订阅者.py

import zmq
import time

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.setsockopt(zmq.SUBSCRIBE, "")
sock.setsockopt(zmq.RCVHWM, 1)
sock.connect("tcp://localhost:5556")

while True:
    print sock.recv()
    time.sleep(0.5)

【问题讨论】:

  • 澄清一下,您是说您的订阅者收到了一串从未跳过的完整数字吗?
  • @Jason 是正确的。我不会丢失一条消息。
  • 发送快点有区别吗?快得多,例如仅每 1000 条消息进行一次睡眠。
  • 我将发布者的延迟更改为 0,订阅者的延迟为 1 秒,并让它运行 25 分钟。仍然不要丢弃一条消息。

标签: python zeromq pyzmq


【解决方案1】:

我相信这里有几件事在起作用:

  1. High Water Marks are not exact(请参阅链接部分的最后一段)- 通常这意味着实际队列大小将小于您列出的数字,我不知道这在 1 会如何表现。
  2. 您的PUB HWM 永远不会丢弃消息...由于PUB 套接字的工作方式,无论是否有可用的订阅者,它都会立即处理消息。因此,除非它实际上需要 ZMQ 0.1 秒来处理通过队列的消息,否则您的 HWM 将永远不会在 PUB 方面发挥作用。

应该发生的事情类似于以下内容(我假设操作顺序可以让您实际接收到第一个发布的消息):

  1. 启动subscriber.py 并等待一段合适的时间以确保它完全启动(基本上是立即启动)
  2. 启动 publisher.py
  3. PUB 处理并发送第一条消息,SUB 接收并处理第一条消息
  4. PUB 休眠 0.1 秒并处理并发送第二条消息
  5. SUB 休眠 0.5 秒,套接字接收第二条消息但在队列中等待下一次调用 sock.recv() 处理它
  6. PUB 休眠 0.1 秒并处理并发送第三条消息
  7. SUB 还在休眠 0.3 秒,所以第三条消息应该在第二条消息之后进入队列,这将在队列中产生 2 条消息,第三条应该由于 HWM 而丢弃

...等等等等等等。

我建议进行以下更改以帮助解决问题:

  1. 删除您的发布者上的HWM...它只会添加一个我们不需要在您的测试用例中处理的变量,因为我们从不期望它会改变任何东西。如果您的生产环境需要它,请将其重新添加并稍后在大容量场景中进行测试。
  2. 将订阅者上的 HWM 更改为 50。这会使测试花费更长的时间,但您不会处于极端情况,并且由于 ZMQ 文档指出 HWM 不准确,因此极端情况可能会导致意外行为。请注意,我相信你的测试(小数字)不会那样做,但我没有看过实现队列的代码,所以我不能肯定地说,你的数据可能足够小您的有效HWM 实际上更大
  3. 将您的订阅者睡眠时间更改为整整 3 秒...理论上,如果您的队列恰好容纳 50 条消息,那么您将在两个循环内将其饱和(就像您现在所做的那样),然后您将拥有等待 2.5 分钟来处理这些消息,看看你是否开始跳过,在前 50 条消息之后应该开始跳过大组数字。但我至少要等5-10分钟。如果您发现在 100 或 200 条消息后开始跳过,那么您将被数据的小规模所困扰。

这当然不能解决如果您仍然不跳过任何消息会发生什么...如果您这样做并且仍然遇到同样的问题,那么我们可能需要更多地研究高水位标记的实际工作原理,我们可能缺少一些东西。

【讨论】:

  • 好的。我试过了。我还消除了发布者的延迟。它运行了 20 分钟,我没有丢掉一条消息!要么是魔法,要么 ZMQ 完全忽略了 HWM。
  • 我感觉我们缺少了一些东西。我还没有使用过 HWM(我自己的用途是处理数千和 KB 的数据,而不是数百万或 GB)。出于好奇,当它运行 20 分钟时,您的订阅者的计数上升了多高?大概,3秒的睡眠,应该是400左右吧?您是否尝试过嗅探网络流量以确保发布者的行为符合预期,立即发送消息并在订阅者处排队?只有当订阅者被命名时,发布者才应该将消息排队,并且只有在它实际上失去连接时才应该排队。
【解决方案2】:

我遇到了完全相同的问题,我的演示与你的几乎相同,订阅者或发布者在 zmq.RCVHWM 或 zmq.SNDHWM 设置为 1 后不会丢弃任何消息。

在参考 zguide 的第 5 章中的suicidal snail pattern 进行慢速订户检测后,我四处走动。希望能帮助到你。

顺便说一句:如果您解决了 zmq.HWM 的错误,请告诉我?

【讨论】:

  • 处理输入消息后,我使用了一个讨厌的循环,通过调用 sock.recv(zmq.NOBLOCK) 来清除缓冲区,直到它引发异常。即使这样也有几秒钟的陈旧输入堆积。我现在的猜测是网络套接字缓冲区有问题。 ZMQ 似乎同时使用了 OS 级别的网络缓冲区和 HWM 设置的内部队列。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-04-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-14
  • 1970-01-01
相关资源
最近更新 更多