【问题标题】:ZeroMQ: is there a limitation for the amount of topics that SUBs can subscribe for?ZeroMQ:SUB 可以订阅的主题数量有限制吗?
【发布时间】:2017-05-24 15:51:05
【问题描述】:

我正在使用 ZeroMQ 的 PUB/SUB 套接字模式。 PUB 制作股票的财务数据并发布。主题设置为每只股票的代码。在 SUB 端,客户可以根据股票代码订阅他们想要的数据。 PUB 用 C 编写,SUB 用 Python 编写。

但是,在测试过程中出现了问题。如果只将一个股票代码设置为 SUB 套接字上的消息过滤器,则一切正常。但是当涉及到大量股票时,程序会在短时间内崩溃,并报错“Segmentation fault (core dumped)”(详情如下)。

这是PUB (C)的代码:

while (1) {  
    int rc = 0;
    // send topic
    rc = zmq_send(pub_socket, topic, rc, ZMQ_SNDMORE);
    if (rc == -1) {
        // error handling
    }
    // send stock data
    rc = zmq_send(pub_socket, data, rc, 0);
    if (rc == -1) {
        // error handling
    }
}

这里是SUB (Python)的代码:

import zmq

# initialize a SUB socket
ctx = zmq.Context.instance()
socket = ctx.socket(zmq.SUB)

# set socket options to filter message
for code in code_list:
    socket.setsockopt_string(zmq.SUBSCRIBE, code)

socket.connect(PUB_ADDR)
# recv data from PUB
while True:
    data = socket.recv()
    print(data)

我还使用 gdb 来调试程序。 调试结果如下: debug result

有人知道程序为什么会崩溃吗?欢迎任何帮助,谢谢。


更新

如果我将setsockopt_string 部分替换为以下代码,Python 脚本运行良好。奇怪的事情...我需要深入挖掘setsockopt_string 函数。

Python 中的新代码:

socket.setsockopt_string( zmq.SUBSCRIBE, "" )

最新更新

我运行了@user3666197 提供的脚本,得到了调试日志。我只选择了日志的几个部分,因为它很长。

套接字初始化 setsockopt_string 完成 接收一个味精并退出

【问题讨论】:

  • 您的机器上有源代码 (malloc.c) 吗?如果在bt 之后输入list,你会看到什么?机器有多少内存?
  • @Attie 1. 我的机器上似乎没有 malloc.c,因为我使用find 命令搜索文件并且没有返回结果; 2. 有点不知道怎么调试问题,明天交易期间调试程序,更新list结果; 3.机器内存32GB
  • 由于 SEGFAULT 在 malloc.c 中,我怀疑存在一些堆损坏。可能值得获得资源......你的操作系统是什么?在 Ubuntu 上,您可以运行:apt-get install libc6-dev(需要 root,并替换正确的包名称)。如果你需要更多,你可以运行apt-get source libc6(不需要root)。然后,您可以找到 malloc.c:3588 并获得有关问题所在的提示。
  • 可能还值得研究一下 python gdb 扩展,看看你的 python 应用程序中发生崩溃的地方......我从来没有运气好,所以你可能需要找其他人筹码。wiki.python.org/moin/DebuggingWithGdb / docs.python.org/devguide/gdb.html
  • @user3666197 抱歉,出现了一些其他事情,分散了我的注意力。我修改了PUB端的代码结构,手指受伤了,所以还没有运行测试脚本。我将在运行脚本后更新 DEBUG:log,可能是 2 或 3 天后。

标签: python c zeromq pyzmq


【解决方案1】:

简介:

PUB-side 使用 ZeroMQ v 4.1.5; SUB-side 使用 ZeroMQ Python wrapper 16.0.2

这隐含地使PUB/SUB 模式(与返回到 v 2.0 的前几代 API 形成对比)依赖于 PUB 端过滤,而您的 SIGSEGV 指示在SUB-side 上报告问题。

尽管假设过滤是根本原因,但我记得一些关于大树过滤问题的技术辩论,仍然有一个小惊喜,就像在一些关于 Trie-search 的帖子中一样,添加了 "" 叶节点也做了一个神奇的服务。如果有帮助,将尝试再次找到这场辩论。

在 ZeroMQ 过滤器中初始 remarks from Martin Sustrik refer up to 约 10,000 个订阅不会产生问题(在进一步的设计讨论中会有一些更高的数字):

高效订阅匹配

在 ZeroMQ 中,简单的尝试用于存储和匹配 PUB/SUB 订阅。订阅机制适用于最多 10,000 个订阅,其中简单的 trie 运行良好。但是,有些用户使用了多达 150,000,000 个订阅。在这种情况下,需要更有效的数据结构。因此,nanomsg 使用内存高效版本的 Patricia trie 而不是简单的 trie。

更多详情请查看this article


始终至少使用分步方法诊断原因:

轻微的测试修改将使您更接近于打开问题的真实信封:

