【问题标题】:Azure ServiceBus & async - To be, or not to be?Azure ServiceBus & async - 是,还是不是?
【发布时间】:2013-03-14 19:01:24
【问题描述】:

我在 Azure 上运行服务总线,每秒发送大约 10-100 条消息

最近我切换到 .net 4.5 并且所有人都兴奋地重构了所有代码以在每行中至少有两次 'async' 和 'await'确保它“正确”完成:)

现在我想知道它实际上是好还是坏。如果您可以查看代码 sn-ps 并告诉我您的想法。我特别担心如果 线程上下文切换 没有给我带来更多的痛苦而不是好处,来自所有的异步......(看看!dumpheap 这绝对是一个因素)

只是一点描述 - 我将发布 2 种方法 - 一种在 ConcurrentQueue 上执行 while 循环,等待新消息,另一种方法一次发送一条消息。我也完全按照 Azure 博士的规定使用瞬态故障处理模块。

发送循环(从头开始,等待新消息):

private async void SendingLoop()
    {
        try
        {
            await this.RecreateMessageFactory();

            this.loopSemaphore.Reset();
            Buffer<SendMessage> message = null;

            while (true)
            {
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }
                this.semaphore.WaitOne();
                if (this.cancel.Token.IsCancellationRequested)
                {
                    break;
                }

                while (this.queue.TryDequeue(out message))
                {                       
                    try
                    {
                        using (message)
                        {
                            //only take send the latest message
                            if (!this.queue.IsEmpty)
                            {
                                this.Log.Debug("Skipping qeued message, Topic: " + message.Value.Topic);
                                continue;
                            }
                            else
                            {
                                if (this.Topic == null || this.Topic.Path != message.Value.Topic)
                                    await this.EnsureTopicExists(message.Value.Topic, this.cancel.Token);

                                if (this.cancel.Token.IsCancellationRequested)
                                    break;
                                await this.SendMessage(message, this.cancel.Token);
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }
                    catch (Exception ex)
                    {
                        ex.LogError();
                    }
                }
            }
        }
        catch (OperationCanceledException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }
        finally
        {
            if (this.loopSemaphore != null)
                this.loopSemaphore.Set();
        }
    }

发送消息:

private async Task SendMessage(Buffer<SendMessage> message, CancellationToken cancellationToken)
    {
        //this.Log.Debug("MessageBroadcaster.SendMessage to " + this.GetTopic());
        bool entityNotFound = false;

        if (this.MessageSender.IsClosed)
        {
            //this.Log.Debug("MessageBroadcaster.SendMessage MessageSender closed, recreating " + this.GetTopic());
            await this.EnsureMessageSender(cancellationToken);
        }

        try
        {
            await this.sendMessageRetryPolicy.ExecuteAsync(async () =>
            {
                message.Value.Body.Seek(0, SeekOrigin.Begin);
                using (var msg = new BrokeredMessage(message.Value.Body, false))
                {
                    await Task.Factory.FromAsync(this.MessageSender.BeginSend, this.MessageSender.EndSend, msg, null);
                }
            }, cancellationToken);
        }
        catch (MessagingEntityNotFoundException)
        {
            entityNotFound = true;                
        }
        catch (OperationCanceledException)
        { }
        catch (ObjectDisposedException)
        { }
        catch (Exception ex)
        {
            ex.LogError();
        }

        if (entityNotFound)
        {
            if (!cancellationToken.IsCancellationRequested)
            {
                await this.EnsureTopicExists(message.Value.Topic, cancellationToken);
            }
        }
    }

上面的代码来自一个每秒发送 1 条消息的“Sender”类。在任何给定时间,我都有大约 50-100 个实例在运行,所以它可能是相当多的线程。

顺便说一句,不要担心 EnsureMessageSender、RecreateMessageFactory、EnsureTopicExists 太多,它们不会经常被调用。

如果我只需要一次发送一条消息,不用担心异步的东西并避免随之而来的开销,我最好让一个后台线程处理消息队列并同步发送消息。

请注意,将一条消息发送到 Azure 服务总线通常只需几毫秒,这并不昂贵。 (除了有时速度很慢、超时或服务总线后端出现问题时,它可能会在尝试发送内容时挂起一段时间)。

感谢和抱歉这么长的帖子,

史蒂夫

建议的解决方案

这个例子能解决我的情况吗?

static void Main(string[] args)
    {
        var broadcaster = new BufferBlock<int>(); //queue
        var cancel = new CancellationTokenSource();

        var run = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;                       

                    //async wait until a value is available
                    var val = await broadcaster.ReceiveAsync(cancel.Token).ConfigureAwait(false);
                    int next = 0;

                    //greedy - eat up and ignore all the values but last
                    while (broadcaster.TryReceive(out next))
                    {
                        Console.WriteLine("Skipping " + val);
                        val = next;
                    }

                    //check if we are not finished
                    if (cancel.IsCancellationRequested)
                        break;

                    Console.WriteLine("Sending " + val);

                    //simulate sending delay
                    await Task.Delay(1000).ConfigureAwait(false); 

                    Console.WriteLine("Value sent " + val);                        
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }

        }, cancel.Token);

        //simulate sending messages. One every 200mls 
        for (int i = 0; i < 20; i++)
        {
            Console.WriteLine("Broadcasting " + i);
            broadcaster.Post(i);
            Thread.Sleep(200);
        }

        cancel.Cancel();
        run.Wait();
    }

