【问题标题】:Service not receiving messages after Message Queuing service restarted消息队列服务重新启动后服务未收到消息
【发布时间】:2012-05-07 09:00:21
【问题描述】:

我们有一个从 n 个消息队列接收消息的服务。但是,如果重新启动消息队列服务,即使消息队列服务重新启动成功,消息检索服务也会停止接收消息。

我试图专门捕获消息检索服务中抛出的 MessageQueueException 并再次调用队列的 BeginReceive 方法。但是,在消息队列服务重新启动的大约 2 秒内,我得到了大约 1875 个异常实例,然后当我们的 StartListening 方法中抛出另一个 MessageQueueException 时,该服务停止运行。

有没有一种优雅的方法可以从消息队列服务重启中恢复?

    private void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        MessageQueue queue = (MessageQueue)sender;

        try
        {
            Message message = queue.EndReceive(e.AsyncResult);

            this.StartListening(queue);

            if (this.MessageReceived != null)
                this.MessageReceived(this, new MessageReceivedEventArgs(message));
        }
        catch (MessageQueueException)
        {
            LogUtility.LogError(String.Format(CultureInfo.InvariantCulture, StringResource.LogMessage_QueueManager_MessageQueueException, queue.MachineName, queue.QueueName, queue.Path));
            this.StartListening(queue);
        }            
    }

    public void StartListening(MessageQueue queue)
    {
        queue.BeginReceive();
    }

我需要处理由此导致的无限循环问题并稍微清理一下,但你明白了。

当 MessageQueueException 发生时,调用 RecoverQueue 方法。

    private void RecoverQueue(MessageQueue queue)
    {            
        string queuePath      = queue.Path;
        bool   queueRecovered = false;

        while (!queueRecovered)
        {
            try
            {
                this.StopListening(queue);
                queue.Close();
                queue.Dispose();

                Thread.Sleep(2000);

                MessageQueue newQueue = this.CreateQueue(queuePath);

                newQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(this.OnReceiveCompleted);

                this.StartListening(newQueue);

                LogUtility.LogInformation(String.Format(CultureInfo.InvariantCulture, "Message queue {0} recovered successfully.", newQueue.QueueName));

                queueRecovered = true;
            }
            catch (Exception ex)
            {
                LogUtility.LogError(String.Format(CultureInfo.InvariantCulture, "The following error occurred while trying to recover queue: {0} error: {1}", queue.QueueName, ex.Message));                
            }
        }           
    }

    public void StopListening(MessageQueue queue)
    {
        queue.ReceiveCompleted -= new ReceiveCompletedEventHandler(this.OnReceiveCompleted);            
    }

【问题讨论】:

  • 贴出RecoverQueue方法代码

标签: c# msmq


【解决方案1】:

在收到服务重启导致的异常后,你必须释放旧的MessageQueue,即取消ReceiveCompleted事件,处理MessageQueue等。然后创建一个新的@实例987654324@ 并在新的 MessageQueue 实例上再次连接到 ReceiveCompleted 事件。

或者,您可以使用轮询方法在特定时间间隔创建一个新实例,调用MessageQueue.Receive(TimeSpan),将等待传入消息或直到发生超时。在这种情况下,您处理消息并销毁 MessageQueue 实例并再次开始迭代。

通过每次重新创建MessageQueue,您可以确保内置恢复。此外,由于底层队列的内部缓存,创建MessageQueue 的开销最小。

伪代码...

while (!notDone)// or use a timer or periodic task of some sort...
{
    try
    {
        using (MessageQueue queue = new MessageQueue(queuePath))
        {
            Message message = queue.Receive(TimeSpan.FromMilliseconds(500));

            // process message
        }
    }
    catch (MessageQueueException ex)
    {
        // handle exceptions
    }
}

【讨论】:

  • 好的,所以你基本上只是在说:“每次都使用一个新队列。”
  • @Bob Horn - 是的。由于内部缓存导致开销较低,因此可以更轻松地处理 MSMQ 服务重新启动或无响应的问题。
  • @Jim- 我在服务首次启动时声明队列,然后为 OnReceived 设置处理程序。我遇到过同样的问题。处理程序被触发,在执行 EndReceive 并读取消息时遇到 QueueException,但这会阻止服务接收未来的消息。当我不使用您描述的计时器时,如何处理此异常?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-05-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-09-22
  • 1970-01-01
相关资源
最近更新 更多