【问题标题】:How can I await an enumerable of tasks and stop when a number of tasks have completed?如何等待可枚举的任务并在完成多个任务时停止?
【发布时间】:2022-01-04 15:33:23
【问题描述】:

我有一组任务运行相同的作业,但在不同的服务器上使用不同的参数。可能会发生其中一台服务器无响应/缓慢的情况,从而导致所有任务都已完成但只有一个任务完成的情况。目前我正在使用Task.WhenAll() 等待他们,所以别无选择,只能等到我的超时到期。

在理想情况下,所有任务都在超时内完成,我可以收集所有结果,但在另一种情况下,基本上我想等待:

  • 直到 n 个任务完成
  • 如果 n 个任务已完成,再等待 x 分钟

如果在 n 个任务已经完成并且我们又等待了 x 分钟时,并非所有任务都已完成,我想检索完成的任务。

有什么方法可以实现上述目标吗?

【问题讨论】:

标签: c# .net task-parallel-library


【解决方案1】:

即使您有复杂的取消逻辑,您也希望取消底层任务。如果在合适的时间取消了底层任务,无论如何你都可以使用Task.WhenAll

因此,分解您的问题,您要问的是,“如何根据其他任务的状态取消任务?”。您需要保持已完成任务数量的状态,并根据该状态取消您的任务。

如果您需要在任务完成时做“事情”(例如更新已完成任务的状态),我发现延续很有帮助,而且是一个非常干净的解决方案。您的用例示例:

// n from your question
var n = 4; 

// number of tasks currently completed
var tasksCompleted = 0; 

// The list of tasks (note it's the continuations in this case)
// You can also keep the continuations and actual tasks in separate lists.
var tasks = new List<Task>();

// delay before cancellation after n tasks completed
var timeAfterNCompleted = TimeSpan.FromMinutes(x); 
using var cts = new CancellationTokenSource();

for (int i = 0; i < 10; i++)
{
    // Do your work with a passed cancellationtoken you control
    var currentTask = DoWorkAsync(i, cts.Token);

    // Continuation will update the state of completed tasks
    currentTask = currentTask.ContinueWith((t) => 
    {
        if (t.IsCompletedSuccessfully)
        {
            var number = Interlocked.Increment(ref tasksCompleted);
            if (number == n)
            {
                // If we passed n tasks completed successfully,
                // We'll cancel after the grace period
                // Note that this will actually cancel the underlying tasks
                // Because we passed the token to the DoWorkAsync method
                cts.CancelAfter(timeAfterNCompleted);
            }
        }
    });
    tasks.Add(currentTask);
}

await Task.WhenAll(tasks);

// All your tasks have either completed or cancelled here
// Note that in this specific example all tasks will appear
// to have run to completion. That's because we're looking at
// the continuations here. Store continuation and actual task
// in separate lists and you can retrieve the results.
// (Make sure you await the continuations though)

【讨论】:

  • 作为旁注,it's recommendedTaskScheduler.Default 作为参数传递给ContinueWith,以确保延续将在众所周知的调度程序上运行。
【解决方案2】:

使用Task.WhenAny 了解是否有任何任务完成,然后从您的阵列中删除已完成的任务。

stopWatch.Start();
while (arrayoftasks.Any())
{
    Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
    arrayOfTasks.Remove(finishedTask);
    await finishedTask;
    finishedCount++;
    if (finishedCount == 4) //check you stopwatch elapsed here.
    {
        Console.WriteLine("4 tasks have finished");
    }
}

工作示例:

using System.Diagnostics;
using System.Security.Cryptography;

await Test.Go();
Console.ReadLine();
public static class Test
{
    public static async Task Go()
    {
        List<Task<string>> arrayOfTasks = GetArrayOfTasks();
        int finishedCount = 0;
        Stopwatch stopWatch = new Stopwatch();
        stopWatch.Start();
        while (arrayOfTasks.Any())
        {
            Task<string> finishedTask = await Task.WhenAny(arrayOfTasks);
            arrayOfTasks.Remove(finishedTask);
            Console.WriteLine(await finishedTask);
            finishedCount++;
            if (finishedCount == 4) //check you stopwatch elapsed here too
            {
                Console.WriteLine($":::{finishedCount} tasks have finished, {arrayOfTasks.Count} to go");
            }
        }
    }

    private static List<Task<string>> GetArrayOfTasks()
    {
        List<Task<string>> taskList = new();
        for (int i = 0; i < 10; i++)
        {
            var t = GetString(i);
            taskList.Add(t);
        }
        return taskList;
    }

    private static async Task<string> GetString(int i)
    {
        await Task.Delay(RandomNumberGenerator.GetInt32(1, 5000));
        return i.ToString();
    }
}   

【讨论】:

  • 这与任务数量的比例非常很差,因为您要向任务添加 n^2 延续,而不是每个任务只添加一个,而 TPL 不是t 针对如此大量的延续进行了优化。它还增加了许多在此操作的任务完成后可能运行的工作,使调试问题变得更加困难。从列表中删除任意项目也不必要地低效,但这比单位成本高得多的延续影响小。对于非平凡的尺寸,这些都是现实问题。
  • @canton7 取决于所需的错误处理语义是否可取。
  • @canton7 由于竞争条件,它不会总是成功,但即便如此,仍然非常低效。进行了大量的锁定,并且延续存储在一个列表中,当它扩展时搜索效率不高,并且当移除延续时列表的大小不会缩小。 MSDN 认为这并不意味着这是一个好主意,他们提出了很多次优的,有时甚至是非常有问题的事情。如果 OP 知道他们总是有非常少量的任务,那很好。但这是一个很大的“如果”。
  • @Servy 好的,如果没有 SC 你可以在那里比赛,但我仍然不同意你关于搜索列表在扩展时效率低下的断言(因为它已被清除)。 Monitor.Enter 在调用内核之前会旋转一段不小的时间,Task 不会长时间锁定延续,所以如果你真的让线程在这里暂停,我会感到惊讶,但我又一次' 需要配置文件才能在这里更加自信。我们同意有更好的方法来实现这一点,我的反对意见是“exceedingly
  • @canton7 我希望我有 ;) 我有大约 30 台服务器在运行,每台服务器只处理一项工作。所以在我的情况下,开销是可以接受的。
【解决方案3】:

Rx.Net 是实现这一目标的最优雅方式。

public IAsyncEnumerable<TResult> DoStuff<TResult>(IEnumerable<Func<CancellationToken, Task<TResult>>> tasks)
{
    var inputs = tasks
            // convert this into IObservable<TResult>
            // this type, like IAsyncEnumerable, contains
            // async logic, and cancellation...
            .ToObservable()
            .Select(task => Observable.FromAsync(task))
            .Merge()
            // publish/refcount is needed to ensure
            // we only run the tasks once, and share
            // the "result/event".
            .Publish()
            .RefCount();
                         // On the 100th Item
    var timeoutSignal = inputs.Skip(100 - 1)
                          .Take(1)
                          // Generate a signal 10 minutes after the 100th 
                          // item arrives
                          .Delay(TimeSpan.FromMinutes(10));
    return inputs
            // Take items until the timeout signal
            .TakeUntil(timeoutSignal)
            .ToAsyncEnumerable();
    
}

var items = await DoStuff(tasks).ToListAsync()

【讨论】:

  • OP 说他们有一个可枚举的任务,而不是一个可枚举的 Funcs。
  • @TheodorZoulias 这不可能解决 OP 的问题。因为它会取消飞行中的任务......
猜你喜欢
  • 1970-01-01
  • 2014-08-20
  • 1970-01-01
  • 2017-01-22
  • 2021-10-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多