【问题讨论】:

  • 如果你只有几个线程运行同步通常会给你比异步更多的吞吐量,因为异步有额外的开销。它的威力只有在高并发时才能显现。
  • 上面的代码来自一个每秒发送 1 条消息的“Sender”类。我有大约 50-100 个实例在任何给定时间运行,所以它可能是相当多的线程。

标签: c# multithreading asynchronous servicebus context-switch


【解决方案1】:

首先,请记住 Task != Thread。任务(和async 方法延续)被安排到线程池中,只要您的任务相当短,Microsoft 就会在线程池中进行大量优化。

查看您的代码,一行会引发一个标志:semaphore.WaitOne。我假设您将其用作一种信号,表明队列中有可用数据。这很糟糕,因为它是在 async 方法中的 阻塞 等待。通过使用阻塞等待,代码从轻量级延续变为更重的线程池线程。

所以,我会按照@usr 的建议,将队列(和信号量)替换为async-ready 队列。 TPL Dataflow's BufferBlock&lt;T&gt; 是一个 async-ready 生产者/消费者队列,可用 via NuGet。我首先推荐这个,因为听起来您的项目可以从更广泛地使用数据流中受益,而不仅仅是作为队列(但队列是一个很好的起点)。

存在其他async-ready 数据结构;我的AsyncEx library 有几个。自己构建一个简单的也不难;我有一个blog post on the subject。但我建议在您的情况下使用 TPL Dataflow。

【讨论】:

  • 谢谢,我会试一试。在我实现这个之前,我做了一个简单的 POC 纯异步 vs TPL DataFlow,当涉及到很多小任务时,DataFlow 慢了很多(慢了很多倍)。因此,我自己选择了“纯低级”异步解决方案。
  • 数据流确实有更多的开销。但如果你打算自己做async 解决方案,至少要去掉(同步)阻塞。
  • 谢谢。请查看使用 BufferBlock 的建议解决方案
  • 该解决方案看起来不错。我还鼓励您查看其他块,看看您是否可以让更多的解决方案使用数据流。
  • 问题 - await BufferBlock.ReceiveAsync(this.cancel.Token);卡住了(死锁?)!即使向块发布更多消息(来自不同的线程),来自 ReceiveAsync 的等待也永远不会回来。 BufferBlock.Count 只是增长,但没有得到处理。有什么想法会发生这种情况吗?到目前为止,我还无法在一个小示例中重现它,但它在生产代码中发生了很多。
【解决方案2】:

你说:

上面的代码来自一个每秒发送 1 条消息的“Sender”类。一世 在任何给定时间运行大约 50-100 个实例,所以它可能是 相当多的线程。

这是异步的一个很好的例子。你在这里保存了很多线程。异步减少上下文切换,因为它不是基于线程的。在需要等待的情况下,它不会进行上下文切换。相反,下一个工作项正在同一个线程上处理(如果有的话)。

因此,您的异步解决方案肯定会比同步解决方案更好地扩展。需要衡量它是否在 50-100 个工作流实例中实际使用更少的 CPU。实例越多,异步更快的可能性就越高。

现在,实现存在一个问题:您使用的是ConcurrentQueue,它不是异步就绪的。因此,即使在异步版本中,您实际上也确实使用了 50-100 个线程。他们要么阻塞(你想避免)要么忙等待燃烧 100% CPU(这似乎是你的实现的情况!)。您需要摆脱这个问题并使排队也异步。也许SemaphoreSlim 在这里会有所帮助,因为它可以异步等待。

【讨论】:

  • 感谢您的提示!我看过 SemaphoreSlim,我想我可以使用 AsyncWait,但我对一些事实感到紧张 - 1)它是基于计数的,我发现一些关于人们在释放信号量的正确方法方面遇到问题的帖子 2)它是主要目的是如果您等待的时间很短,尽管我在任何地方都没有发现什么是很短的时间。但我认为它不到 1 秒,这是我的典型用例。如果 ManualResetEventSlim 有异步等待 API 会很酷...
  • 1) 是一个纪律问题,也适用于其他任何事情。 2)它在低竞争和低等待时表现最好,但即使在面对竞争时也会表现良好。您的第一要务是摆脱 50-100 个线程的阻塞。他们浪费在等待锁上。
  • 1) 同意。 DataFlow BufferBlock 似乎与 Queue 和 Semaphore 一样好,所以我现在选择“更简单”。但是,我不完全理解“它们将阻塞(您想避免)或忙于等待燃烧 100% CPU(您的实现中似乎就是这种情况!)”是什么意思,我想这是关于我在等待信号量时的线路。我做了一些测试,等待 ManualResetEvent 似乎根本没有占用任何处理器时间,你能详细说明一下什么可能占用 100% CPU。谢谢
  • 我可能只是误读了代码。它看起来像一个快速运行的while(true),没有任何等待。但是有信号量所以那里没有问题(除了阻塞阻塞线程,因为它无法到达异步点)。
  • 在 BufferBlock 出现问题后,我决定尝试 SemaphoreSlim,看起来效果很好,感谢您的提示。希望将来 ManualResetEventSlim 也能获得 Async api,因为这更适合我的情况。
猜你喜欢
  • 1970-01-01
  • 2019-05-24
  • 1970-01-01
  • 2013-10-11
  • 2020-08-19
  • 1970-01-01
  • 1970-01-01
  • 2018-06-25
  • 2020-11-18
相关资源
最近更新 更多