【问题标题】:Azure Service Bus - Receive Messages with OnMessage() MethodAzure 服务总线 - 使用 OnMessage() 方法接收消息
【发布时间】:2017-03-28 13:46:52
【问题描述】:

MS documentation 之后,从订阅接收消息并不困难。但是,如果我希望我的应用程序在每次发布新消息时都收到一条消息 - 持续轮询。因此 SubscriptionClient 类的 OnMessage() 方法。

MS documentation 说:...当调用 OnMessage 时,客户端会启动一个内部消息泵,不断轮询队列或订阅。这个消息泵由一个无限循环组成,发出 Receive() 调用。如果调用超时,它会发出下一个 Receive() 调用。 ..."

但是当应用程序运行时,在调用 OnMessage() 方法的那一刻,只接收到最新消息。当发布新消息时,持续轮询似乎不起作用。在尝试了许多不同的方法之后我可以使这项工作并让应用程序在收到新消息的那一刻做出反应的唯一方法是将代码放入一个具有无限循环的单独任务中。这在很多层面上似乎完全错误!(见下面的代码)。

谁能帮助我更正我的代码或发布一个工作示例以在没有循环的情况下完成相同的功能?谢谢!

 public void ReceiveMessageFromSubscription(string topicName, string subscriptionFilter)
        {
            var newMessage = new MessageQueue();
            int i = 0;

            Task listener = Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    SubscriptionClient Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);

                    Dictionary<string, string> retrievedMessage = new Dictionary<string, string>();

                    OnMessageOptions options = new OnMessageOptions();
                                     options.AutoComplete = false;
                                     options.AutoRenewTimeout = TimeSpan.FromMinutes(1);

                    Client.OnMessage((message) =>
                    {
                        try
                        {
                            retrievedMessage.Add("messageGuid", message.Properties["MessageGuid"].ToString());
                            retrievedMessage.Add("instanceId", message.Properties["InstanceId"].ToString());
                            retrievedMessage.Add("pId", message.Properties["ProcessId"].ToString());
                            retrievedMessage.Add("processKey", message.Properties["ProcessKey"].ToString());
                            retrievedMessage.Add("message", message.Properties["Message"].ToString());

                            newMessage.AnnounceNewMessage(retrievedMessage); // event ->

                            message.Complete(); // Remove message from subscription.
                        }
                        catch (Exception ex)
                        {
                            string exmes = ex.Message;
                            message.Abandon();
                        }

                    }, options);

                    retrievedMessage.Clear();

                    i++;

                    Thread.Sleep(3000);
                }

            });
        }

【问题讨论】:

    标签: c# azureservicebus


    【解决方案1】:

    您的代码有一些问题需要解决 -

    • 它失败了,我假设您的应用程序随后退出 - 或在 至少正在侦听消息的线程终止。
    • 您的 while 循环不断重复代码以连接消息处理程序, 您只需执行一次。
    • 您需要一种方法来保持调用堆栈处于活动状态并防止您的应用程序对您的对象进行垃圾收集。

    以下内容应该会帮助您走向成功。祝你好运。

     ManualResetEvent CompletedResetEvent = new ManualResetEvent(false);
        SubscriptionClient Client;
    
        public void ReceiveMessagesFromSubscription(string topicName, string subscriptionFilter, string connectionString)
        {
            Task listener = Task.Factory.StartNew(() =>
            {
                // You only need to set up the below once. 
                Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);
    
                OnMessageOptions options = new OnMessageOptions();
                options.AutoComplete = false;
                options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
                options.ExceptionReceived += LogErrors;
    
                Client.OnMessage((message) =>
                {
                    try
                    {
                        Trace.WriteLine("Got the message with ID {0}", message.MessageId);
                        message.Complete(); // Remove message from subscription.
                    }
                    catch (Exception ex)
                    {
                        Trace.WriteLine("Exception occurred receiving a message: {0}" + ex.ToString());
                        message.Abandon(); // Failed. Leave the message for retry or max deliveries is exceeded and it dead letters.
                    }
    
                }, options);
    
                CompletedResetEvent.WaitOne();
            });
        }
    
        /// <summary>
        /// Added in rudimentary exception handling .
        /// </summary>
        /// <param name="sender">The sender.</param>
        /// <param name="ex">The <see cref="ExceptionReceivedEventArgs"/> instance containing the event data.</param>
        private void LogErrors(object sender, ExceptionReceivedEventArgs ex)
        {
            Trace.WriteLine("Exception occurred in OnMessage: {0}" + ex.ToString());
        }
    
        /// <summary>
        /// Call this to stop the messages arriving from subscription.
        /// </summary>
        public void StopMessagesFromSubscription()
        {
            Client.Close(); // Close the message pump down gracefully
            CompletedResetEvent.Set(); // Let the execution of the listener complete and terminate gracefully 
        }
    

    或者,您可以自己使用 ReceiveBatch 以更传统的方式将消息分块:

    var messages = await queueClient.ReceiveBatchAsync(10, TimeSpan.FromSeconds(30),
                                                           cancellationToken);
    

    【讨论】:

    • 使用清理版本编辑。
    • 是的,感谢您的快速响应。两个版本都运行良好。这个更干净。
    • 顺便说一句...真正让我感到惊讶的是,我必须通过一些解决方法(while 循环或 CompletedResetEvent 无休止的调用)人为地保持 Tread() 活着。我希望 MS 将如此重要的实现部分包含在他们的文档中,或者是否有此解决方案的 MS 示例?
    • 是的,确实如此。但是,Microsoft 必须让您对生命周期进行一些控制,这就是他们选择的方式。如果他们有更好或更容易找到的样本和文档来解决这个困惑。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多