【问题标题】:How to send message to a specific Worker on NetMQ?如何向 NetMQ 上的特定 Worker 发送消息?
【发布时间】:2026-02-02 12:40:02
【问题描述】:

我在使用 C# 上的 ZeroMQ 向特定工作人员发送消息时遇到问题。

应用的架构如下:

例如,我想将消息从“client1”发送到“worker2”。 但我的应用程序发送到“worker1”,然后发送到“worker2”以进行下一步。

连接到经销商的工人代码:

public static void RRWorker(string[] args)
{
    if (args == null || args.Length < 2)
    {

        if (args != null)
        {
            if (args.Length < 1)
            {
                args = new string[] { "World", "tcp://127.0.0.1:5560" };
            }
            else
            {
                args = new string[] { args[0], "tcp://127.0.0.1:5560" };
            }
        }
        else
        {
            args = new string[] { "World", "tcp://127.0.0.1:5560" };
        }
        
    }

    string name = args[0];

    string endpoint = args[1];

    // Socket to talk to clients
    using (var context = new ZContext())
    using (var responder = new ZSocket(context, ZSocketType.REP))
    {
        responder.IdentityString = name;
        responder.Connect(endpoint);
        Console.WriteLine("Ready : " + name);

        while (true)
        {
            // Wait for next request from client
            using (ZMessage request = responder.ReceiveMessage())
            {                       
                Console.WriteLine("W "+name +" .Message count : " + request.Count);
                for (int i =0; i< request.Count; i++)
                {
                    Console.WriteLine("--W--" + request[i]);
                }
                // Send reply back to client
                Console.WriteLine("Worker Name : {0}... ", name);
                responder.Send(new ZFrame(name));
            }
        }
    }
}

我的客户代码:

 public static void RRClient(string name)
{
    // Socket to talk to router
    using (var context = new ZContext())
    using (var requester = new ZSocket(context, ZSocketType.REQ))
    {
        requester.IdentityString = name;
        //requester.Identity = Encoding.Unicode.GetBytes(name);
        requester.Connect("tcp://127.0.0.1:5559");
        ZFrame messageContent;
        ZFrame destination;

        var outgoing = new ZMessage();
        destination = new ZFrame("Worker2");
        messageContent = new ZFrame("test Message");
        
        outgoing.Append(destination);
        outgoing.Append(new ZFrame());
        outgoing.Append(messageContent);

        requester.Send(outgoing);
        using (ZFrame reply = requester.ReceiveFrame())
            {
                Console.WriteLine("Client request : Hello {0}!", reply.ReadString());
            }
    }
}

我的经纪人(路由器/经销商)的代码:

public static void RRBroker(string[] args)
{
    // Prepare our context and sockets
    using (var ctx = new ZContext())
    using (var frontend = new ZSocket(ctx, ZSocketType.ROUTER))
    using (var backend = new ZSocket(ctx, ZSocketType.DEALER))
    {
        frontend.Bind("tcp://*:5559");
        backend.Bind("tcp://*:5560");

        // Initialize poll set
        var poll = ZPollItem.CreateReceiver();

        // Switch messages between sockets
        ZError error;
        ZMessage message;
        ZMessage messageTosend;
        while (true)
        {
            if (frontend.PollIn(poll, out message, out error, TimeSpan.FromMilliseconds(64)))
            {                       
                for (int i = 0; i < message.Count; i++)
                {
                    Console.WriteLine("--F--" + message[i].ReadString());
                }
                Console.WriteLine("-----" + GloFunction.ZMessageToString(message) + "---");
                backend.Send(message);
            }
            else
            {
                if (error == ZError.ETERM)
                    return; // Interrupted
                if (error != ZError.EAGAIN)
                    throw new ZException(error);
            }

            if (backend.PollIn(poll, out message, out error, TimeSpan.FromMilliseconds(64)))
            {
                Console.WriteLine("B Message count : " + message.Count);
                for (int i = 0; i < message.Count; i++)
                {
                    Console.WriteLine("--B--" + message[i].ToString());
                }                       
                GloFunction.Console_WriteZMessage("backend", 2, message);
                
                frontend.Send(message);
            }
            else
            {
                if (error == ZError.ETERM)
                    return; // Interrupted
                if (error != ZError.EAGAIN)
                    throw new ZException(error);
            }
        }
    }
}

为了运行我的应用程序,我在特定线程中创建了我的所有设备(路由器、经销商、client1、client2、worker1、worker2)。

我运行“client1”两次时的示例:

Ready : Worker2
Ready : Worker1

--F--Client1
--F--
--F--Worker2
--F--
--F--test Message
-----Client1Worker2test Message---
W Worker2 .Message count : 3
--W--Worker2
--W--
--W--test Message
Worker Name : Worker2... 
B Message count : 3
--B--Client1
--B--
--B--Worker2
backend: Worker2
Client request : Hello Worker2!
Le thread 0x3d74 s'est arrêté avec le code 0 (0x0).

--F--Client1
--F--
--F--Worker2
--F--
--F--test Message
-----Client1Worker2test Message---
W Worker1 .Message count : 3
--W--Worker2
--W--
--W--test Message
Worker Name : Worker1... 
B Message count : 3
--B--Client1
--B--
--B--Worker1
backend: Worker1
Client request : Hello Worker1!
Le thread 0x4fd4 s'est arrêté avec le code 0 (0x0).

我做错了什么?

【问题讨论】:

    标签: c# .net zeromq netmq


    【解决方案1】:

    Dealer-Router 作为代理拓扑允许您连接客户端和服务而无需了解对方,并允许您动态添加更多只需要了解 Dealer/Router 的客户端/服务。 你可以在这里阅读更多: https://zguide.zeromq.org/docs/chapter2/#sockets-and-patterns (图 16 - 扩展的请求-回复)

    如果我理解正确 - 您确实希望在一个客户端 (REQ) 和服务 (REP) 之间建立耦合 - 为此您需要直接连接它们

    【讨论】:

      最近更新 更多