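The code you showed has a problem: there is no sane communication pipeline between the worker and the task creator. You need some kind of messaging mechanism to notify the worker about new tasks (and about the point where there will be no more tasks) so that it can react to them. That is something you have to figure out for your own concurrent system, and the exact implementation is irrelevant to the question, so I will simply assume that our worker has OnTaskAdded(Task task) and OnEnd() methods.
From what you have said, you do not really want to wait until any task completes; you want to perform some action for each task as it completes. See the updated answer below.
This can be achieved with ContinueWith: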
class Worker
{
    private List<Task> _tasks = new List<Task>();
    private readonly Stopwatch _stopwatch = new Stopwatch();
    // Start the stopwatch in the constructor or in some kind of a StartProcessing method.
    void OnTaskAdded(Task<int> task)
    {
        var taskWithContinuation = task.ContinueWith(t =>
            Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, _stopwatch.Elapsed));
        _tasks.Add(taskWithContinuation);
    }
    async Task OnEndAsync()
    {
        // We're finishing work and there will be no more tasks, it's safe to await them all now.
        await Task.WhenAll(_tasks);
    }
}
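Edit:
After all that moralizing about ensuring a sane messaging pipeline, I figured I could actually give you a quick-and-dirty implementation so you can see it working: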
// DISCLAIMER: NOT PRODUCTION CODE!!!
// taskList and AddTasksAsync are taken from the question's sample code.
public static async Task Main()
{
    Stopwatch sw = new Stopwatch(); sw.Start();
    // Start a fire-and-forget task to create some running tasks
    var _ = AddTasksAsync(10, 1, 3000);
    var internalList = new List<Task>();
    // While there are tasks to complete, use the main thread to process them as they complete
    var i = 0;
    while (i < 10)
    {
        while (taskList.Count <= i)
        {
            // No new tasks, check again after a delay -- THIS IS VERY BAD!
            await Task.Delay(100);
        }
        Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
        var taskWithContinuation = taskList[i].ContinueWith(t =>
            Console.WriteLine("Task {0} found to be completed at: {1}", t.Result, sw.Elapsed));
        internalList.Add(taskWithContinuation);
        ++i;
    }
    await Task.WhenAll(internalList);
}
Let me stress this once more: this is not production-quality code! Actively polling for more tasks, ugh. Its output looks like this:
Task 1 intercepted at: 00:00:00.0495570
Starting Task: 1 with a duration of 60 seconds
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8459622
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2626124
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2257285
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3058738
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6376981
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7507146
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7107754
Task 2 found to be completed at: 00:00:12.8111589
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7883430
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.6707959
Task 7 found to be completed at: 00:00:21.6692276
Task 4 found to be completed at: 00:00:24.2125638
Task 3 found to be completed at: 00:00:31.2276640
Task 6 found to be completed at: 00:00:31.5908324
Task 10 found to be completed at: 00:00:34.5585143
Task 9 found to be completed at: 00:00:34.7053864
Task 5 found to be completed at: 00:00:38.2616534
Task 8 found to be completed at: 00:00:40.6372696
Task 1 found to be completed at: 00:01:00.0720695
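You can see that the lines are slightly out of order because of the nature of multithreaded work, but the timestamps are accurate.
Update:
OK, I was being stupid: I just invited you into an anti-pattern. Using ContinueWith is dangerous, and it is overcomplicated as well; async/await was introduced precisely to free us from scheduling continuations by hand. You can simply wrap your Task<int> in an operation that awaits it and logs the time.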
class Worker
{
    private List<Task> _tasks = new List<Task>();
    private readonly Stopwatch _stopwatch = new Stopwatch();
    // Start the stopwatch in the constructor or in some kind of a StartProcessing method.
    void OnTaskAdded(Task<int> task)
    {
        var taskWithContinuation = ContinueWithLog(task);
        _tasks.Add(taskWithContinuation);
    }
    async Task OnEndAsync()
    {
        // We're finishing work and there will be no more tasks, it's safe to await them all now.
        await Task.WhenAll(_tasks);
    }
    // Awaits the wrapped task and logs the elapsed time once it has completed.
    private async Task ContinueWithLog(Task<int> task)
    {
        var i = await task;
        Console.WriteLine("Task {0} found to be completed at: {1}", i, _stopwatch.Elapsed);
    }
}
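To make one difference between the two styles concrete, here is a small sketch of my own (not part of the code above; FailAsync and the class name are hypothetical, introduced only for illustration). It shows how a failure surfaces in each style: a ContinueWith continuation still runs after the antecedent has faulted, and reading t.Result throws an AggregateException that wraps the real error, whereas await rethrows the original exception directly.

```csharp
using System;
using System.Threading.Tasks;

static class ContinuationComparison
{
    // Hypothetical helper: a task that always fails.
    static async Task<int> FailAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("boom");
    }

    // ContinueWith: the continuation runs even though the antecedent faulted,
    // and t.Result throws an AggregateException wrapping the real error.
    public static Task<string> ViaContinueWith()
    {
        return FailAsync().ContinueWith(t =>
        {
            try { return t.Result.ToString(); }
            catch (Exception ex) { return ex.GetType().Name; }
        });
    }

    // await: the original exception is rethrown as-is.
    public static async Task<string> ViaAwait()
    {
        try { return (await FailAsync()).ToString(); }
        catch (Exception ex) { return ex.GetType().Name; }
    }

    public static async Task Main()
    {
        Console.WriteLine("ContinueWith saw: {0}", await ViaContinueWith()); // AggregateException
        Console.WriteLine("await saw:        {0}", await ViaAwait());        // InvalidOperationException
    }
}
```

This is only one of the ways the two approaches differ, but it is the kind of detail that is easy to get wrong when continuations are wired up by hand.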
Using your sample code for a quick-and-dirty PoC:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
class Program
{
    static readonly List<Task<int>> taskList = new List<Task<int>>();
    static readonly Random rnd = new Random(1);
    static readonly Stopwatch sw = new Stopwatch();
    static async Task<int> RunTaskAsync(int taskID, int taskDuration)
    {
        await Task.Yield();
        Console.WriteLine("Starting Task: {0} with a duration of {1} seconds", taskID, taskDuration / 1000);
        await Task.Delay(taskDuration); // mimic some work
        return taskID;
    }
    static async Task AddTasksAsync(int numTasks, int minDelay, int maxDelay)
    {
        // Add numTasks asynchronously to the taskList
        // First task is added synchronously and then we yield the adds to a worker
        taskList.Add(RunTaskAsync(1, 60000)); // Make the first task run for 60 seconds
        await Task.Delay(5000); // wait 5 seconds to ensure that the WhenAny is started with one task
        // remaining task runs are yielded to a worker thread
        for (int i = 2; i <= numTasks; i++)
        {
            await Task.Delay(rnd.Next(minDelay, maxDelay));
            taskList.Add(RunTaskAsync(i, rnd.Next(5, 30) * 1000));
        }
    }
    // Awaits the wrapped task and logs the elapsed time once it has completed.
    public static async Task ContinueWithLog(Task<int> source)
    {
        var i = await source;
        Console.WriteLine("Task {0} found to be completed at: {1}", i, sw.Elapsed);
    }
    public static async Task Main()
    {
        sw.Start();
        // Start a fire-and-forget task to create some running tasks
        var _ = AddTasksAsync(10, 1, 3000);
        var internalList = new List<Task>();
        // While there are tasks to complete, use the main thread to process them as they complete
        var i = 0;
        while (i < 10)
        {
            while (taskList.Count <= i)
            {
                // No new tasks, check again after a delay -- THIS IS VERY BAD!
                await Task.Delay(100);
            }
            Console.WriteLine("Task {0} intercepted at: {1}", i + 1, sw.Elapsed);
            internalList.Add(ContinueWithLog(taskList[i]));
            ++i;
        }
        await Task.WhenAll(internalList);
    }
}
Output:
Starting Task: 1 with a duration of 60 seconds
Task 1 intercepted at: 00:00:00.0525006
Starting Task: 2 with a duration of 7 seconds
Task 2 intercepted at: 00:00:05.8551382
Starting Task: 3 with a duration of 24 seconds
Task 3 intercepted at: 00:00:07.2687049
Starting Task: 4 with a duration of 15 seconds
Task 4 intercepted at: 00:00:09.2404507
Starting Task: 5 with a duration of 28 seconds
Task 5 intercepted at: 00:00:10.3325019
Starting Task: 6 with a duration of 21 seconds
Task 6 intercepted at: 00:00:10.6654663
Starting Task: 7 with a duration of 11 seconds
Task 7 intercepted at: 00:00:10.7809841
Starting Task: 8 with a duration of 29 seconds
Task 8 intercepted at: 00:00:11.7576237
Task 2 found to be completed at: 00:00:12.8151955
Starting Task: 9 with a duration of 21 seconds
Task 9 intercepted at: 00:00:13.7228579
Starting Task: 10 with a duration of 20 seconds
Task 10 intercepted at: 00:00:14.5829039
Task 7 found to be completed at: 00:00:21.6848699
Task 4 found to be completed at: 00:00:24.2089671
Task 3 found to be completed at: 00:00:31.2300136
Task 6 found to be completed at: 00:00:31.5847257
Task 10 found to be completed at: 00:00:34.5550722
Task 9 found to be completed at: 00:00:34.6904076
Task 5 found to be completed at: 00:00:38.2835777
Task 8 found to be completed at: 00:00:40.6445029
Task 1 found to be completed at: 00:01:00.0826952
This is the idiomatic way to achieve what you want. Sorry for misleading you with ContinueWith at first; it is unnecessary and error-prone, and now we both know.
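As for the messaging pipeline itself, one possible way to get rid of the polling loop is a channel from System.Threading.Channels. The following is only a sketch under my own assumptions (it needs .NET Core 3.0 or later for ReadAllAsync and await foreach; the class name, ProcessAsync, and the shortened delays are hypothetical and not taken from your code). Writing to the channel plays the role of OnTaskAdded, and completing the writer plays the role of OnEnd.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Channels;
using System.Threading.Tasks;

static class ChannelSketch
{
    static readonly Stopwatch sw = Stopwatch.StartNew();

    static async Task<int> RunTaskAsync(int taskID, int taskDuration)
    {
        await Task.Delay(taskDuration); // mimic some work
        return taskID;
    }

    // Producer: hands each started task to the worker through the channel.
    static async Task AddTasksAsync(ChannelWriter<Task<int>> writer, int numTasks)
    {
        try
        {
            for (int i = 1; i <= numTasks; i++)
            {
                await writer.WriteAsync(RunTaskAsync(i, 100 * i));
                await Task.Delay(50);
            }
        }
        finally
        {
            writer.Complete(); // "no more tasks" signal, even if the producer fails
        }
    }

    static async Task<int> ContinueWithLog(Task<int> source)
    {
        var id = await source;
        Console.WriteLine("Task {0} found to be completed at: {1}", id, sw.Elapsed);
        return id;
    }

    // Worker: reacts to each task as it arrives, without polling.
    public static async Task<int[]> ProcessAsync(int numTasks)
    {
        var channel = Channel.CreateUnbounded<Task<int>>();
        var producer = AddTasksAsync(channel.Writer, numTasks);
        var pending = new List<Task<int>>();
        await foreach (var task in channel.Reader.ReadAllAsync())
        {
            pending.Add(ContinueWithLog(task));
        }
        await producer;                     // surface producer errors, if any
        return await Task.WhenAll(pending); // results follow the order of 'pending'
    }

    public static async Task Main()
    {
        var ids = await ProcessAsync(5);
        Console.WriteLine("Processed {0} tasks", ids.Length);
    }
}
```

The loop ends on its own once the writer is completed and the channel is drained, so the worker no longer needs to know in advance how many tasks to expect.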