【问题标题】:ZeroMq Router silently drops messagesZeroMq 路由器静默丢弃消息
【发布时间】:2023-03-30 07:59:02
【问题描述】:

我有一个服务器(ROUTER 套接字),它绑定并允许单个客户端(DEALER 套接字)连接到它。然后服务器开始发送数据。

理想情况下,我想知道路由器何时达到其 hwm 设置并开始丢弃消息。我已在路由器上将 ZMQ_ROUTER_MANDATORY 设置为 1,但这也无济于事。即使我故意没有启动客户端,路由器仍会继续报告消息已发送(isAlive = false,因此在另一端没有任何东西可以提取这些消息)。

是我做错了什么还是 HWM 设置在 ROUTER 套接字上根本不可靠?

我在 Windows 7 64 位上使用 jeromq 版本 0.3.1 和 jdk 1.6.0_32

谢谢

public final class SenderSocket implements Runnable{

    private final int total;
    private final int sentHwm;
    private final String address;
    private final Socket sendSocket;
    private final ExecutorService executor;

    private final static String NAME        = SenderSocket.class.getSimpleName( );
    private final static Logger LOGGER      = LoggerFactory.getLogger( NAME );


    public SenderSocket( ZContext context, String address, int sentHwm, int total ){
        this.address        = address;
        this.total          = total;
        this.sentHwm        = sentHwm;
        this.sendSocket     = context.createSocket( ZMQ.ROUTER );
        this.executor       = Executors.newSingleThreadExecutor( );
    }


    public void init( ){

        sendSocket.setSndHWM( sentHwm );
        sendSocket.setRouterMandatory( true );
        sendSocket.bind( address );

        executor.execute( this );
        LOGGER.info("ROUTER configured with HWM {} bound to {}.", sentHwm, address );

    }



    @Override
    public void run( ){         

        for( int i =0; i <total; i++ ){      

            try{

                String item     = new StringBuilder(8).append(i).toString();
                boolean result  = sendSocket.send( item );

                LOGGER.info("SENT>> [{}] [{}]", result, item );

            }catch( ZMQException zmqEx ){

                int errorCode = zmqEx.getErrorCode();

                if( ZError.EHOSTUNREACH == errorCode ){
                    LOGGER.warn("Attempted to send message to but dealer is DOWN!");
                }

                if( ZMQ.Error.ETERM.getCode() == errorCode ){
                    LOGGER.error("Received error code [{}], terminating.");
                    stop();
                }

                LOGGER.error("ZMQException while sending message.", zmqEx);

            }catch( Exception ex ){
                LOGGER.error("Exception while sending message.", ex );
            }

        }

        stop();

    }


    public void stop( ){
        sendSocket.setLinger( 0 );
    }


}

//客户

    public class ReceiverSocket implements Runnable{

        private final int hwm;
        private final String address;
        private final Socket recvSocket;
        private final ExecutorService executor;

        private volatile boolean isAlive;

        private final static String NAME        = ReceiverSocket.class.getSimpleName( );
        private final static Logger LOGGER      = LoggerFactory.getLogger( NAME );


        public ReceiverSocket( ZContext context, String address, int hwm ){
            this.address        = address;
            this.hwm            = hwm;
            this.recvSocket     = context.createSocket( ZMQ.DEALER );
            this.executor       = Executors.newSingleThreadExecutor( );
        }


        public void init( ){

            this.isAlive = false;

            recvSocket.setRcvHWM( hwm );
            recvSocket.connect( address );
            executor.execute( this );

            LOGGER.info("DEALER configured with HWM {} connected to {}.", hwm, address );

        }



        @Override
        public void run( ){         

            Poller poller       = new Poller( 1 );
            poller.register( recvSocket, Poller.POLLIN );

            while(  isAlive ){      

                try{

                    int pollCount       = poller.poll( );

                    if( pollCount == NEGATIVE_ONE ){
                        LOGGER.warn("ERROR! Was the thread interrupted?", pollCount );
                        isAlive = false;
                        return;
                    }

                    if( poller.pollin( ZERO ) ){
                        String data = recvSocket.recvStr( );
                        LOGGER.info("RECVD >> {} {}", data, NEWLINE );
                    }

                }catch( Exception e ){
                    LOGGER.error("Exception while receving message.", e);
                }

            }

        }


        public void stop( ){
            recvSocket.setLinger( 0 );
            LOGGER.info("{} Stopped!", NAME );
        }
}

