【问题标题】:Proper use of Message Queue on 2008R2在 2008R2 上正确使用消息队列
【发布时间】:2011-07-21 16:39:07
【问题描述】:

我不是程序员,但我想通过给他们一些指导来帮助他们。我们不再有任何关于 msmq 的内部专业知识。我们正在尝试使用它来将一些功能与调度应用程序集成。

调度应用程序通过使用自定义构建的 dll 进行网络调用来启动作业。 dll 调用 weburl。 Web 应用程序将运行其任务并将有关其执行任务的更新发送到网站。网站将消息写入队列。调用该站点的 dll 正在监视队列中带有分配给该作业的标签的消息。当它收到最终状态消息时,它会关闭。

我们每隔几个小时就会收到以下消息。我们每小时运行近 100 个使用这种方法的作业。在底部列出的代码中,jobid 对应于消息队列中消息的标签。每个作业在开始时都会发出一个作业 ID,并将其用作发送到该作业的 msmq 的每条消息的标签。

 System.Messaging.MessageQueueException (0x80004005): Message that the cursor is currently pointing to has been removed from the queue by another process or by another call to Receive without the use of this cursor.
  at System.Messaging.MessageQueue.ReceiveCurrent(TimeSpan timeout, Int32 action, CursorHandle cursor, MessagePropertyFilter filter, MessageQueueTransaction internalTransaction, MessageQueueTransactionType transactionType)
  at System.Messaging.MessageEnumerator.get_Current() 

