【问题标题】:How to execute a number of tasks in parallel, and return ASAP when a task completes with a specific result?如何并行执行多个任务,并在任务完成并返回特定结果时尽快返回?
【发布时间】:2021-02-12 05:25:24
【问题描述】:

我要执行一定数量的方法,第一个以一定值结束的方法必须退出循环。

代码如下所示:

var tasks = new List<Task<bool>>();
tasks.Add(VerificheHandler.RandomString(14));
tasks.Add(VerificheHandler.RandomString(9000)); 

public bool Test()
{
    Parallel.ForEach(tasks, task =>
    {
        task.Start();
        if(task.Result)
        {
            //exit parallel and kill all
            return true;
        }
    }); 
}   
                    
public async static Task<bool> RandomString(int delay)
{
    await Task.Delay(delay);
    return true;
}

错误是:

task.Result
Start may not be called on a promise-style task

【问题讨论】:

  • Parallel.Foreach 不适合并行运行Tasks。它旨在执行 CPU 绑定操作。
  • and the first method that ends with a certain value 什么确定值?在这种情况下是true
  • 是的,async 方法总是返回“热”Tasks,你不要对它们调用 Start,它们已经开始了。将任务创建与任务执行分开的整个想法通常不是一个好主意。您可能在一组Tasks 上寻找(When/Wait)Any 的组合并使用CancellationToken,但您目前的整个问题是涉及错误机制的代码.
  • 正如 Damien 所说,Task.WhenAny 可能是正确的前进方向。使用列表调用它,如果结果不是您所期望的,则从列表中删除该任务并再次调用它。如果这是您所期望的,请返回。
  • Test 方法的预期结果是什么,以防 tasks 没有一个以特定的 true 结果完成?

标签: c# async-await parallel-processing task-parallel-library


【解决方案1】:

您可以使用ParallelLoopState.Break 提前执行并行循环。还有一个.Stop() 方法。 Break 将继续执行所有索引,直到调用 break 的索引,而 stop 不会启动较低索引的迭代。在这两种情况下,循环可能已经开始或完成了更高索引的迭代。

此外,parallel.for/foreach 通常不与任务结合使用。使方法同步会使它看起来像这样:

    public static void Test()
    {
        Parallel.For(0, 10, (i, state) =>
        {
            var r = RandomString(100);
            if (r)
            {
                state.Break();
            }
        });
    }

    public static bool RandomString(int delay)
    {
        Thread.Sleep(delay);
        return true;
    }

【讨论】:

    【解决方案2】:

    您提供的示例无法编译,因为您无法将Task&lt;bool&gt; 存储在Task&lt;string&gt; 的集合中。

    如果我正确理解您的问题,那么您正在寻找解决方案:

    • 同时启动多个方法
    • 只要其中一种方法返回给定值,就停止执行。

    为了实现这样的解决方案,我们必须使用CancellationToken 才能停止任何正在进行的操作。所以修改后的RandomString 可能是这样的:

    public static async Task<string> RandomString(int delay, CancellationToken token)
    {
        try
        {
            await Task.Delay(delay, token);
        }
        catch (OperationCanceledException) 
        {
            Console.WriteLine("Task has been cancelled " + delay);
            return null;
        }
        Console.WriteLine("Finished " + delay);
        return "Random " + delay;
    }
    
    • 我在方法中添加了一个新的CancellationToken 参数,然后我将它传递给了Task.Delay
      • 如果没有取消,那么它将打印出一些调试信息并返回一些随机字符串。
      • 如果在 Task.Delay 期间取消,那么我们浅化 OperationCanceledException 并打印一些调试信息。

    为了能够冷启动这个方法,我们可以在它周围引入一个简单的包装器:

    public static Func<CancellationToken, Task<string>> RandomStringWrapper(int delay)
        => (ct) => RandomString(delay, ct);
    
    • 它不会立即调用RandomString,而是返回一个需要CancellationToken 的函数。
    • 只要传递了CancellationToken,任务就会运行。

    所以,现在我们可以引入一个足够通用的扩展方法来支持多种不同的异步方法:

    public static class TaskEx
    {
        public static async Task RunUntil<T>(this IEnumerable<Func<CancellationToken, Task<T>>> asyncFunctions, T exitCondition, CancellationTokenSource cts)
        {
            var jobs = asyncFunctions.Select(fn => fn(cts.Token)).ToList();
            while (true)
            {
                var fastest = await Task.WhenAny(jobs);
                if (fastest.Result.Equals(exitCondition))
                {
                    cts.Cancel(true);
                    return;
                }
    
                jobs.Remove(fastest);
    
                if (jobs.Count == 0)
                    return;
            }
        }
    }
    
    • asyncFunctions:是函数的集合。每个人都希望 CancellationToken 返回 Task&lt;T&gt;
    • exitCondition:顾名思义……
    • ctsCancellationToken 提供者。
    • 首先,我们通过将 CancellationToken 传递给异步方法来启动它们。
    • 在循环中,我们等待 fastest 完成。
    • 如果fastest 的结果等于exitCondition,那么我们取消剩余的任务。
    • 如果它们不相等,则我们从 jobs 中删除已完成的作业并重新运行相同的循环,直到有作业要执行。

    现在,让我们把所有这些放在一起:

    static async Task Main(string[] args)
    {
        var cts = new CancellationTokenSource();
        var tasks = new[] { RandomStringWrapper(14), RandomStringWrapper(600), RandomStringWrapper(500) };
        await tasks.RunUntil("Random 500", cts);
    }
    

    输出将是:

    Finished 14
    Finished 500
    Task has been cancelled 600
    

    【讨论】:

    • Peter while (true) ... Task.WhenAny 循环是一种反模式。当任务数量变大时,无论是性能还是分配,它的扩展性都很差。对于少量任务(少于 1000 个)来说这是可以的,但无论如何我都会在生产环境中避免它。另外我会避免过早吞下OperationCanceledExceptions,并使用魔法值(如null)来传达取消。
    • @TheodorZoulias Yepp,您的观察是正确的。显然,对于过于简单的问题,这是一个幼稚的解决方案。
    • Peter btw 请注意,Microsoft 对引入 TaskCanceledException 类型并不满意。您可以听到 Stephen Toub 谈论 here。出于这个原因,我建议改为使用基本类型 OperationCanceledException
    • @TheodorZoulias 谢谢,我已经修好了。
    猜你喜欢
    • 2020-08-10
    • 1970-01-01
    • 1970-01-01
    • 2015-01-18
    • 2014-02-10
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多