//主要

public static void main( String[ ] args ) throws InterruptedException{

        int recvHwm          = 5;
        int sentHwm          = 5;
        int totalSent        = 5000;
        String address       = "tcp://*:20000";
        ZContext context     = new ZContext( 1 );

        ReceiverSocket recvr = new ReceiverSocket( context, address, recvHwm );
        SenderSocket sender  = new SenderSocket( context, address, sentHwm, totalSent );

        recvr.init();
        Thread.sleep( 1000 );

        sender.init();

    }

【问题讨论】:

    标签: messaging zeromq jeromq


    【解决方案1】:

    路由器强制和高水位标记彼此无关。

    我已在路由器上将 ZMQ_ROUTER_MANDATORY 设置为 1,但是 也无济于事。路由器继续报告消息 即使我故意没有启动客户端也会发送

    即使没有对等点连接到路由器,路由器也不会引发异常,除非您针对特定客户端 ID 处理消息。

    //#1 no exception raised here, message dropped silently
    rtr.setRouterMandatory(true)
    rtr.bind("tcp://*:20000")
    rtr.send("omg!")
    
    //#2 exception raised here
    rtr.setRouterMandatory(true)
    rtr.bind("tcp://*:20000")
    rtr.sendMore("client1")
    rtr.sendMore("")
    rtr.send("omg!")
    

    代码示例 #2 引发异常,因为您告诉路由器将“omg”发送给身份为 client1 的对等方。路由器套接字通过为每个连接的对等方分配一个随机标识来跟踪所有连接。如果路由器与client1 没有连接,或者,如果client1 先前断开连接,路由器将在这两种情况下引发异常。

    您可以在客户端分配一个身份来覆盖路由器的随机身份分配:

    client.setIdentity("client1".getBytes())
    client.connect("tcp://*:20000")
    

    上面的代码阻止路由器套接字在示例 #2 中抛出异常

    我建议阅读this,它解释了消息寻址和封装;了解其工作原理对于使用路由器套接字至关重要。

    【讨论】:

    • 感谢 Raffian,在设置身份后(不敢相信我忘记了),如果对等方未连接,我确实会收到异常。如果已达到其发送 HWM,是否有办法确定 ROUTER 是否正在丢弃消息? (假设 DEALER 已连接但根本不拉消息)。
    • 不存在检测ROUTER何时超过HWM的功能,见:grokbase.com/t/zeromq/zeromq-dev/119t6vb1yq/logging-hwm-events
    • 再次感谢。我刚刚在 0.3.1 中发现,如果设置了 setRouterMandatory(true) 并且已达到 hwm,则 ROUTER 实际上会阻塞。如果没有设置,它会按照记录的方式工作并丢弃消息。
    • 这很有趣,所以他们在这方面是有联系的;您可以发布包含该信息的链接吗?谢谢
    • 这是让我尝试测试的帖子。 comments.gmane.org/gmane.network.zeromq.devel/18624
    【解决方案2】:

    在 python 中,这会使路由器在这种情况下提高 zmq.Again:

        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.ROUTER)
    
        # make sure router doesn't just drop unroutable message
        self.socket.setsockopt(zmq.ROUTER_MANDATORY, 1)
    
        # make sure when there is unroutable message to raise exception after timeout
        timeout_sec = 2.5
        timeout_milisec = int(timeout_sec * 1000)
        self.socket.setsockopt(zmq.SNDTIMEO, timeout_milisec)
    

    http://grokbase.com/t/zeromq/zeromq-dev/12aje3ya9t/zmq-router-mandatory-was-zmq-router-behavior

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-12
      • 1970-01-01
      • 2011-07-03
      • 1970-01-01
      • 2014-06-20
      相关资源
      最近更新 更多