【问题标题】:Continuous processing when thread is waiting线程等待时继续处理
【发布时间】:2017-10-28 16:15:57
【问题描述】:

我正在执行一个网络请求以获取一条消息,然后等待该消息的处理,然后再次重复整个过程。

消息的处理将长时间运行,并且线程可能处于等待状态,这可能允许它在其他地方使用。我想要的是继续 while 循环,获取更多消息并在线程空闲时处理它们。

当前同步代码:

while(!cancellationToken.IsCancelled) {
  var message = await GetMessage();

  await ProcessMessage(message); // I'll need it to continue from here if thread is released.
}

这里使用的场景是消息队列消费者服务。

【问题讨论】:

    标签: c# multithreading task


    【解决方案1】:

    鉴于使用async / await,您当前的代码不一定是synchronous(在线程方面 - 可以在不同线程上调用延续),尽管获取消息和处理它之间的依赖关系显然必须坚持。

    Re:线程可能处于等待状态,可能允许它在其他地方使用

    等待编码良好的 I/O 绑定工作根本不需要消耗线程 - 请参阅 Stephen Cleary's There is no thread。假设这两个等待的任务是 IO 绑定的,您的代码在等待 IO 绑定的工作时可能根本不消耗任何线程,即应用程序的其余部分将使用线程池。因此,如果您唯一关心的是浪费线程,那么就不需要更多了。

    但是,如果您关心的是性能和额外的吞吐量,如果有下游容量可以对 ProcessMessage 进行并发调用(例如,多个下游 Web 服务器或额外的数据库容量),那么您可以考虑并行化 IO 绑定工作(再次,不需要更多的线程池线程)

    例如,如果您能够重写 GetMessages 调用来一次检索一个批次,您可以试试这个:

    var messages = await GetMessages(10);
    var processTasks = messages
        .Select(message => ProcessMessage(message));
    await Task.WhenAll(processTasks);
    

    (如果你不能触摸代码,你可以循环 GetMessages 以检索 Task.WhenAll 之前的 10 条单独的消息)

    但是,如果您没有更多能力进行并发 ProcessMessage 调用,那么您应该考虑解决瓶颈问题 - 例如添加更多服务器、优化代码或并行化在ProcessMessage work 中完成的工作等。

    原理是,正如您所说,GetMessages 从队列中检索数据。如果您没有能力处理您检索到的消息,您所能做的就是在其他地方排队消息,这似乎毫无意义 - 而是将消息留在队列中,直到您准备好处理它们。队列深度还将创建积压工作的可见性,您可以对其进行监控。

    编辑,回复:有时一个ProcessMessage() 调用比其他调用花费更长的时间

    根据 cmets,OP 有额外的信息表明,偶尔的 ProcessMessage 调用比其他调用花费的时间要长得多,并且希望在此期间继续处理其他消息。

    一种方法是使用此clever pattern here 对并行任务应用超时,如果达到该超时,将使任何长时间运行的 ProcessTasks 继续运行,并将继续处理下一批消息。

    以下内容具有潜在危险,因为它需要仔细平衡超时(以下 1000 毫秒)与观察到的不当行为 ProcessMessage 调用的频率 - 如果超时与“慢”ProcessMessages 的频率相比太低,下游资源可能会不堪重负。

    一个更安全(但更复杂)的补充是通过Task.IsCompleted 跟踪未完成的ProcessMessage 任务的并发数量,如果这达到阈值,则等待这些任务的完成以将积压带到安全水平。

    while(!cancellationToken.IsCancelled) 
    {
       // Ideally, the async operations should all accept cancellationTokens too
       var message = await GetMessages(10, cancellationToken);
       var processTasks = messages
          .Select(message => ProcessMessage(message, cancellationToken));
       await Task.WhenAny(Task.WhenAll(processTasks), 
                          Task.Delay(1000, cancellationToken));
     }
    

    Re:为下游负载的安全水平进行节流 - TPL DataFlow 很可能在这里使用。

    【讨论】:

    • Re : "Is my code IO Bound" - 除非你等待的 ProcessMessage 实际上是在做 CPU-Bound 工作然后返回像Task.FromResult(true) 这样愚蠢的东西,并且实际上是在等待外部数据库,REST 服务, 网络资源等,那么它很可能是 IO 绑定的。 Re: 每批消息的最佳数量是多少?您需要分析 ProcessMessage 消耗的资源负载,并考虑使用相同资源的其他并发应用程序。
    • 感谢您的回复,您的示例正是我现在所拥有的!使用简化示例编写此问题时出现的问题是,这 10 条消息中的一条将运行很长时间(每分钟轮询另一个服务),最终导致循环停止以获取更多要处理的消息。我确实写了一个关于代码审查的问题,这是题外话,但可以从中获得更多见解:codereview.stackexchange.com/questions/176451/…
    • 我猜您也可以选择根本不等待ProcessMessage,但如果收到消息的速度比您处理它们的速度快,这可能会导致@987654347 使用的下游资源不堪重负@,也是not something to be doing from WebAPI / MVC type apps
    • 如果只是偶尔出现的消息是有问题的,那么也许可以考虑在Task.Delay task 中添加一个Task.WhenAny 以在Task.WhenAll 周围产生超时。这将使未完成的任务仍然“运行”,但会释放您的循环以继续处理。 (即如果超时,不要担心检查任务的“完成状态”)
    【解决方案2】:

    看看https://msdn.microsoft.com/library/hh191443(vs.110).aspx 应该可以帮助您。此外,根据 C#/.NET 样式指南,ProcessMessage 似乎应该以“异步”结尾。

    您需要设置一个Task<ReturnTypeOfProcessMessage> procMessageTask = ProcessMessageAsync(message);

    那么您就可以在它运行的同时开展您的业务, SomeBusiness(...)

    然后

    await procMessageTask;

    似乎您可能还需要某种类型的等待超时功能,以便您可以进行轮询,这里有一个与此相关的问题:

    Asynchronously wait for Task<T> to complete with timeout

    HTH

    【讨论】:

      猜你喜欢
      • 2010-12-16
      • 2013-04-25
      • 1970-01-01
      • 1970-01-01
      • 2010-09-26
      • 2014-01-21
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多