【问题标题】:Clearing azure service bus queue in one go一次性清除 Azure 服务总线队列
【发布时间】:2012-04-12 19:14:15
【问题描述】:

我们在项目中使用服务总线队列。当管理员选择清除队列时,我们需要一个从队列中删除所有消息的功能。我在网上搜索,但在 QueueClient 类中找不到任何功能。

我是否必须将所有消息一一弹出,然后将它们标记为完成以清除队列,还是有更好的方法?

QueueClient queueClient = _messagingFactory.CreateQueueClient(
                              queueName, ReceiveMode.PeekLock);

BrokeredMessage brokeredMessage = queueClient.Receive();

while (brokeredMessage != null )
{
    brokeredMessage.Complete();
    brokeredMessage = queueClient.Receive();
}

【问题讨论】:

  • 我最近遇到了同样的问题,删除和重新创建不是一个选项,循环工作,但速度很慢,但是,使用 QueueClientPrefetchCount 成员,您可以轻松地批量处理一些单次往返发送数千条消息,极大地加快了处理速度:msdn.microsoft.com/en-us/library/…

标签: .net azure queue azureservicebus azure-servicebus-queues


【解决方案1】:

您可以简单地从 azure 门户中执行此操作。

如果您不担心过去 14 天(默认)存储的队列消息并想立即清除队列,则将消息时间更改为 5 或 10 秒并等待 5-10 秒。

重要步骤:几秒钟后,尝试从队列中接收以强制更新。

所有队列消息将被清除。您可以再次重置原始值。默认为 14 天。

