【问题标题】:Reading from MSMQ slows down when there is a lot of messages queued当有大量消息排队时,从 MSMQ 读取速度会变慢
【发布时间】:2011-09-23 09:38:51
【问题描述】:

简介

我有一个基于 SEDA 的系统,并使用 MSMQ 在不同的应用程序/服务之间进行通信(事件触发)。

其中一个服务按文件获取消息,因此我有一个文件侦听器,它读取文件内容并将其插入队列(或实际上是 4 个不同的队列,但这对于第一个问题不是很重要)。

服务器是 Windows Server 2008

第一个问题 - 阅读速度变慢

我在另一端读取这些消息的应用程序通常每秒从队列中读取大约 20 条消息,但是当发布消息的服务开始排队数千条消息时,读取​​下降,读取应用程序只读取 2-每秒 4 条消息。当没有发布到队列时,读取应用程序每秒可以再次读取多达 20 条消息。

阅读应用程序的代码很简单,用C#开发,我使用System.Messaging中的Read(TimeSpan timeout)函数。

问:为什么当有大量消息发布到队列时读取速度变慢?

第二个问题 - TPS 的限制

另一个问题是关于阅读本身。如果我使用 1 或 5 个线程从队列中读取,我每秒可以读取多少条消息似乎没有区别。我还尝试实现“循环解决方案”,其中发布服务发布到一组随机的 4 个队列,并且读取应用程序有一个线程监听这些队列中的每一个,但即使我仍然只有 20 TPS从具有 1 个线程的 1 个队列、具有 4 个线程的 1 个队列或 4 个队列(每个队列一个线程)读取。

我知道线程中的处理大约需要 50 毫秒,所以如果当时只处理一条消息,则 20 TPS 是相当正确的,但多线程的线索应该是消息是并行处理的,而不是顺序处理的。

服务器上有大约 110 个不同的队列。

问:为什么即使使用多线程和使用多个队列,我一次也不能从队列中取出超过 20 条消息?

这是今天运行的代码:

