【问题标题】:Read all active messages in azure service bus queue读取 Azure 服务总线队列中的所有活动消息
【发布时间】:2021-08-16 02:33:06
【问题描述】:

我是 azure 服务总线的新手,我应该将消息推送到队列,然后有一个单独的计划任务将读取此队列中的所有活动消息并将它们批量导入 sql 我之前尝试过这段代码,当我在发送消息后立即调用它时它正在工作,但现在它在单独的计划任务中不起作用。 任何帮助为什么或我可以使用什么来批量阅读消息或这是不可能的

queueClient = new QueueClient(conn, queuename, ReceiveMode.ReceiveAndDelete);

                var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
                {
                    MaxConcurrentCalls = 1,
                    AutoComplete = false
                };
                queueClient.RegisterMessageHandler(ReceiveMessagesAsync, messageHandlerOptions);


public async Task ReceiveMessagesAsync(Message message, CancellationToken token)
        {
            messages.Add(message.Body.ToString());
            Console.WriteLine($"Received message: {Encoding.UTF8.GetString(message.Body)}");
            await queueClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        public Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            Console.WriteLine(exceptionReceivedEventArgs.Exception);
            return Task.CompletedTask;
        }

【问题讨论】:

    标签: c# azureservicebus azure-servicebus-queues


    【解决方案1】:

    最好知道您使用什么库来执行此操作,但我建议使用https://www.nuget.org/packages/Azure.Messaging.ServiceBus/ nuget 包,您可以:

    class Program
    {
        static async Task Main(string[] args)
        {
            string queueName = "myqueue";
            var client = new ServiceBusClient("myconn");
            // create a processor that we can use to process the messages
            var processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());
            // add handler to process messages
            processor.ProcessMessageAsync += MessageHandler;
            // add handler to process any errors
            processor.ProcessErrorAsync += ErrorHandler;
            await processor.StartProcessingAsync();
            // Process messages for 5 minutes 
            await Task.Delay(TimeSpan.FromMinutes(5));
            // stop processing 
            Console.WriteLine("Stopping the receiver...");
            await processor.StopProcessingAsync();
            Console.WriteLine("Stopped receiving messages");
        }
        private static Task ErrorHandler(ProcessErrorEventArgs arg)
        {
            // Here you can catch errors;
            return Task.CompletedTask;
        }
        static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            // Do something with the message .e.g deserialize it and insert to SQL
            try
            {
                BinaryData content = args.Message.Body;
                // Here you can use :
                string contentStr = content.ToString(); // This would be your data
            }
            catch (Exception e)
            {
                // If something goes wrong you should abandon the message
                await args.AbandonMessageAsync(args.Message);
            }
            await args.CompleteMessageAsync(args.Message);
        }
    }
    

    这将处理所有消息,直到时间结束。这对于计划任务很有用。如果您想在处理完特定数量的消息后停止,可以执行异步循环并检查是否已达到该数量。

    【讨论】:

    • 但是如果我需要原始函数中的contentStr 怎么办?在这种情况下,它是 Main 但如果这是一个 HTTP 端点并且我需要将消息正文返回给调用者怎么办?
    • 通常不建议将服务总线与 HTTP 端点混合使用,它们的工作方式非常不同。服务总线是异步的,你不应该像在 HTTP 上那样等待响应。如果您需要在 HTTP 客户端(例如浏览器)上接收服务总线消息,我建议您查看 SignalR 之类的东西,在这种情况下,您将收到消息并通过集线器通知客户端。看看docs.particular.net/samples/near-realtime-clients
    猜你喜欢
    • 1970-01-01
    • 2017-04-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-06-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多