【Question Title】: Appropriate pattern for awaiting Task.WhenAny(List<T>) when list can have other tasks appended
【Posted】: 2019-12-26 03:44:54
【Question Description】:

It is not possible to await a List<Task> that is changing, because Task.WhenAny(List<Task>) takes a copy of the List<Task>.
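
The copying behavior can be checked in isolation. The following is a minimal sketch, not part of the original question; the class name WhenAnySnapshotDemo and the method SeesLateTask are hypothetical names chosen for illustration. It starts WhenAny on a list holding one task that never completes, then appends an already-completed task and checks whether WhenAny noticed it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class WhenAnySnapshotDemo
{
    // Returns true if WhenAny observed a task appended after the call.
    // Because WhenAny works on a copy of the list, this returns false.
    public static bool SeesLateTask()
    {
        var pending = new TaskCompletionSource<int>();   // its task is never completed
        var tasks = new List<Task<int>> { pending.Task };

        // The contents of the list are copied at this point.
        Task<Task<int>> whenAny = Task.WhenAny(tasks);

        // Appended after the call, so it is not among the tasks being watched.
        tasks.Add(Task.FromResult(42));

        return whenAny.IsCompleted;
    }
}
```

Calling WhenAnySnapshotDemo.SeesLateTask() returns false: the already-completed task is in the list, but the pending WhenAny only watches the single task that was present when it was called.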

What is an appropriate pattern for awaiting

List<Task> taskList = new List<Task>();

await Task.WhenAny(taskList);

when taskList can have other tasks added to it after the first WhenAny call?

The complete demo code below demonstrates the problem.

    static readonly List<Task<int>> taskList = new List<Task<int>>();
    static readonly Random rnd = new Random(1);

    static async Task<int> RunTaskAsync(int taskID,int taskDuration)
    {
        await Task.Yield();
        Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
        await Task.Delay(taskDuration);  // mimic some work
        return taskID;
    }
    static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
    {
        // Add numTasks asynchronously to the taskList
        // First task is added synchronously and then we yield the adds to a worker

        taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
        await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with One task

        // remaining task runs are yielded to a worker thread
        for (int i = 2; i <= numTasks; i++)
        {
            await Task.Delay(rnd.Next(minDelay, maxDelay));
            taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
        }
    }
    static async Task Main(string[] args)
    {
        Stopwatch sw = new Stopwatch(); sw.Start();

        // Start a Fire and Forget Task to create some running tasks
        var _ = AddTasksAsync(10, 1, 3000);

        // while there are tasks to complete use the main thread to process them as they complete
        while(taskList.Count > 0)
        {
            var t = await Task.WhenAny(taskList);
            taskList.Remove(t);
            var i = await t;
            Console.WriteLine("Task {0} found to be completed at: {1}",i,sw.Elapsed);
        }

        // All tasks have completed successfully - exit main thread
    }

Console output, showing that the WhenAny() loop found all the other tasks to be completed only after the 60-second task had been found and removed.

Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Starting Task: 3 with a duration of 24 seconds
Starting Task: 4 with a duration of 15 seconds
Starting Task: 5 with a duration of 28 seconds
Starting Task: 6 with a duration of 21 seconds
Starting Task: 7 with a duration of 11 seconds
Starting Task: 8 with a duration of 29 seconds
Starting Task: 9 with a duration of 21 seconds
Starting Task: 10 with a duration of 20 seconds
Task 1 found to be completed at: 00:01:00.1305811
Task 2 found to be completed at: 00:01:00.1312951
Task 3 found to be completed at: 00:01:00.1315689
Task 4 found to be completed at: 00:01:00.1317623
Task 5 found to be completed at: 00:01:00.1319427
Task 6 found to be completed at: 00:01:00.1321225
Task 7 found to be completed at: 00:01:00.1323002
Task 8 found to be completed at: 00:01:00.1324379
Task 9 found to be completed at: 00:01:00.1325962
Task 10 found to be completed at: 00:01:00.1327377

Thanks!

【Question Comments】:

    Tags: c# async-await task


    【Solution 1】:

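    The code you have shown suffers from the problem that it has no sensible communication pipeline between the worker and the task creator. You need some kind of messaging mechanism to notify the worker about new tasks (and about the point when there are no more tasks) so that it can react. That is something you have to figure out for your concurrent system, and the exact implementation is tangential to the question, so I will just assume that our worker has OnTaskAdded(Task task) and OnEnd() methods.

    From what you are saying, you do not really want to wait until any task completes; rather, you want each task to do something when it completes. SEE THE UPDATED ANSWER BELOW. This can be achieved with ContinueWith: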

    class Worker
    {
        private List<Task> _tasks = new List<Task>();
        private readonly Stopwatch _stopwatch = new Stopwatch();
    
        // Start the stopwatch in the constructor or in some kind of a StartProcessing method.
    
        void OnTaskAdded(Task<int> task)
        {
            var taskWithContinuation = task.ContinueWith(t =>
                Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, _stopwatch.Elapsed));
            _tasks.Add(taskWithContinuation);
        }
    
        async Task OnEndAsync()
        {
            // We're finishing work and there will be no more tasks, it's safe to await them all now.
            await Task.WhenAll(_tasks);
        }
    }
    

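    EDIT: After all that moralizing about ensuring a sensible messaging pipeline, I figure I can actually give you a quick-and-dirty implementation so that you can see it working: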

    // DISCLAIMER: NOT PRODUCTION CODE!!!
    public static async Task Main()
    {
        Stopwatch sw = new Stopwatch(); sw.Start();
    
        // Start a Fire and Forget Task to create some running tasks
        var _ = AddTasksAsync(10, 1, 3000);
        var internalList = new List<Task>();
    
        // while there are tasks to complete use the main thread to process them as they complete
        var i = 0;
        while (i < 10)
        {
            while (taskList.Count <= i)
            {
                // No new tasks, check again after a delay -- THIS IS VERY BAD!
                await Task.Delay(100);
            }
            Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
            var taskWithContinuation = taskList[i].ContinueWith(t =>
                Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, sw.Elapsed));
            internalList.Add(taskWithContinuation);
            ++i;
        }
        await Task.WhenAll(internalList);
    }
    

    Let me stress it once more: this is not production-quality code! Actively waiting for more tasks, ugh. Its output looks something like this:

    Task 1 intercepted at: 00:00:00.0495570
    Starting Task: 1 with a duration of 60 seconds
    Starting Task: 2 with a duration of 7 seconds
    Task 2 intercepted at: 00:00:05.8459622
    Starting Task: 3 with a duration of 24 seconds
    Task 3 intercepted at: 00:00:07.2626124
    Starting Task: 4 with a duration of 15 seconds
    Task 4 intercepted at: 00:00:09.2257285
    Starting Task: 5 with a duration of 28 seconds
    Task 5 intercepted at: 00:00:10.3058738
    Starting Task: 6 with a duration of 21 seconds
    Task 6 intercepted at: 00:00:10.6376981
    Starting Task: 7 with a duration of 11 seconds
    Task 7 intercepted at: 00:00:10.7507146
    Starting Task: 8 with a duration of 29 seconds
    Task 8 intercepted at: 00:00:11.7107754
    Task 2 found to be completed at: 00:00:12.8111589
    Starting Task: 9 with a duration of 21 seconds
    Task 9 intercepted at: 00:00:13.7883430
    Starting Task: 10 with a duration of 20 seconds
    Task 10 intercepted at: 00:00:14.6707959
    Task 7 found to be completed at: 00:00:21.6692276
    Task 4 found to be completed at: 00:00:24.2125638
    Task 3 found to be completed at: 00:00:31.2276640
    Task 6 found to be completed at: 00:00:31.5908324
    Task 10 found to be completed at: 00:00:34.5585143
    Task 9 found to be completed at: 00:00:34.7053864
    Task 5 found to be completed at: 00:00:38.2616534
    Task 8 found to be completed at: 00:00:40.6372696
    Task 1 found to be completed at: 00:01:00.0720695
    

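    You can see that the lines are slightly out of order because of the nature of multithreaded work, but the timestamps are accurate.

    UPDATE:

    Well, I was being dumb: I have just invited you into an anti-pattern. Using ContinueWith is dangerous, and moreover it is overcomplicated - async/await was introduced to free us from scheduling continuations manually. You can simply wrap your Task<int> in an operation that awaits it and logs the time.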

    class Worker
    {
        private List<Task> _tasks = new List<Task>();
        private readonly Stopwatch _stopwatch = new Stopwatch();
    
        // Start the stopwatch in the constructor or in some kind of a StartProcessing method.
    
        void OnTaskAdded(Task<int> task)
        {
            var taskWithContinuation = ContinueWithLog(task);
            _tasks.Add(taskWithContinuation);
        }
    
        async Task OnEndAsync()
        {
            // We're finishing work and there will be no more tasks, it's safe to await them all now.
            await Task.WhenAll(_tasks);
        }
    
        private async Task ContinueWithLog(Task<int> source)
        {
            // Await the wrapped task, then log its result and the elapsed time.
            var i = await source;
            Console.WriteLine("Task {0} found to be completed at: {1}", i, _stopwatch.Elapsed);
        }
    }
    

    Using your example code for a quick-and-dirty PoC:

    class Program
    {
        static readonly List<Task<int>> taskList = new List<Task<int>>();
        static readonly Random rnd = new Random(1);
        static readonly Stopwatch sw = new Stopwatch();
    
        static async Task<int> RunTaskAsync(int taskID, int taskDuration)
        {
            await Task.Yield();
            Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
            await Task.Delay(taskDuration);  // mimic some work
            return taskID;
        }
        static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
        {
            // Add numTasks asynchronously to the taskList
            // First task is added synchronously and then we yield the adds to a worker
    
            taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
            await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with One task
    
            // remaining task runs are yielded to a worker thread
            for (int i = 2; i <= numTasks; i++)
            {
                await Task.Delay(rnd.Next(minDelay, maxDelay));
                taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
            }
        }
    
        public static async Task ContinueWithLog(Task<int> source)
        {
            var i = await source;
            Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
        }
    
        public static async Task Main()
        {
            sw.Start();
    
            // Start a Fire and Forget Task to create some running tasks
            var _ = AddTasksAsync(10, 1, 3000);
            var internalList = new List<Task>();
    
            // while there are tasks to complete use the main thread to process them as they complete
            var i = 0;
            while (i < 10)
            {
                while (taskList.Count <= i)
                {
                    // No new tasks, check again after a delay -- THIS IS VERY BAD!
                    await Task.Delay(100);
                }
                Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
                internalList.Add(ContinueWithLog(taskList[i]));
                ++i;
            }
            await Task.WhenAll(internalList);
        }
    }
    

    Output:

    Starting Task: 1 with a duration of 60 seconds
    Task 1 intercepted at: 00:00:00.0525006
    Starting Task: 2 with a duration of 7 seconds
    Task 2 intercepted at: 00:00:05.8551382
    Starting Task: 3 with a duration of 24 seconds
    Task 3 intercepted at: 00:00:07.2687049
    Starting Task: 4 with a duration of 15 seconds
    Task 4 intercepted at: 00:00:09.2404507
    Starting Task: 5 with a duration of 28 seconds
    Task 5 intercepted at: 00:00:10.3325019
    Starting Task: 6 with a duration of 21 seconds
    Task 6 intercepted at: 00:00:10.6654663
    Starting Task: 7 with a duration of 11 seconds
    Task 7 intercepted at: 00:00:10.7809841
    Starting Task: 8 with a duration of 29 seconds
    Task 8 intercepted at: 00:00:11.7576237
    Task 2 found to be completed at: 00:00:12.8151955
    Starting Task: 9 with a duration of 21 seconds
    Task 9 intercepted at: 00:00:13.7228579
    Starting Task: 10 with a duration of 20 seconds
    Task 10 intercepted at: 00:00:14.5829039
    Task 7 found to be completed at: 00:00:21.6848699
    Task 4 found to be completed at: 00:00:24.2089671
    Task 3 found to be completed at: 00:00:31.2300136
    Task 6 found to be completed at: 00:00:31.5847257
    Task 10 found to be completed at: 00:00:34.5550722
    Task 9 found to be completed at: 00:00:34.6904076
    Task 5 found to be completed at: 00:00:38.2835777
    Task 8 found to be completed at: 00:00:40.6445029
    Task 1 found to be completed at: 00:01:00.0826952
    

    This is the idiomatic way of achieving what you want. Sorry for misleading you with ContinueWith at first; it is unnecessary and error-prone, as we both know now.
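
    The wrapper in this answer only logs successful completion. If failures also need to be reported, the same await-based wrapper can catch the exception. The following is a minimal sketch under that assumption; the names ObserveSketch and DescribeAsync are hypothetical, and it returns a message instead of writing to the console so that the result is easy to inspect.

```csharp
using System;
using System.Threading.Tasks;

public static class ObserveSketch
{
    // Hypothetical wrapper: awaits the source task and describes how it ended.
    public static async Task<string> DescribeAsync(Task<int> source)
    {
        try
        {
            // await rethrows the task's original exception, not an AggregateException.
            int id = await source;
            return $"Task {id} completed";
        }
        catch (Exception ex)
        {
            // The caller could decide here whether the work needs to be restarted.
            return $"Task failed: {ex.Message}";
        }
    }
}
```

    As with ContinueWithLog, the wrapper task returned by DescribeAsync is what would go into the internal list passed to Task.WhenAll.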

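    【Comments】:

    • Re: "you do not really want to wait until any task completes; rather, you want each task to do something when it completes". Yes, that is correct. The use case is to report when each one completes or throws an exception, and to decide whether it needs to be restarted... I like the ContinueWith idea and will look into it. Thanks for the example.
    • @RowanSmith After investigating how async/await works a bit more, I have updated my answer; the previous one was wrong.
    • Thanks. I also read that blog while investigating ContinueWith and came to the same conclusion.
    【Solution 2】:

    List<Task> is not a suitable container for this kind of job, because it does not support the notion of completion. You therefore cannot tell whether more tasks are still going to be added to the list, which is what you would need to know in order to stop waiting. There are several alternatives, though.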

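    1. BlockingCollection<Task>. The producer calls the Add method, and finally calls CompleteAdding to signal that it has finished adding tasks. The consumer simply enumerates GetConsumingEnumerable. Very simple, but blocking by nature (not asynchronous).
    2. BufferBlock<Task>. The producer calls the SendAsync method, and finally calls Complete to signal that it has finished adding tasks. The consumer enumerates asynchronously using the OutputAvailableAsync and TryReceive methods. Requires the TPL Dataflow package (on .NET Framework; it is included in .NET Core).
    3. Channel<Task>. The producer calls the Writer.WriteAsync method, and finally calls Writer.Complete to signal that it has finished adding tasks. The consumer enumerates asynchronously using the Reader.WaitToReadAsync and Reader.TryRead methods. Requires the System.Threading.Channels package (on .NET Framework; it is included in .NET Core).
    4. An IObservable<Task> + IObserver<Task> pair. The observer subscribes to the observable and then starts receiving notifications about new tasks. The last notification is onCompleted(), which signals that no more notifications will be produced. The Reactive Extensions library includes a range of methods for manipulating observables, one of which is the Merge operator, which can be used to await all the tasks by exploiting the fact that a Task<T> can be converted to an observable (IObservable<T>) that produces a single onNext notification. This approach may seem quirky, and it is probably not worth the investment of learning the technology (the reactive programming paradigm) unless you often deal with incoming streams of data that you want to filter, transform, combine and so on.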
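
    As an illustration of the producer/consumer shape described in option 3, here is a minimal sketch using System.Threading.Channels. The names ChannelSketch, RunAsync and WorkAsync are hypothetical, and the short delays only stand in for real work. Note that the consumer awaits tasks in the order they arrive in the channel, not in the order they complete.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelSketch
{
    // Hypothetical helper: produces `count` tasks, consumes them,
    // and returns their results in the order they were received.
    public static async Task<List<int>> RunAsync(int count)
    {
        var channel = Channel.CreateUnbounded<Task<int>>();

        var producer = Task.Run(async () =>
        {
            for (int i = 1; i <= count; i++)
            {
                await channel.Writer.WriteAsync(WorkAsync(i));
            }
            channel.Writer.Complete();   // signals that no more tasks will be added
        });

        var results = new List<int>();
        // WaitToReadAsync returns false once the writer is completed and the channel is empty.
        while (await channel.Reader.WaitToReadAsync())
        {
            while (channel.Reader.TryRead(out Task<int> task))
            {
                results.Add(await task);   // awaited in arrival order
            }
        }

        await producer;
        return results;
    }

    // Stand-in for real work: completes after a short delay and returns its id.
    private static async Task<int> WorkAsync(int id)
    {
        await Task.Delay(10 * id);
        return id;
    }
}
```

    Unlike the List<Task> loop, the consumer here has a well-defined end: the loop exits once the producer has called Writer.Complete and every queued task has been read.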

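    Update: In retrospect, the first three options cannot be used as they are, because you also want to await the tasks. So my suggestion now is to use a TransformBlock<Task, Task> instead of a BufferBlock<Task>.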

    var block = new TransformBlock<Task, Task>(async task =>
    {
        try
        {
            await task;
        }
        catch { } // suppress exceptions
        return task;
    });
    

    An example of a producer that adds tasks to the block:

    var producer = Task.Run(async () =>
    {
        for (int i = 1; i <= 10; i++)
        {
            await Task.Delay(100);
            Console.WriteLine($"Sending {i}");
            await block.SendAsync(Task.Delay(i * 100));
        }
        block.Complete();
    });
    

    An example of a consumer that receives the completed tasks from the block:

    var consumer = Task.Run(async () =>
    {
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out var task))
            {
                Console.WriteLine($"Task Completed: {task.Status}");
            }
        }
    });
    

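    The tasks are received in the same order in which they were added to the block. If you want to receive them as soon as they complete, configure the block like this: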

    new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = Int32.MaxValue,
        EnsureOrdered = false
    }
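
    To show where these options are passed, here is a minimal self-contained sketch that combines the block, a producer and a consumer. It assumes Task<int> instead of Task so that a result can be collected; the names UnorderedBlockSketch, RunAsync and DelayedAsync are hypothetical. The tasks are sent longest-first, so with EnsureOrdered = false they would typically come out in completion order rather than in the order they were sent.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnorderedBlockSketch
{
    // Hypothetical helper: returns the task ids in the order the block emitted them.
    public static async Task<List<int>> RunAsync()
    {
        // The options object is passed as the second constructor argument.
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = int.MaxValue,
            EnsureOrdered = false
        };

        var block = new TransformBlock<Task<int>, Task<int>>(async task =>
        {
            try { await task; }
            catch { }   // suppress exceptions; the consumer inspects the task
            return task;
        }, options);

        // Producer: the longest task is sent first.
        await block.SendAsync(DelayedAsync(3, 600));
        await block.SendAsync(DelayedAsync(2, 300));
        await block.SendAsync(DelayedAsync(1, 50));
        block.Complete();

        // Consumer: every task received here has already finished.
        var order = new List<int>();
        while (await block.OutputAvailableAsync())
        {
            while (block.TryReceive(out Task<int> task))
            {
                order.Add(await task);   // safe here because the sample tasks never fail
            }
        }
        return order;
    }

    // Stand-in for real work: returns `id` after `milliseconds`.
    private static async Task<int> DelayedAsync(int id, int milliseconds)
    {
        await Task.Delay(milliseconds);
        return id;
    }
}
```

    With the default options (EnsureOrdered = true) the same code would emit the tasks in the order they were sent, so the consumer would not see the short tasks until the 600 ms task had finished.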
    