// There are 4 BackgroundWorkers running this function
void bw_DoWork(object sender, DoWorkEventArgs e) 
{
    using(var mq = new MessageQueue(".\\content"))
    {
        mq.Formatter = new BinaryMessageFormatter();

        // ShouldIRun is a bool set to false by OnStop()
        while(ShouldIRun)
        {
            try
            {
                using(var msg = mq.Receive(new TimeSpan(0,0,2))
                {
                    ProcessMessageBody(msg.Body); // This takes 50 ms to complete
                }
            }
            catch(MessageQueueException mqe)
            {
               // This occurs every time TimeSpan in Receive() is reached
               if(mqe.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) 
                   continue;
            }
        }
    }

但是即使有4个线程,似乎都在等待函数再次进入“接收”点。我也尝试过使用 4 个不同的队列(content1、content2、content3 和 content4),但我仍然每 50 毫秒处理 1 条消息。

这与 Receive() 中的 TimeSpan 有什么关系,和/或是否可以省略它?

另一个问题是,如果使用私有队列,而不是公共队列可以解决什么问题?

【问题讨论】:

    标签: c# .net msmq message-queue


    【解决方案1】:

    性能问题。
    您没有提及是否所有代码都在服务器上运行,或者您是否有客户端远程访问服务器上的队列。从速度来看,我假设是后者。
    另外,队列是事务性的吗?
    消息有多大?

    如果您想从队列中读取消息,您的应用程序不会连接到队列本身。一切都在本地队列管理器和远程队列管理器之间进行。队列管理器是对队列进行写入和读取的唯一进程。因此,拥有多个队列或单个队列不一定会有任何不同的表现。

    因此,MSMQ 队列管理器在某些时候会成为瓶颈,因为它只能同时完成这么多的工作。您的第一个问题表明了这一点 - 当您在将消息放入的队列管理器上施加高负载时,您将消息取出的能力会减慢。例如,我建议查看性能监视器以查看 MQSVC.EXE 是否已被最大化。

    【讨论】:

    • 所有代码都在同一台服务器上运行。它仅用于在各个服务之间发送消息。它是公共队列,非事务性的。消息很小。
    • 您需要公共队列吗?除非您需要远程管理,否则专用队列通常就足够了。您是否使用 DIRECT FormatNames 来处理队列或路径名?
    • 其实我不需要公共队列,但是当系统创建时我不得不因为远程访问,但队列现在只被本地应用程序使用。我正在使用路径名来访问队列,但是队列的初始化只进行了一次,所以这不应该意味着太多吗?
    • 一个简单的测试是从服务器中嗅出网络,看看是否有任何额外的流量用于查询 Active Directory。
    • 顺便说一下,每秒 20 条消息确实非常慢。我的台式机将在 32 秒内收到 10,000 条 1kb 的可恢复消息。这没有对消息进行任何处理 - 只是从本地队列中接收它们。
    【解决方案2】:

    你为什么使用时间跨度? - 这是一件坏事,原因如下。

    在开发服务和队列时,您需要以安全的方式进行编程。队列中的每个项目都会产生一个新线程。使用时间跨度会强制每个线程使用单个计时器事件线程。这些事件必须在事件线程中等待轮到它们。

    标准是每个队列事件 1 个线程 - 这通常是您的 System.Messaging.ReceiveCompletedEventArgs 事件。另一个线程是您的 onStart 事件...

    每秒 20 个线程或 20 次读取可能是正确的。通常,在线程池中,您一次只能在 .net 中生成 36 个线程。

    我的建议是放弃计时器事件,让您的队列简单地处理数据。

    做更多这样的事情;

    namespace MessageService 
    { 
    
    public partial class MessageService : ServiceBase 
    
    { 
    
        public MessageService() 
    
        { 
    
            InitializeComponent(); 
    
        } 
    
    
    
        private string MessageDirectory = ConfigurationManager.AppSettings["MessageDirectory"]; 
    
        private string MessageQueue = ConfigurationManager.AppSettings["MessageQueue"]; 
    
    
    
        private System.Messaging.MessageQueue messageQueue = null; 
    
    
    
        private ManualResetEvent manualResetEvent = new ManualResetEvent(true); 
    
    
    
    
    
        protected override void OnStart(string[] args) 
    
        { 
    
            // Create directories if needed 
    
            if (!System.IO.Directory.Exists(MessageDirectory)) 
    
                System.IO.Directory.CreateDirectory(MessageDirectory); 
    
    
    
            // Create new message queue instance 
    
            messageQueue = new System.Messaging.MessageQueue(MessageQueue); 
    
    
    
            try 
    
            {    
    
                // Set formatter to allow ASCII text 
    
                messageQueue.Formatter = new System.Messaging.ActiveXMessageFormatter(); 
    
                // Assign event handler when message is received 
    
                messageQueue.ReceiveCompleted += 
    
                    new System.Messaging.ReceiveCompletedEventHandler(messageQueue_ReceiveCompleted); 
    
                // Start listening 
    
    
    
                messageQueue.BeginReceive(); 
    
            } 
    
            catch (Exception e) 
    
            { 
    
    
    
            } 
    
        } 
    
    
    
        protected override void OnStop() 
    
        { 
    
            //Make process synchronous before closing the queue 
    
            manualResetEvent.WaitOne(); 
    
    
    
    
    
            // Clean up 
    
            if (this.messageQueue != null) 
    
            { 
    
                this.messageQueue.Close(); 
    
                this.messageQueue = null; 
    
            } 
    
        } 
    
    
    
        public void messageQueue_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e) 
    
        { 
    
            manualResetEvent.Reset(); 
    
            System.Messaging.Message completeMessage = null; 
    
            System.IO.FileStream fileStream = null; 
    
            System.IO.StreamWriter streamWriter = null; 
    
            string fileName = null; 
    
            byte[] bytes = new byte[2500000]; 
    
            string xmlstr = string.Empty;                
    
                try 
    
                { 
    
                    // Receive the message 
    
                    completeMessage = this.messageQueue.EndReceive(e.AsyncResult);                    
    
                    completeMessage.BodyStream.Read(bytes, 0, bytes.Length); 
    
    
    
                    System.Text.ASCIIEncoding ascii = new System.Text.ASCIIEncoding(); 
    
    
    
                    long len = completeMessage.BodyStream.Length; 
    
                    int intlen = Convert.ToInt32(len);                   
    
                    xmlstr = ascii.GetString(bytes, 0, intlen);                   
    
                } 
    
                catch (Exception ex0) 
    
                { 
    
                    //Error converting message to string                    
    
                } 
    
            }
    

    【讨论】:

    • 您是否可以参考一些文档以获取声明 - “您为什么使用时间跨度? - 这是一件坏事,这就是为什么。”快速寻找一些 doco,但找不到任何东西。
    猜你喜欢
    • 2010-12-07
    • 2011-05-19
    • 1970-01-01
    • 2013-01-27
    • 1970-01-01
    • 2012-11-14
    • 2013-12-31
    • 2018-12-03
    • 1970-01-01
    相关资源
    最近更新 更多