【问题标题】:Reconnecting to IBM MQ Queue on connection failure连接失败时重新连接到 IBM MQ 队列
【发布时间】:2018-07-07 11:22:26
【问题描述】:

以下代码 sn-p 包含我的 IBM MQ 队列的连接和订阅逻辑。当出现连接失败时,我使用 IConnection.ExceptionListener 委托通过队列建立新连接并重新订阅消息。但问题是,我可以看到多个队列句柄。如何确保关闭之前的连接句柄并在由于网络问题或 MQ 服务器重新启动而导致连接中断时建立新连接?

private CancellationToken _cancellationToken;
private IConnection _connection;
private IConnectionFactory _connectionfactory;
private IMessageConsumer _consumer;
private IDestination _destination;
private MessageFormat _msgFormat;
private IMessageProducer _producer;
private ISession _session;

private void CreateWebsphereQueueConnection () {
    SetConnectionFactory ();

    //Connection
    _connection = _connectionfactory.CreateConnection (null, null);
    _connection.ExceptionListener = new ExceptionListener (OnConnectionException);

    //Session
    _session = _connection.CreateSession (false, AcknowledgeMode.AutoAcknowledge);

    //Destination
    _destination = _session.CreateQueue ("queue://My.Queue.Name");
    _destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
    _destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);

    //Consumer
    _consumer = _session.CreateConsumer (_destination);
}

private IConnectionFactory SetConnectionFactory () {
    XMSFactoryFactory factoryFactory = XMSFactoryFactory.GetInstance (XMSC.CT_WMQ);
    IConnectionFactory cf = factoryFactory.CreateConnectionFactory ();

    // Set the properties
    cf.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
    cf.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    cf.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);
    cf.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
    cf.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);
    cf.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
    cf.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

    cf.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
    cf.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);
    return cf;
}

public override void Subscribe<T> (Action<T> onMessageReceived) {
    try {

        _connection.ExceptionListener = delegate (Exception connectionException) {
            //Using any of these two statements is termination my code. Debugger doesn't move to CreateWebsphereQueueConnection() line of code at all
            //_conection.Stop()
            //_conection.Close()
            CreateWebsphereQueueConnection ();
            Subscribe (onMessageReceived);
        };

        MessageListener messageListener = new MessageListener ((msg) => {
            onMessageReceived (message);
        });
        _consumer.MessageListener = messageListener;

        // Start the connection
        _connection.Start ();
    } catch (Exception ex) {
        //Log exception details
    }
}

