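Posted: 2018-07-18 23:21:25
Question:
I'm trying to figure out why enabling the ZMQ_CONFLATE option results in no messages being received.
I recreated this minimal test case (my application uses an XPUB/XSUB proxy, but that doesn't seem to change the result of this test):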
#include <atomic>
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <cstdlib>   // exit
#include <cstring>   // memcpy
#include <iostream>
#include <string>
#include <zmq.hpp>
#define USE_PROXY  // comment out to connect PUB and SUB directly, without the XPUB/XSUB proxy
std::atomic<bool> stop{false};
void pub_thread(zmq::context_t &context)
{
zmq::socket_t pub(context, zmq::socket_type::pub);
#ifdef USE_PROXY
pub.connect("tcp://localhost:38922");
#else
pub.bind("tcp://*:38923");
#endif
long i = 0;
for(;;)
{
if(stop) break;
std::string m = boost::lexical_cast<std::string>(i);
// Two-frame message: topic frame (sent with ZMQ_SNDMORE), then payload frame
zmq::message_t hdr(6);
memcpy(hdr.data(), "topic1", 6);
zmq::message_t msg(m.size());
memcpy(msg.data(), m.data(), m.size());
std::cout << "send: " << m << std::endl;
if(!pub.send(hdr, ZMQ_SNDMORE) || !pub.send(msg))
std::cout << "send error" << std::endl;
i++;
boost::this_thread::sleep_for(boost::chrono::milliseconds{20});
}
}
void sub_thread(zmq::context_t &context)
{
zmq::socket_t sub(context, zmq::socket_type::sub);
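const int v_true = 1;
// Keep only the latest message; like most options, this applies only to subsequent connect()/bind() calls
sub.setsockopt(ZMQ_CONFLATE, &v_true, sizeof(v_true));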
#ifdef USE_PROXY
sub.connect("tcp://localhost:38921");
#else
sub.connect("tcp://localhost:38923");
#endif
sub.setsockopt(ZMQ_SUBSCRIBE, "topic1", 6);
for(;;)
{
if(stop) break;
zmq::message_t hdr, msg;
if(!sub.recv(&hdr) || !hdr.more() || !sub.recv(&msg))
{
std::cout << "recv error" << std::endl;
continue;  // don't print msg when the two-frame receive failed
}
std::string m(reinterpret_cast<const char*>(msg.data()), msg.size());
std::cout << " recv: " << m << std::endl;
boost::this_thread::sleep_for(boost::chrono::milliseconds{250});
}
}
void proxy_thread(zmq::context_t &context)
{
#ifdef USE_PROXY
zmq::socket_t xpub(context, zmq::socket_type::xpub);
xpub.bind("tcp://*:38921");
zmq::socket_t xsub(context, zmq::socket_type::xsub);
xsub.bind("tcp://*:38922");
std::cout << "starting xpub/xsub proxy" << std::endl;
zmq::proxy(xpub, xsub, nullptr);
std::cout << "xpub/xsub proxy terminated" << std::endl;
#endif
}
void timeout_thread()
{
boost::this_thread::sleep_for(boost::chrono::seconds{4});
stop = true;
boost::this_thread::sleep_for(boost::chrono::seconds{1});
exit(0);
}
int main(int argc, char **argv)
{
zmq::context_t context(1);
boost::thread t0(&timeout_thread);
boost::thread t1(&proxy_thread, boost::ref(context));
boost::this_thread::sleep_for(boost::chrono::seconds{1});
boost::thread t2(&sub_thread, boost::ref(context));
boost::this_thread::sleep_for(boost::chrono::seconds{1});
boost::thread t3(&pub_thread, boost::ref(context));
t0.join();
}
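Brief explanation: there are 4 threads:
- pub thread: every 20 ms, writes the value of an incrementing counter to the PUB socket
- sub thread: every 250 ms, reads a value from the SUB socket (messages should queue up, but because of the conflate option all of them except the most recent should be dropped)
- proxy thread: runs an XPUB/XSUB proxy (if USE_PROXY is defined)
- timeout thread: stops everything after 4 seconds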
This is the output I observe:
starting xpub/xsub proxy
send: 0
send: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
send: 10
send: 11
send: 12
send: 13
send: 14
send: 15
send: 16
send: 17
send: 18
send: 19
send: 20
send: 21
send: 22
send: 23
send: 24
send: 25
send: 26
send: 27
send: 28
send: 29
send: 30
send: 31
send: 32
send: 33
send: 34
send: 35
send: 36
send: 37
send: 38
send: 39
send: 40
send: 41
send: 42
send: 43
send: 44
send: 45
send: 46
send: 47
send: 48
send: 49
send: 50
send: 51
send: 52
send: 53
send: 54
send: 55
send: 56
send: 57
send: 58
send: 59
send: 60
send: 61
send: 62
send: 63
send: 64
send: 65
send: 66
send: 67
send: 68
send: 69
send: 70
send: 71
send: 72
send: 73
send: 74
send: 75
send: 76
send: 77
send: 78
send: 79
send: 80
send: 81
send: 82
send: 83
send: 84
send: 85
send: 86
send: 87
i.e. no message is received at all.
The expected output would be something like this:
starting xpub/xsub proxy
send: 0
send: 1
recv: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
send: 10
send: 11
recv: 11
send: 12
send: 13
send: 14
send: 15
send: 16
send: 17
send: 18
send: 19
send: 20
send: 21
send: 22
recv: 21
send: 23
send: 24
send: 25
send: 26
send: 27
send: 28
send: 29
send: 30
send: 31
send: 32
send: 33
send: 34
recv: 33
send: 35
send: 36
send: 37
send: 38
send: 39
send: 40
send: 41
send: 42
send: 43
send: 44
send: 45
recv: 45
send: 46
send: 47
send: 48
send: 49
send: 50
send: 51
send: 52
send: 53
send: 54
send: 55
send: 56
recv: 55
send: 57
send: 58
send: 59
send: 60
send: 61
send: 62
send: 63
send: 64
send: 65
send: 66
send: 67
recv: 66
send: 68
send: 69
send: 70
send: 71
send: 72
send: 73
send: 74
send: 75
send: 76
send: 77
send: 78
recv: 77
send: 79
send: 80
send: 81
send: 82
send: 83
send: 84
send: 85
send: 86
send: 87
I also tried moving sub.setsockopt(ZMQ_CONFLATE, ...) after sub.connect(...), but then it has no effect: the result is the same as removing the ZMQ_CONFLATE line altogether:
starting xpub/xsub proxy
send: 0
send: 1
recv: 1
send: 2
send: 3
send: 4
send: 5
send: 6
send: 7
send: 8
send: 9
send: 10
send: 11
recv: 2
send: 12
send: 13
send: 14
send: 15
send: 16
send: 17
send: 18
send: 19
send: 20
send: 21
send: 22
recv: 3
send: 23
send: 24
send: 25
send: 26
send: 27
send: 28
send: 29
send: 30
send: 31
send: 32
send: 33
send: 34
recv: 4
send: 35
send: 36
send: 37
send: 38
send: 39
send: 40
send: 41
send: 42
send: 43
send: 44
send: 45
recv: 5
send: 46
send: 47
send: 48
send: 49
send: 50
send: 51
send: 52
send: 53
send: 54
send: 55
send: 56
recv: 6
send: 57
send: 58
send: 59
send: 60
send: 61
send: 62
send: 63
send: 64
send: 65
send: 66
send: 67
recv: 7
send: 68
send: 69
send: 70
send: 71
send: 72
send: 73
send: 74
send: 75
send: 76
send: 77
send: 78
recv: 8
send: 79
send: 80
send: 81
send: 82
send: 83
send: 84
send: 85
send: 86
send: 87
ZMQ version: 4.2.5
Comments:
-
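Can you trace when send is called with the SNDMORE flag and when it is called without it? One possibility is that you never call send without the flag (I've only skimmed your code so far, so I may be wrong).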
-
When pub.send(hdr, ZMQ_SNDMORE) returns true (the send succeeded), !pub.send(hdr, ZMQ_SNDMORE) is false, so the second operand of the || in the if is evaluated as well; no short-circuit evaluation happens.
-
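api.zeromq.org/4-0:zmq-msg-send: "The zmq_msg_send() function shall return number of bytes in the message if successful. Otherwise it shall return -1 and set errno to one of the values defined below." By the way, zmq::socket_t::send(message_t &msg_, int flags_) returns a bool.
-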
Does it work without conflate?