鉴于使用async / await,您当前的代码不一定是synchronous(在线程方面 - 可以在不同线程上调用延续),尽管获取消息和处理它之间的依赖关系显然必须坚持。
Re:线程可能处于等待状态,可能允许它在其他地方使用
等待编码良好的 I/O 绑定工作根本不需要消耗线程 - 请参阅 Stephen Cleary's There is no thread。假设这两个等待的任务是 IO 绑定的,您的代码在等待 IO 绑定的工作时可能根本不消耗任何线程,即应用程序的其余部分将使用线程池。因此,如果您唯一关心的是浪费线程,那么就不需要更多了。
但是,如果您关心的是性能和额外的吞吐量,如果有下游容量可以对 ProcessMessage 进行并发调用(例如,多个下游 Web 服务器或额外的数据库容量),那么您可以考虑并行化 IO 绑定工作(再次,不需要更多的线程池线程)
例如,如果您能够重写 GetMessages 调用来一次检索一个批次,您可以试试这个:
var messages = await GetMessages(10);
var processTasks = messages
.Select(message => ProcessMessage(message));
await Task.WhenAll(processTasks);
(如果你不能触摸代码,你可以循环 GetMessages 以检索 Task.WhenAll 之前的 10 条单独的消息)
但是,如果您没有更多能力进行并发 ProcessMessage 调用,那么您应该考虑解决瓶颈问题 - 例如添加更多服务器、优化代码或并行化在ProcessMessage work 中完成的工作等。
原理是,正如您所说,GetMessages 从队列中检索数据。如果您没有能力处理您检索到的消息,您所能做的就是在其他地方排队消息,这似乎毫无意义 - 而是将消息留在队列中,直到您准备好处理它们。队列深度还将创建积压工作的可见性,您可以对其进行监控。
编辑,回复:有时一个ProcessMessage() 调用比其他调用花费更长的时间
根据 cmets,OP 有额外的信息表明,偶尔的 ProcessMessage 调用比其他调用花费的时间要长得多,并且希望在此期间继续处理其他消息。
一种方法是使用此clever pattern here 对并行任务应用超时,如果达到该超时,将使任何长时间运行的 ProcessTasks 继续运行,并将继续处理下一批消息。
以下内容具有潜在危险,因为它需要仔细平衡超时(以下 1000 毫秒)与观察到的不当行为 ProcessMessage 调用的频率 - 如果超时与“慢”ProcessMessages 的频率相比太低,下游资源可能会不堪重负。
一个更安全(但更复杂)的补充是通过Task.IsCompleted 跟踪未完成的ProcessMessage 任务的并发数量,如果这达到阈值,则等待这些任务的完成以将积压带到安全水平。
while(!cancellationToken.IsCancelled)
{
// Ideally, the async operations should all accept cancellationTokens too
var message = await GetMessages(10, cancellationToken);
var processTasks = messages
.Select(message => ProcessMessage(message, cancellationToken));
await Task.WhenAny(Task.WhenAll(processTasks),
Task.Delay(1000, cancellationToken));
}
Re:为下游负载的安全水平进行节流 - TPL DataFlow 很可能在这里使用。