【问题标题】:How to achieve equivalent using TPL/TAP?如何使用 TPL/TAP 实现等效?
【发布时间】:2013-02-07 10:51:40
【问题描述】:

下面我有一个使用 Azure 服务总线队列的相当简单的 .NET 控制台应用程序。

如您所见,我正在使用 Task.Factory 启动 25 个接收器任务,然后调用我的 APM 样式的 BeginMessageReceive 方法。然后,在 EndMessageReceive 结束时,我再次调用 BeginMessageReceive 以保持循环继续进行。

我的问题是,除了使用递归任务并可能使用 C# 5.0 async/await 从 APM 样式的 BeginMessageReceive/EndMessageReceive 切换到 TPL/TAP 方法之外,我如何才能实现相同的目标?

using System;
using System.Configuration;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;


namespace ServiceBusConsumer
{
    class Program
    {
        private static QueueClient _queueClient;

        private static void Main(string[] args)
        {    
            var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
            _queueClient = QueueClient.CreateFromConnectionString(connectionString, "MyQueue");

            for (var i = 0; i < 25; i++ )
            {
                Task.Factory.StartNew(BeginMessageReceive);
            }

            Console.WriteLine("Waiting for messages...");
            Console.ReadKey();

            _queueClient.Close();

        } //end private static void Main(string[] args)

        private static void BeginMessageReceive()
        {
            _queueClient.BeginReceive(TimeSpan.FromMinutes(5), EndMessageReceive, null);
        }

        private static void EndMessageReceive(IAsyncResult iar)
        {
            var message = _queueClient.EndReceive(iar);
            try
            {
                if (message != null)
                {
                    var msg = message.GetBody<string>();
                    Console.WriteLine("Message: " + msg);

                    if (_queueClient.Mode == ReceiveMode.PeekLock)
                    {
                        // Mark brokered message as completed at which point it's removed from the queue.
                        message.Complete();
                    }
                }
            }
            catch (Exception ex)
            {
                if (_queueClient.Mode == ReceiveMode.PeekLock)
                {
                    // unlock the message and make it available 
                    message.Abandon();
                }

                Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
            }
            finally
            {
                if (message != null)
                {
                    message.Dispose();
                }
            }
            BeginMessageReceive();
        }

    }
}

如果 MessageReceive 超时过期,用于递归再次调用自身的新修改代码:

private static async Task MessageReceiveAsync()
{
    while (true)
    {
        using (var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(5)))try
        {
            if (message != null)
            {               
                try
                {

                    var msg = message.GetBody<string>();
                    Console.WriteLine("Message: " + msg);

                    if (_queueClient.Mode == ReceiveMode.PeekLock)
                    {
                        // Mark brokered message as completed at which point it's removed from the queue.
                        await message.CompleteAsync();
                    }
                }
                catch (Exception ex)
                {
                    message.AbandonAsync();
                    Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
                }
            }
        }
    }
}

【问题讨论】:

  • 上面修改后的代码似乎可以解决问题。如果 MessageReceiveAsync() 将执行 I/O (WebRequest) 操作,仍然想知道 PLINQ 是否会更好/更容易。本质上,我正在尝试并行化来自队列的消息接收操作,然后我需要并行化每条消息的工作(这实际上是一个 HTTP Post);
  • 仅供参考,我必须控制要触发多少 MessageReceiveAsync() 操作的原因之一是 Azure 服务总线队列对并发连接数有限制。

标签: c# task-parallel-library parallel-extensions


【解决方案1】:

Azure 客户端库似乎仍未使用 TAP API 进行更新。不知道有什么问题...

无论如何,您可以使用TaskFactory.FromAsync 创建自己的APM->TAP wrappers,如下所示:

public static class MyAzureExtensions
{
  public static Task<BrokeredMessage> ReceiveAsync(this QueueClient @this,
      TimeSpan serverWaitTime)
  {
    return Task<BrokeredMessage>.Factory.FromAsync(
        @this.BeginReceive, @this.EndReceive, serverWaitTime, null);
  }

  public static Task CompleteAsync(this BrokeredMessage @this)
  {
    return Task.Factory.FromAsync(@this.BeginComplete, @this.EndComplete, null);
  }
}

将 Azure 调用封装到 TAP-ready API 中后,您可以这样使用它们:

private static void Main(string[] args)
{    
  var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
  _queueClient = QueueClient.CreateFromConnectionString(connectionString, "MyQueue");

  for (var i = 0; i < 25; i++ )
    MyMessageReceiveAsync();

  Console.WriteLine("Waiting for messages...");
  Console.ReadKey();

  _queueClient.Close();
}

private static async Task MyMessageReceiveAsync()
{
  while (true)
  {
    using (var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(5)))
    {
      try
      {
        var msg = message.GetBody<string>();
        Console.WriteLine("Message: " + msg);

        if (_queueClient.Mode == ReceiveMode.PeekLock)
        {
          // Mark brokered message as completed at which point it's removed from the queue.
          await message.CompleteAsync();
        }
      }
      catch (Exception ex)
      {
        if (_queueClient.Mode == ReceiveMode.PeekLock)
        {
          // unlock the message and make it available 
          message.Abandon();
        }

        Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
      }
    }
  }
}

像这样使用async 的一个好处是您不会不必要地占用线程池线程。原来用了25个线程来监听;我的示例不会使用任何线程来收听。在我的示例中,线程池线程唯一被绑定的时间是消息被放弃时(在错误处理分支中)。

与原始代码有一个主要的语义差异:如果QueueClient 的“接收”引发异常,则在原始代码中它会导致进程崩溃;在我的示例中,异常将被忽略。

【讨论】:

  • 太棒了!这正是我想要实现的。非常感谢斯蒂芬!
  • 现在,我只遇到了一个问题,不知道如何解决。正如您在上面看到的,在对 ReceiveAsync 的调用中,我指定了 5 分钟的服务器等待超时。因此,在 5 分钟未收到消息后,我的控制台应用程序将不再收到任何消息。我知道您可以将其设置为最多 24 天,但是关于如何修改它以解决这个问题并始终保持 25 个“听众”的任何想法?最终,这将被放入服务/Azure Worker 角色中,因此我需要它在连续循环的基础上工作。
  • 超时会发生什么?它会引发异常吗?
  • 不,ServiceBusConsumer 控制台应用程序只是在那里运行。然后,如果我将新消息排入队列(在我的另一个将测试消息排入队列的控制台应用程序中),除非我重新启动 ServiceBusConsumer 控制台应用程序,否则它们不会被接收。也许它在 ReceiveAsync 中引发了异常,但正如您所指出的,它可能被吞没了,所以我看不到它。
  • 如果它抛出异常,那么你可以捕捉它。只需将using 移动到try 内即可。
猜你喜欢
  • 2011-03-25
  • 1970-01-01
  • 2020-09-20
  • 2015-03-23
  • 2016-08-19
  • 1970-01-01
  • 2011-09-26
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多