【问题标题】:ZeroMQ/0MQ Push/Pull memory and routing issuesZeroMQ/0MQ 推/拉内存和路由问题
【发布时间】:2012-04-23 02:23:22
【问题描述】:

我在 ZeroMQ 上玩了一段时间,并提出了几个问题/问题。如果 ZeroMQ 的任何贡献者或任何曾经使用或当前使用该库的人能够加入我们将不胜感激。

* 假设我有一个路由器/转发器和 2 个不同的客户端 (c1,c2)。我想通过路由设备将消息从 client1 推送到 client2。路由器从任何客户端(此处为 client1)提取消息并将它们发布到任何订阅的客户端(此处为 client2)。我目前将此类消息路由到相应客户端的唯一方法是通过 pub/sub,但是,a)我想通过发送 routingTo 标记以及消息正文来决定如何在运行时路由,b)我想使用 push /pull 转发给客户端,而不是发布/订阅,因为我想在设置高水位标记属性时实现阻塞功能,c)我想让 c1 和 c2 连接到 1 个端口用于推送和 1 个端口用于订阅。我能否以某种方式在路由器端进行更改以便不必使用 pub/sub 或者 pub/sub 是路由到客户端的唯一方法,即使我知道在路由端应该将消息转发到哪里?我读到当队列大小超过我不想要的 hwm 时,pub/sub 会丢弃消息。我也不想实现请求/回复模式,因为它增加了不必要的开销,因为我不需要回复。

* 在运行以下代码(Push/Pull -> Pub/Sub)并发送所有消息并收到确认所有消息已收到后,推送消息的客户端仍然显示巨大的内存占用,显然 Push 套接字的队列中仍有大量消息。为什么会这样?我能做些什么来解决这个问题?

这是我的代码:

路由器:

class Program
{
    static void Main(string[] args)
    {
        using (var context = new Context(1))
        {
            using (Socket socketIn = context.Socket(SocketType.PULL), socketOut = context.Socket(SocketType.XPUB))
            {
                socketIn.HWM = 10000;
                socketOut.Bind("tcp://*:5560"); //forwards on this port
                socketIn.Bind("tcp://*:5559"); //listens on this port

                Console.WriteLine("Router started and running...");

                while (true)
                {
                    //Receive Message
                    byte[] address = socketIn.Recv();
                    byte[] body = socketIn.Recv();

                    //Forward Message
                    socketOut.SendMore(address);
                    socketOut.Send(body);
                }
            }
        }
    }
}

客户1:

class Program
{
    static void Main(string[] args)
    {
        using (var context = new Context(1))
        {
            using (Socket socketIn = context.Socket(SocketType.SUB), socketOut= context.Socket(SocketType.PUSH))
            {
                byte[] iAM = Encoding.Unicode.GetBytes("Client1");
                byte[] youAre = Encoding.Unicode.GetBytes("Client2");
                byte[] msgBody = new byte[16];

                socketOut.HWM = 10000;
                socketOut.Connect("tcp://localhost:5559");
                socketIn.Connect("tcp://localhost:5560");
                socketIn.Subscribe(iAM);

                Console.WriteLine("Press key to kick off Test Client1 Sending Routine");
                Console.ReadLine();

                for (int counter = 1; counter <= 10000000; counter++)
                {
                    //Send Message
                    socketOut.SendMore(youAre);
                    socketOut.Send(msgBody);
                }

                Console.WriteLine("Client1: Finished Sending");
                Console.ReadLine();
            }
        }
    }
}

客户2:

class Program
{
    public static int msgCounter;

    static void Main(string[] args)
    {
        msgCounter = 0;

        using (var context = new Context(1))
        {
            using (Socket socketIn = context.Socket(SocketType.SUB), socketOut = context.Socket(SocketType.PUSH))
            {
                byte[] iAM = Encoding.Unicode.GetBytes("Client2");

                socketOut.Connect("tcp://localhost:5559");
                socketIn.Connect("tcp://localhost:5560");
                socketIn.Subscribe(iAM);

                Console.WriteLine("Client2: Started Listening");

                //Receive First Message
                byte[] address = socketIn.Recv();
                byte[] body = socketIn.Recv();
                msgCounter += 1;

                Console.WriteLine("Received first message");

                Stopwatch watch = new Stopwatch();
                watch.Start();

                while (msgCounter < 10000000)
                {
                    //Receive Message
                    address = socketIn.Recv();
                    body = socketIn.Recv();
                    msgCounter += 1;
                }

                watch.Stop();
                Console.WriteLine("Elapsed Time: " + watch.ElapsedMilliseconds + "ms");
                Console.ReadLine();
            }
        }
    }
}

【问题讨论】:

    标签: c# sockets tcp messaging zeromq


    【解决方案1】:

    我将建议您的架构可能有点偏离这里。

    1) 如果您需要一个 PUSH 和一个 PULL,请从中间移除设备。设备被显式添加到架构中以管理多个消费者,这样您就不必在每次添加节点时更新生产者。当/如果您确实到达需要多个消费者和/或生产者的地方,您将需要连接到设备上的每个节点——这就是它们的工作方式。在这种情况下,听起来该设备使您的解决方案过于复杂。

    2) 有“路由到”标签的想法真的让我大吃一惊。选择消息传递而不是其他集成选项的最大原因可能是将生产者和消费者解耦,这样任何一方都不必知道对方的任何信息(除了在无代理设计的情况下将消息发送到哪里)。将路由信息直接添加到您的逻辑中会破坏这一点。

    至于开销,我从未经历过。但是,我以前从未使用过 ZeroMQ 的 .Net 驱动程序,因此未受过教育的猜测是查看 .Net 驱动程序本身。

    【讨论】:

    • 对不起,如果我不清楚,该架构将同时包含许多作为生产者/消费者的实体,与框架中的任何其他实体进行通信。因此,点对点通道没有意义恕我直言,因为它会引入太多通道。 2)我强烈不同意您认为路由知识是不可取的观点。我同意消息体应该与路由逻辑分离,但是,正如我想我已经明确指出的那样,整个框架中最终有 n 个实体,每个实体都知道它想将特定消息发送到哪里。
    • 大图是我希望不同进程之间的进程间通信,并且每个进程都知道要处理哪个其他进程,因此路由逻辑很有意义。盲目地发送消息并让所有连接的实体接收它们并在接收端决定接收到的消息是否实际上是应该由该实体处理的消息是对带宽和计算能力的巨大浪费。
    • 啊。那就道歉吧。原来的帖子让我相信你实际上只有两个客户端节点,你试图与中间的路由器集成。也许“routeTo”只是我被挂断的语义。将您的“routeTo”描述为您的路由器用于做出路由决策的“主题”是否准确?如果是这样,我可以编辑上述答案以重新理解。我也会留下原来的回复,这样来这个帖子的人在阅读这篇文章时就可以了解整个上下文。
    • 嗨,Shaun,是的,routeTo 仅用于让路由器将消息转发给那些提交匹配订阅的订阅者。请注意,我随后将客户端代码更改为现在通过 XSUB 套接字(以前是 SUB)订阅,并且路由器端的过滤现在可以工作。但是,我仍然只看到 Pub/Sub 的这项工作,而不是任何其他套接字类型。
    猜你喜欢
    • 2014-10-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-09-14
    • 2011-03-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多