【讨论】:

    【解决方案2】:

    当我读到这个问题时,我首先想到的是为什么不删除并重新创建队列?这可能是一种方法。但如果不想这样做,您可以接收并删除每条消息,直到没有消息为止。

    您可以将最新的 Azure 服务总线 .NET 库 Azure.Messaging.ServiceBus 用于这两种方法。

    1. 使用AdministrationClient 删除并重新创建队列
    using Azure.Messaging.ServiceBus.Administration;
    
    ServiceBusAdministrationClient adminClient = new ServiceBusAdministrationClient("<connectionstring>");
    await adminClient.DeleteQueueAsync("myqueue");          
    await adminClient.CreateQueueAsync("myqueue");          
    

    注意:ServiceBusAdministrationClient 用于对已现有服务总线命名空间的 CRUD 操作。如果您还需要创建命名空间的能力,请改用Microsoft.Azure.Management.ServiceBus 库。

    1. 接收并删除直到没有消息
    using Azure.Messaging.ServiceBus;
    
    await using var client = new ServiceBusClient("<connectionstring>");
                    
    ServiceBusReceiver receiver = client.CreateReceiver("myqueue", 
        new ServiceBusReceiverOptions { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete });
    
    while ((await receiver.PeekMessageAsync()) != null)
    {
        // receive in batches of 100 messages.
        await receiver.ReceiveMessagesAsync(100);
    }
    

    ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete 是不言自明的。默认接收模式是PeekLock,它实际上并没有从队列中删除消息——它告诉服务总线接收客户端想要明确地settle接收到的消息。锁定部分意味着竞争接收者在锁定期间无法访问消息。

    【讨论】:

      【解决方案3】:

      结合使用ReceiveAndDeletePrefetchCountReceiveBatchAsync 和一个简单的真值循环而不是使用 Peek,我得到了很好的结果。下面是 MessagingFactory 的示例:

      var receiverFactory = MessagingFactory.CreateFromConnectionString("ConnString");
      var receiver = receiverFactory.CreateMessageReceiver("QName", ReceiveMode.ReceiveAndDelete);
      receiver.PrefetchCount = 300;
      
      bool loop = true;
      while (loop)
      {
          var messages = await receiver.ReceiveBatchAsync(300, TimeSpan.FromSeconds(1));
          loop = messages.Any();
      }
      

      只需要WindowsAzure.ServiceBus Nuget 包。

      【讨论】:

        【解决方案4】:

        如果您使用的是 nuget 中的 WindowsAzure.Storage 库,则有一个简单的 Clear() 方法可以清除整个队列。我使用该库中的Microsoft.Windows.Azure.Queue 类来管理队列。否则,您可以通过他们的 API per their documentation 访问。我不知道该方法在 Azure 库中存在多长时间,并且在最初提出问题时它可能不存在,但 REST API 至少可以追溯到 2014 年this Azure feedback post

        使用 Azure 库清除队列的完整 .NET 代码是:

        string connectionString = "YourAzureConnectionStringHere";
        string queueName = "YourWebJobQueueName";
        CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);
        
        // Create the queue client, then get a reference to queue
        CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
        queue = queueClient.GetQueueReference(GetQueueName(queueName));
        
        // Clear the entire queue
        queue.Clear();
        

        【讨论】:

        • 最初的问题是关于清除 Azure 服务总线队列,而不是普通的 Azure 队列。
        • 根据微软的说法,Azure Service Bus Queue和Azure Queue都支持.clear函数:feedback.azure.com/forums/217298-storage/suggestions/…
        • 不在客户端的 .NET Core 版本中(即 Microsoft.Azure.ServiceBus)。这不包含 Clear() 方法。
        【解决方案5】:

        清理 Azure ServiceBus 队列的最快方法是设置一个非常短的DefaultMessageTimeToLive,等待几秒钟,尝试从队列接收强制更新,然后恢复初始的DefaultMessageTimeToLive

        您可以通过门户或代码来完成:

        var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
        var queueDescription = _namespaceManager.GetQueue(queueName);
        var queueClient = QueueClient.CreateFromConnectionString(connectionString, queueName, ReceiveMode.ReceiveAndDelete);
        
        var dl = queueDescription.EnableDeadLetteringOnMessageExpiration;
        var ttl = queueDescription.DefaultMessageTimeToLive;
        
        queueDescription.EnableDeadLetteringOnMessageExpiration = false;
        queueDescription.DefaultMessageTimeToLive = TimeSpan.FromSeconds(1);
        
        Thread.Sleep(5000);
        var dumy = queueClient.ReceiveBatch(200, TimeSpan.FromSeconds(1)).ToArray();
        
        queueDescription.EnableDeadLetteringOnMessageExpiration = dl;
        queueDescription.DefaultMessageTimeToLive = ttl;
        

        【讨论】:

        • 嗨弗洛伦特,不确定 api 是否改变了,但我刚刚在我的队列上尝试过,它的改变为零。前后队列中的相同消息。
        【解决方案6】:

        使用:

        • 两种方法(来自@ScottBrady 和@participant)
        • 还有MessageReceiver 抽象

        您可以编写清空服务总线队列或主题/订阅的方法:

        MessageReceiver messageReceiver = ...
        while (messageReceiver.Peek() != null)
        {
            // Batch the receive operation
            var brokeredMessages = messageReceiver.ReceiveBatch(300);
        
            // Complete the messages
            var completeTasks = brokeredMessages.Select(m => Task.Run(() => m.Complete())).ToArray();
        
            // Wait for the tasks to complete. 
            Task.WaitAll(completeTasks);
        }
        

        【讨论】:

          【解决方案7】:

          对于 Azure-ServiceBus-Queues,有一个 ReceiveBatch-method,它允许您同时接收一批 n-消息。结合ReceiveMode.ReceiveAndDelete可以更高效的清队列。

          警告可能会返回 n 条消息,但不能保证。 256K 的消息批量大小也有限制。

          【讨论】:

            【解决方案8】:

            像您一样在 while 循环中使用 Receive() 方法将导致您的代码在队列为空后无限期运行,因为 Receive() 方法将等待另一条消息出现在队列中。

            如果您希望它自动运行,请尝试使用Peek() 方法。

            例如:

            while (queueClient.Peek() != null)
            {
                var brokeredMessage = queueClient.Receive();
                brokeredMessage.Complete();
            }
            

            您可以使用 hocho 提到的 ReceiveMode.ReceiveAndDelete 再次简化此操作。

            【讨论】:

            • 几个问题:(1) Peek 将被安排,未来接收的消息不会,这将导致您的应用程序挂起等待安排的消息。 (2) 无论状态如何,每个 Peek 都会移动到下一条消息,因此如果您放弃第一条消息而不是 Complete,则第二个 Peek 将获得第二条消息,但第二个接收将重新处理第一条消息。这会导致 Peek 从 Receive 偏移一条消息,并将在队列末尾为每条放弃的消息留下一条未处理的消息。
            • +1 @JustinJStark 在针对未接收新消息的队列测试此代码(原样,不处理,仅接收并完成)时,它总是无法完全清除队列。它只清除了 50-75% 的消息。我不会使用这种方法来清除队列。
            • 这不是Microsoft.Azure.ServiceBus中找到的QueueClient吧?我很难找到具有使用 CreateReceiver 方法的 QueueClient 的 Microsoft.ServiceBus.Messaging...
            猜你喜欢
            • 2020-12-17
            • 2017-02-25
            • 2020-09-07
            • 2015-07-18
            • 1970-01-01
            • 1970-01-01
            • 2018-04-01
            • 2013-08-19
            • 2015-06-08
            相关资源
            最近更新 更多