import zmq
pass;                                           print "DEBUG: Ok, zmq imported. [ver:{0:}]".format( zmq.pyzmq_version() )
#_______________________________________________# SETUP ZMQ:
ctx    = zmq.Context( 2 )                       # Context( nIOthreads )
pass;                                           print "DEBUG: Ok, zmq.Context() instantiated."  
socket = ctx.socket(  zmq.SUB )                 # Socket(  .SUB )
pass;                                           print "DEBUG: Ok, Socket instantiated."
socket.connect(           PUB_ADDR )            # .connect()
pass;                                           print "DEBUG: Ok, .connect() completed."
socket.setsockopt(   zmq.LINGER, 0 )            # explicit LINGER
pass;                                           print "DEBUG: Ok, .setsockopt( LINGER, 0 ) completed."
#_______________________________________________# SET FILTER:
for code in code_list:
    pass;                                       print "DEBUG: Going to set SUB side n-th filter: {0: > 1000d}. == [{1:}]".format( code_list.index( code ), repr( code ) ),
    socket.setsockopt_string( zmq.SUBSCRIBE, code )
    pass;                                       print "DEBUG: Ok, this one was done."
pass;                                           print "DEBUG: Ok, all items from <code_list> exhausted."
#_______________________________________________# LOOP FOREVER:
while True:
    try:
          print "LOOP: .recv() call."
          data = socket.recv()
          print "LOOP: .recv()-ed {0:}[B] repr()-ed as [{1:}]".format( len( data ), repr( data ) )

    except KeyboardInterrupt():
          print "EXC: Ctrl-C will terminate."

    except:
          print "EXC: will terminate."

    finally:
          pass;                                 print "DEBUG: Ok, finally: section entered:"
          socket.close()
          pass;                                 print "DEBUG: Ok, Socket instance .close() call returned"
          ctx.term()
          pass;                                 print "DEBUG: Ok, .Context() instance term()-ed"
          break

鉴于描述为只有一个 PUB 和一个 SUB 的测试用例,其他性能扩展和详细的缓冲区管理问题不会爆发目前的问题。运行修改后的测试并发布琐碎的 DEBUG:log 后,将看到结果。

每秒发送大约 3k 条消息也不成问题。


更新:漏点 -- (1) Unicode 处理 + (2) 主题过滤器

(1) 如 DEBUG:log 中所示,您可以混合使用 Unicode 和纯 C 字节数组。 These representations MUST match - system-wide(从.send_string(),通过.setsockopt_string(),直到.recv_string()

data = socket.recv_string()           # AS YOUR DEBUG:log shows the b'mkt_bar...'

(2) 主题-过滤器必须匹配 - 否则消息将被分类为未订阅的消息 ... 这样 u'abc .... '过滤器匹配 u'abc ....' 消息。否则:

setsockopt_string( option, optval, encoding='utf-8' )

长度为零的空 optval 应订阅所有传入消息。 非空optval 应订阅所有以指定前缀开头的消息。 多个过滤器可以附加到单个 ZMQ_SUB 套接字,在这种情况下,一条消息应该是如果匹配至少一个过滤器,则接受。

DEBUG:上面提供的日志片段显示(嗯,PrintScreens ... -- 请下次复制/粘贴终端 ASCII,而不是图片,除非显示一些 GUI 功能,对吗?谢谢... ),您的主题过滤器在定义的意义上永远不会匹配。修复它。系统范围。

ZeroMQ 不应该因此受到责备,Unicode + C 字节数组根本无法工作,而且如果混合使用或调用接口错误。


结语:

如果仍然指责 ZeroMQ 主题过滤能力,最简单的 (dis) 批准 Null 假设的 a/b 测试将是运行 VERY SAME 测试,但仅使用 5-topic-filter 元素。如果这也崩溃了,那么您关于容量相关限制的假设是错误的。

继续走!

【讨论】:

  • 我已经发布了 DEBUG:log。如日志所示,程序收到一条消息并退出。根据上面的文章,ZeroMQ 使用一种内存效率低的方式来存储消息主题。就我而言,主题类似于“mkt_bars_min1 + code_name”,并且似乎这样的主题会占用大量内存。这就是程序会崩溃的原因吗?
  • 非常感谢。我确实很少注意 Unicode 处理,我会更深入地研究它。正如您所建议的那样——使用 5-topic-filter 元素运行测试,我已经测试了应用 300-topic-filter 元素并且程序运行良好的情况.
  • 对不起,我想你误解了我的意思。前几天我测试了300个主题过滤元素的情况,但是当主题数量增加到4000左右时,程序仍然会崩溃,所以我的问题并没有真正解决。仍然感谢您的帮助:)
  • 那么,您能否确认一下,unicode/byte-array 不匹配和主题过滤器/消息不匹配(均在 DEBUG:log 中观察到)已得到纠正,您仍然会遇到运行时崩溃?您能否更新您的帖子并添加 DEBUG:log 的文本副本?
  • 我现在无法运行脚本,因为交易时间已经结束。运行测试后,我将更新 DEBUG:log。
猜你喜欢
  • 2016-02-10
  • 1970-01-01
  • 1970-01-01
  • 2014-04-08
  • 1970-01-01
  • 2020-09-12
  • 2021-09-28
  • 2021-01-28
  • 2017-10-28
相关资源
最近更新 更多