【发布时间】:2013-02-07 10:51:40
【问题描述】:
下面我有一个使用 Azure 服务总线队列的相当简单的 .NET 控制台应用程序。
如您所见,我正在使用 Task.Factory 启动 25 个接收器任务,然后调用我的 APM 样式的 BeginMessageReceive 方法。然后,在 EndMessageReceive 结束时,我再次调用 BeginMessageReceive 以保持循环继续进行。
我的问题是,除了使用递归任务并可能使用 C# 5.0 async/await 从 APM 样式的 BeginMessageReceive/EndMessageReceive 切换到 TPL/TAP 方法之外,我如何才能实现相同的目标?
using System;
using System.Configuration;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
namespace ServiceBusConsumer
{
class Program
{
private static QueueClient _queueClient;
private static void Main(string[] args)
{
var connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
_queueClient = QueueClient.CreateFromConnectionString(connectionString, "MyQueue");
for (var i = 0; i < 25; i++ )
{
Task.Factory.StartNew(BeginMessageReceive);
}
Console.WriteLine("Waiting for messages...");
Console.ReadKey();
_queueClient.Close();
} //end private static void Main(string[] args)
private static void BeginMessageReceive()
{
_queueClient.BeginReceive(TimeSpan.FromMinutes(5), EndMessageReceive, null);
}
private static void EndMessageReceive(IAsyncResult iar)
{
var message = _queueClient.EndReceive(iar);
try
{
if (message != null)
{
var msg = message.GetBody<string>();
Console.WriteLine("Message: " + msg);
if (_queueClient.Mode == ReceiveMode.PeekLock)
{
// Mark brokered message as completed at which point it's removed from the queue.
message.Complete();
}
}
}
catch (Exception ex)
{
if (_queueClient.Mode == ReceiveMode.PeekLock)
{
// unlock the message and make it available
message.Abandon();
}
Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
}
finally
{
if (message != null)
{
message.Dispose();
}
}
BeginMessageReceive();
}
}
}
如果 MessageReceive 超时过期,用于递归再次调用自身的新修改代码:
private static async Task MessageReceiveAsync()
{
while (true)
{
using (var message = await _queueClient.ReceiveAsync(TimeSpan.FromMinutes(5)))try
{
if (message != null)
{
try
{
var msg = message.GetBody<string>();
Console.WriteLine("Message: " + msg);
if (_queueClient.Mode == ReceiveMode.PeekLock)
{
// Mark brokered message as completed at which point it's removed from the queue.
await message.CompleteAsync();
}
}
catch (Exception ex)
{
message.AbandonAsync();
Console.WriteLine("Exception was thrown: " + ex.GetBaseException().Message);
}
}
}
}
}
【问题讨论】:
-
上面修改后的代码似乎可以解决问题。如果 MessageReceiveAsync() 将执行 I/O (WebRequest) 操作,仍然想知道 PLINQ 是否会更好/更容易。本质上,我正在尝试并行化来自队列的消息接收操作,然后我需要并行化每条消息的工作(这实际上是一个 HTTP Post);
-
仅供参考,我必须控制要触发多少 MessageReceiveAsync() 操作的原因之一是 Azure 服务总线队列对并发连接数有限制。
标签: c# task-parallel-library parallel-extensions