【问题标题】:MSMQ and polling to receive messages?MSMQ和轮询接收消息?
【发布时间】:2010-11-14 22:52:16
【问题描述】:

我有一个 Windows 服务 可以进行一些图像转换。它通过在任何文件(在特定文件夹中)被重命名(即重命名文件观察程序)时触发来工作。效果很好,直到我在该文件夹中转储(并重命名)大量图像。 CPU红线等。

所以,我打算更改我的代码以使用 MSMQ 来排队所有需要转换的文件。美好的。每次重命名文件并触发文件观察程序时,我都会向队列中添加一条新消息。邱尔。

问题是这样的 -> 我如何一次从队列中获取一条消息?

我是否需要创建一个每 xxx 秒轮询一次队列的计时器对象?或者有没有办法不断地偷看队列中的第一个项目。一旦消息存在,提取它,处理它,然后继续(这意味着,继续偷看直到世界爆炸)。

我想知道我是否只需要在 Receive 方法周围放置一个 while 循环。伪代码如下(在编辑#2中)...

谁有这方面的经验并有一些建议?

非常感谢!

编辑:

如果 WCF 是要走的路,有人可以提供一些示例代码等吗?

编辑 2:

这是我正在考虑的一些伪代码......

// Windows service start method.
protected override void OnStart(string[] args)
{
   // some initialisation stuf...

   // Start polling the queue.
   StartPollingMSMQ();

   // ....
}

private static void StartPollingMSMQ()
{
    // NOTE: This code should check if the queue exists, instead of just assuming it does.
    //       Left out for berevity.
    MessageQueue messageQueue = new MessageQueue(".\\Foo");

    while (true)
    {
        // This blocks/hangs here until a message is received.
        Message message = messageQueue.Receive(new TimeSpan(0, 0, 1));

        // Woot! we have something.. now process it...
        DoStuffWithMessage(message);

        // Now repeat for eva and eva and boomski...
    }
}

【问题讨论】:

    标签: .net msmq


    【解决方案1】:

    听起来你需要研究 WCF。

    Queues in Windows Communication Foundation

    负载均衡。发送申请 可以压倒接收应用程序 与消息。队列可以管理 不匹配的消息产生和 消费率,使接收器 不会不知所措。

    这是一个使用WCF and MSMQ的示例

    【讨论】:

    • WCF 在幕后使用 MSMQ ......所以我不确定 WCF 如何帮助我。
    • 请查看我添加到答案中的链接中的示例。
    【解决方案2】:

    我的印象是 MSMQ 是为与 IBM 的 MQ 产品兼容而构建的。如果,您可以在超时的情况下调用 MQGET,而完全不用担心轮询。

    只需在两秒超时后从队列中取出一条消息(例如)。如果那里有,请处理它。然后根据需要退出服务或等待返回 MQGET。

    这意味着您的服务不会不必要地占用 CPU 时间,但如果收到信号,它仍然能够及时退出。

    一般情况下,你会有类似的东西:

    Set up all queue stuff.
    while true:
        Read from queue with 10-second timeout.
        If message was read:
            Process message
        If signaled to exit:
            break
    Tear down queue stuff.
    Exit.
    

    【讨论】:

    • 嗨,Pax,我刚刚在您回答这个问题时更新了我的初始帖子。你能重新阅读一下,看看有什么改变吗?
    • 不,不是。这是一个很好的方法,假设时间跨度是一分钟(即,不是太小)。您希望它足够大,以至于您的循环不会连续运行,但足够小,您可以对非 MSMQ 的东西做出反应。如果您的服务被告知关闭,那么部分“内容”将退出该 while-true 循环。
    • 并且您只想在返回消息时调用 DoStuffWithMessage,而不是在没有消息的情况下发生超时。
    • 但是由于超时,这不会继续阅读它。 ..对吗?
    • @PureKrome:不,如果您阅读了一条消息,它就会从队列中取出并由您处理。然后,当您返回队列时,您将收到 next 消息。你说的是偷看。
    【解决方案3】:

    如果使用本地队列,则不需要 WCF。

    这是我的示例服务(来自 Windows 服务项目的服务类)的外观:

    using System.Messaging;
    public partial class MQProcessTest1 : ServiceBase
    {
        //a name of the queue
        private const string MqName = @".\Private$\test1";
        //define static local private queue
        private static MessageQueue _mq;
        //lazy local property through which we access queue (it initializes queue when needed)
        private static MessageQueue mq
        {
            get
            {
                if (_mq == null)
                {
                    if (!MessageQueue.Exists(MqName))
                        MessageQueue.Create(MqName);
                    _mq = new MessageQueue(MqName, QueueAccessMode.ReceiveAndAdmin);
                    _mq.Formatter = new BinaryMessageFormatter();
                }
                return _mq;
            }
        }
    
        //constructor
        public MQProcessTest1()
        {
            InitializeComponent();
            //event to process received message 
            mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
        }
    
        //method to process message
        private void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
        {
            //queue that have received a message
            MessageQueue cmq = (MessageQueue)sender;
            try
            {
                //a message we have received (it is already removed from queue)
                Message msg = cmq.EndReceive(e.AsyncResult);
                //here you can process a message
            }
            catch
            {
            }
            //refresh queue just in case any changes occurred (optional)
            cmq.Refresh();
            //tell MessageQueue to receive next message when it arrives
            cmq.BeginReceive();
        }
    
        protected override void OnStart(string[] args)
        {
            //let start receive a message (when arrives)
            if (mq != null)
                mq.BeginReceive();
            //you can do any additional logic if mq == null
        }
    
        protected override void OnStop()
        {
            //close MessageQueue on service stop
            if (mq != null)
                mq.Close();
            return;
        }
    }
    

    【讨论】:

    • 这是一个很好的解决方案,但我认为它不适用于事务队列(因为没有选项可以在 BeginReceiveEndReceive 中指定事务)
    • 几年过去了,但是... MessageQueue.Create(MqName) - 避免它 - 您可能会遇到从其他应用程序访问队列的问题,并手动管理它。以其他方式创建队列。 PowerShell 脚本,手动通过 Windows UI 等,具有特定权限等。
    【解决方案4】:

    请注意,服务示例将在 OnStart() 处阻塞。而是启动一个工作线程:

        protected override void OnStart(string[] args)
        {
            IntPtr handle = this.ServiceHandle;
            myServiceStatus.currentState = (int)State.SERVICE_START_PENDING;
            SetServiceStatus(handle, ref myServiceStatus);
    
            // Start a separate thread that does the actual work.
    
            if ((workerThread == null) ||
                ((workerThread.ThreadState &
                 (System.Threading.ThreadState.Unstarted | System.Threading.ThreadState.Stopped)) != 0))
            {
                workerThread = new Thread(new ThreadStart(ServiceWorkerMethod));
                workerThread.Start();
            }
    
        }
    

    然后从工作人员那里调用 BeginReceive()。

    【讨论】:

    • 呃 - 我不明白。所以,这是一个 Windows 服务。查看。它有一个称为 OnStart(..) 的覆盖方法。查看。我覆盖它。查看。但是现在,你建议某些东西被阻塞了,所以我需要在一个新的线程中做一些新的东西。
    • 我认为他想说的就是here所说的。
    【解决方案5】:

    在 ViktorJ 示例中将事件参数转换为获取消息而不是发送者到新的 MessageQueue,这肯定更有效吗?然后使用静态字段 mq 调用 mq.BeginReceive 否则你会燃烧内存

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-08-13
      • 1970-01-01
      • 2010-10-30
      • 2010-12-04
      • 1970-01-01
      • 2010-12-27
      • 2010-10-28
      相关资源
      最近更新 更多