【问题讨论】:

    标签: c# ibm-mq xms


    【解决方案1】:

    IBM.XMS.dll 将负责使用 -r 开关完成 MQ 故障转移或重新启动。但是,如果在要求连接的客户端重新连接的情况下重新启动,XMS 库将不会尝试重新连接,并且客户将不得不手动处理这种情况,正如 @Shashi 和 @JoshMc 所指出的那样。

    我必须处理这种情况并按以下方式更改我的 Connection ExceptionListener 对我有帮助:

    private CancellationToken _cancellationToken;
    private IConnection _connection;
    private IConnectionFactory _connectionfactory;
    private IMessageConsumer _consumer;
    private IDestination _destination;
    private MessageFormat _msgFormat;
    private IMessageProducer _producer;
    private ISession _session;
    private bool _reConnectOnConnectionBreak = false;
    private bool _connected = false;
    private void CreateWebsphereQueueConnection () {
        SetConnectionFactory ();
    
        while (!_connected || _reConnectOnConnectionBreak) {
            try {
                //Connection
                _connection = _connectionfactory.CreateConnection (null, null);
                _connection.ExceptionListener = new ExceptionListener (OnConnectionException);
    
                //Session
                _session = _connection.CreateSession (false, AcknowledgeMode.AutoAcknowledge);
    
                //Destination
                _destination = _session.CreateQueue ("queue://My.Queue.Name");
                _destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
                _destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);
    
                //Consumer
                _consumer = _session.CreateConsumer (_destination);
                _connected = true;
            } catch (Exception ex) {
                _connected = false;
            }
    
        }
    }
    
    private IConnectionFactory SetConnectionFactory () {
        XMSFactoryFactory factoryFactory = XMSFactoryFactory.GetInstance (XMSC.CT_WMQ);
        IConnectionFactory cf = factoryFactory.CreateConnectionFactory ();
    
        // Set the properties
        cf.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
        cf.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
        cf.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);
        cf.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
        cf.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);
        cf.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
        cf.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);
    
        cf.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
        cf.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);
        return cf;
    }
    
    public override void Subscribe<T> (Action<T> onMessageReceived) {
        try {
    
            _connection.ExceptionListener = delegate (Exception connectionException) {
                XMSException xmsError = (XMSException) connectionException;
                int reasonCode = ((IBM.WMQ.MQException) (xmsError).LinkedException).ReasonCode;
                if (reasonCode == MQC.MQRC_Q_MGR_QUIESCING || reasonCode == MQC.MQRC_CONNECTION_BROKEN) {
                    _reConnectOnConnectionBreak = true;
                    _connection.Close ();
    
                    CreateWebsphereQueueConnection ();
                    Subscribe (onMessageReceived);
                    _reConnectOnConnectionBreak = false;
                }
            }
    
            MessageListener messageListener = new MessageListener ((msg) => {
                onMessageReceived (message);
            });
            _consumer.MessageListener = messageListener;
    
            // Start the connection
            _connection.Start ();
        } catch (Exception ex) {
            //Log exception details
        }
    }
    

    没有更好的方法来检查 IBM MQ 版本 8 中连接 IConnection 的状态。所以,我不得不使用原因代码。在 IBM MQ 版本 9 中,我们可以使用服务器公开的其余 API 来检查连接状态。

    【讨论】:

    • 如果您的 MQ 团队有 MQ 的标准停止脚本,您也可以要求他们将 -r 放入标准停止脚本中,以便始终使用它。
    • 我认为他们这样做了,我再次通知了他们。即使有人错过了 -r 开关,此代码现在也能处理连接中断
    • 如果这能解决您的问题,您应该接受自己的答案。
    • 为什么你的代码不完整?类在哪里,您的异常回调方法在哪里,您放置的覆盖的虚拟方法在哪里?我不明白你的代码
    【解决方案2】:

    为了我的服务,我将 CreateWebsphereQueueConnection()Subscribe&lt;T&gt;() 组合成一个 Connect() 方法。

    与:

    connectionFactory.SetIntProperty(XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, XMSC.WMQ_CLIENT_RECONNECT);                           
    connectionFactory.SetIntProperty(XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, 3600);                          
    connectionFactory.SetIntProperty(XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT_UNMANAGED);
    
    var queueConnection = connectionFactory.CreateConnection();
    queueConnection.ExceptionListener = OnException;
    

    然后我处理这样的异常:

    private void OnException(Exception exception)
    {
        Policy.Handle<Exception>()
            .WaitAndRetryForever(retryAttempt => TimeSpan.FromSeconds(5), (ex, timespan) =>
            {
                _logger.Warning($"Unable to connect: {ex.Message}.");
            })
            .Execute(CreateWebsphereQueueConnection);
    }
    

    这对retry 很重要,因为您不知道需要多长时间才能重新连接。

    【讨论】:

    • 通过将XMSC.WMQ_CLIENT_RECONNECT_OPTIONS 设置为XMSC.WMQ_CLIENT_RECONNECT 并将XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT 设置为3600,您是在告诉.NET 应用程序尝试重新连接3600 秒(1 小时)。如果客户端由于队列管理器崩溃或某种通信错误而失去连接,或者它以-r-s 标志结束,则会发生这种情况。异常侦听器旨在让应用程序知道正在发生重新连接逻辑。根据您提供的内容,您的初始会话不会重新连接,您将开始一个新会话吗?
    • 在审查文档时,它提出了类似于queueConnection.ExceptionListener = new ExceptionListener(OnException); 的内容,类似于@PushCode 所呈现的内容。它继续声明“您可以通过将 ExceptionListener 重置为 null 来删除委托:connection.ExceptionListener = null;”。您是否有可能将= OnException 视为null,因此实际上并没有设置异常侦听器,而只是MQ 的重新连接逻辑让您保持连接?
    • 有趣点 - 最初我让异常处理程序只记录错误,并依靠 MQ 的重新连接逻辑来重新连接。但是,我注意到它经常没有重新连接。现在我在我的代码中强制重新连接它一直工作正常。看起来您对 IBM MQ 了解很多,所以如果我的回答不够好​​,请随时发表您自己的文章。 :)
    • 当 MQ 停机超过 1 小时时,它没有重新连接的时间在哪里?当它在不到 1 小时内重新连接时,您会看到两个不同的连接吗? (1 个来自 MQ 重新连接原始会话,1 个来自您的显式连接)
    【解决方案3】:

    您已经在连接工厂上设置了重新连接选项。当与队列管理器的连接中断时,XMS 库将自动重新连接,除非队列管理器在没有 -r 或 -s 选项的情况下关闭。因此,您的应用程序不需要显式重新连接。拥有异常侦听器将有助于了解重新连接过程的情况。

    【讨论】:

    • @JoshMc 是否会在队列管理器停止且没有选项指示可重新连接的客户端重新连接的情况下负责重新连接?
    • 不,如果没有指示可重新连接的客户端重新连接的选项,XMS 或任何其他 MQ 客户端将不会重新连接。该选项等效于 endmqm 命令的 -r 选项
    • 我想我在生产中看到了这种重启的问题,我正在尝试解决我的连接异常监听器。在我的连接异常处理程序中创建新连接之前,有什么方法可以检查连接和会话的状态?我在 IConnection 对象上找不到这样的属性,当我执行 IConnection.Stop() 或 IConnection.Close() 时,代码完全无法执行。
    • 让 MQ 管理员在他们使用的任何启动/停止脚本中默认使用 -r 标志并没有什么坏处,请让他们将其设为默认值。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-11-06
    • 2016-01-21
    • 1970-01-01
    • 1970-01-01
    • 2023-04-05
    • 2020-09-18
    相关资源
    最近更新 更多