这是它的代码。

  while ( running )
        {
            // System.Console.WriteLine( "Begin Peek" );
            messageQueue.Peek();
            //System.Console.WriteLine( "End Peek" );
            messageQueue.MessageReadPropertyFilter.SetAll();

            using ( MessageEnumerator enumerator = messageQueue.GetMessageEnumerator2() )
            {
                enumerator.Reset();

                while ( enumerator.MoveNext() )
                {
                    Message msg = enumerator.Current;

                    if ( msg.Label.Equals( this.jobid ) )
                    {
                        StringBuilder sb = new StringBuilder();
                        /*
                        try
                        {
                            sb.Append( "Message Source: " );
                            //sb.Append( msg.SourceMachine );
                            sb.Append( " Sent: " );
                            sb.Append( msg.SentTime );
                            sb.Append( " Label " );
                            sb.Append( msg.Label );
                            sb.Append( " ID: " );
                            sb.Append( msg.Id );
                            sb.Append( " CorrelationID: " );
                            sb.Append( msg.CorrelationId );
                            sb.Append( " Body Type: " );
                            sb.Append( msg.BodyType );
                        }
                        catch ( Exception )
                        {
                            throw;
                        }
                        finally
                        {
                            System.Console.WriteLine( sb.ToString() );
                        }
                        */
                        //System.Console.WriteLine( "Receiving Message started" );
                        using ( Message message = messageQueue.ReceiveById( msg.Id ) )
                        {
                            //System.Console.WriteLine( "Receiving Message Complete" );
                            //sb = new StringBuilder();
                            string bodyText = string.Empty;

                            try
                            {
                                System.IO.StringWriter sw = new System.IO.StringWriter( sb );
                                System.IO.StreamReader sr = new System.IO.StreamReader( message.BodyStream );

                                while ( !sr.EndOfStream )
                                {
                                    sw.WriteLine( sr.ReadLine() );
                                }
                                sr.Close();
                                sw.Close();
                                bodyText = ( string ) FromXml( sb.ToString(), typeof( string ) );
                                int indx = bodyText.IndexOf( ',' );
                                string tokens = bodyText.Substring( indx + 1 );
                                indx = tokens.IndexOf( ',' );
                                string command = tokens.Substring( 0, indx );
                                tokens = tokens.Substring( indx + 1 );
                                if ( command.Equals( COMMAND_STARTED ) )
                                {
                                    System.Console.WriteLine( "STARTED " + tokens );
                                }
                                else if ( command.Equals( COMMAND_UPDATE ) )
                                {
                                    System.Console.WriteLine( tokens );
                                }
                                else if ( command.Equals( COMMAND_ENDED_OK ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Success" );
                                    finalResults = new FinalResults( 0, 0, "Success" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_WARNING ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Warning Issued" );
                                    finalResults = new FinalResults( 1, 1, "Warning" );
                                    running = false;
                                }
                                else if ( command.Equals( COMMAND_ENDED_FAIL ) )
                                {
                                    System.Console.WriteLine( tokens );
                                    System.Console.WriteLine( "WEBJOB: Failure" );
                                    finalResults = new FinalResults( 2, 16, "Failure" );
                                    running = false;
                                }
                            }
                            catch ( Exception )
                            {
                                throw;
                            }
                            finally
                            {
                                //System.Console.WriteLine( "Body: " + bodyText );
                            }
                        }
                    }
                }
            }
        }

        return finalResults;
    }

    MessageQueue messageQueue = null;
    string webServiceURL = "";
    Dictionary<string, string> parms = new Dictionary<string, string>();
    string jobid = "NONE";

【问题讨论】:

    标签: c# msmq message-queue windows-server-2008-r2


    【解决方案1】:

    kprobst 的解释很可能是正在发生的事情。即使您看到此特定消息在队列中,如果不同的应用程序(或同一应用程序的不同实例)从该队列中选择一条(任何)消息,这将使光标无效。

    从本质上讲,如果多个进程从同一个队列馈送,则此代码无法正常工作。

    【讨论】:

    • 顺便说一句。如果您使用的是 2008 R2,则可以通过使用子队列在不进行太多修改的情况下使其工作。 technet.microsoft.com/en-us/library/cc730897(WS.10).aspx。在您的情况下,每个作业都是一个子队列。
    • 子队列看起来很有希望。目前正在查看一些示例,以了解它是如何工作的以及它是如何创建的。
    • 忘记标记为答案。我们确实实现了子队列,这已被证明是可靠的,感谢您的帮助。
    • @Naraen 您是否有任何证据证明(对我来说相当令人惊讶)声称队列中的if a different app picks ANY message 会使该队列上的所有游标无效?我的经验是不同的,尽管在我的例子中,我们说的是多个游标,它们在单个应用程序中同时从同一个队列中删除项目。
    【解决方案2】:

    这通常意味着在接收操作完成之前,您正在接收()的消息正在被其他东西删除。另一个应用程序,或与您的代码在同一进程中的另一个线程使用不同的队列引用。

    您是否有可能同时运行两个处理器代码实例(我猜它是一个控制台应用程序)?在相同或不同的机器上?或者其他一些从队列中删除消息的应用程序或工具?

    .NET 2.0 的一个预发布版本中曾经存在一个错误,该错误会在某些压力条件下导致此问题,但据我所知,它在发布之前已修复。

    【讨论】:

    • 消息实际上还在队列中。作业失败并出现提到的错误消息。我可以查看队列,仍然可以找到属于该作业的消息标签。就好像它被锁定了一样。如果我理解代码(仍然声称不是开发人员),则创建的每个作业还将创建一个侦听器到队列,以侦听带有其作业 ID/标签的消息。它生成队列的动态列表,遍历它并处理带有其标签的任何消息。然后它会在收到结束状态消息时关闭。
    • 我明白了。那么您可能会遇到 CLR 中的错​​误。我可以告诉你的一件事是,这是使用 MSMQ 的一种相当不标准的方式。通常异步接收更有效,这就是我使用的。我从来没有遇到过这个问题,即使在大容量的情况下也是如此。你有几个选择。一,完全重写这个。第二,修改队列使其具有事务性,并更改代码来处理它。这可能会摆脱错误。第三,向 Microsoft 报告,看看他们是否可以通过 KB 补丁或类似的方式帮助您。
    • 感谢您的回复。计划将重写这个。这必须等到下周接管代码的开发人员回来,我们才能同步。与此同时,我们对错误消息进行了尝试捕获创可贴,使其再次检查,因为该消息实际上并没有消失。这只是为了让我们熬到下周。
    【解决方案3】:

    由于 MessageQueue 的内部方法 ReceiveCurrent 中的并发问题,此操作失败。 异常堆栈跟踪显示调用源自 enumerator.Current 行,异常发生在 ReceiveCurrent。 Enumerator.Current 使用“peek”选项调用 ReceiveCurrent。你可以问,当我遇到同样的问题时,我也遇到过,怎么会因为“消息已收到”错误而导致偷看失败?它只是试图偷看下一条尚未收到的消息? 很好的答案在于 ReceiveCurrent 代码,可在此处查看: https://referencesource.microsoft.com/#System.Messaging/System/Messaging/MessageQueue.cs,02c33cc512659fd7,references

    ReceiveCurrent 首先进行 StaleSafeReceive 调用以查看下一条消息。但是如果这个调用返回它需要更多的内存来接收整个消息(带有 "while (MessageQueue.IsMemoryError(status)" in its source code),它分配所需的内存并进行另一个 StaleSafeReceive 调用以获取消息。 这是非常经典的 Win32 API 使用模式,因为它最终是基于 C 的。

    这里的问题是,如果在第一次和第二次调用 ReceiveCurrent 中的 StaleSafeReceive 之间,另一个进程或线程“接收”,即从队列中删除该消息,第二次调用会抛出这个确切的异常。这就是“窥视”操作失败的原因。 请注意,它可能是由枚举器扫描导致异常的任何消息,而不是正在查找的消息。这就解释了为什么在抛出异常并且方法失败后,具有该作业 ID 的消息仍然存在于队列中。

    可以做的是保护 enumerator.Current 调用与 try catch,如果这个特定的异常被捕获,只需继续枚举队列中的下一个可用消息。

    我使用了 Cursor 对象而不是枚举器,但它遇到了同样的问题。但是使用 Cursor 有另一种方法可以降低发生这种情况的风险,即在扫描/查看消息时关闭当前 Queue 对象的 MessagePropertyFilter 的所有不需要的属性,尤其是 Body 属性。因为在偷看期间通常不需要接收正文,但大多数情况下,消息正文会导致内存被重新分配,并且需要在 ReceiveCurrent 中进行第二次 StaleSafeReceive 调用。 在 peek 调用中直接使用 Cursor 仍需要尝试捕获此异常。

    【讨论】:

      猜你喜欢
      • 2018-10-12
      • 2020-11-27
      • 1970-01-01
      • 1970-01-01
      • 2020-01-01
      • 2014-02-13
      • 1970-01-01
      • 2021-04-14
      • 2019-02-14
      相关资源
      最近更新 更多