TL;DR
I think TPL Dataflow (https://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx) is the better choice here, but I will answer using the plain TPL.
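For comparison, here is a minimal sketch of what the TPL Dataflow route could look like, assuming the System.Threading.Tasks.Dataflow NuGet package is referenced. DataflowThrottleDemo, ProcessAllAsync, and the peak counter are illustrative names added for this sketch, not something from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

public static class DataflowThrottleDemo
{
    // Runs `work` for every item with at most `maxParallel` invocations in flight.
    // Returns the highest number of invocations observed running at the same time.
    public static async Task<int> ProcessAllAsync(
        IEnumerable<int> items, Func<int, Task> work, int maxParallel, CancellationToken token)
    {
        int running = 0, peak = 0;
        var gate = new object();

        var block = new ActionBlock<int>(async item =>
        {
            lock (gate) { running++; peak = Math.Max(peak, running); }
            try { await work(item); }
            finally { lock (gate) { running--; } }
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallel, // the concurrency limit
            CancellationToken = token             // cancels the whole block
        });

        foreach (var item in items) block.Post(item);
        block.Complete();       // no more input
        await block.Completion; // throws OperationCanceledException if `token` fires
        return peak;
    }
}
```

Unlike a task scheduler, which only sees the synchronous segments between awaits, the block treats an invocation as running until the Task it returned has completed, so the limit covers the whole asynchronous operation.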
Answer
To limit concurrency, you need a task scheduler that limits concurrency. I suggest you look at https://msdn.microsoft.com/library/system.threading.tasks.taskscheduler.aspx and search for LimitedConcurrencyLevelTaskScheduler.
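LimitedConcurrencyLevelTaskScheduler is not part of the base class library; it is sample code shown on that documentation page. To give an idea of what such a scheduler involves, here is a condensed sketch in the same spirit under the hypothetical name SimpleLimitedScheduler. It is an illustration written for this answer, not the documented sample itself, so prefer the sample from the page for real use.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, condensed scheduler: at most `maxDegreeOfParallelism`
// thread-pool workers drain the task queue at any time.
public sealed class SimpleLimitedScheduler : TaskScheduler
{
    [ThreadStatic] private static bool _onWorkerThread;
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // guarded by lock (_tasks)
    private readonly int _max;
    private int _workers; // workers currently queued or running

    public SimpleLimitedScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1)
            throw new ArgumentOutOfRangeException(nameof(maxDegreeOfParallelism));
        _max = maxDegreeOfParallelism;
    }

    public override int MaximumConcurrencyLevel => _max;

    protected override void QueueTask(Task task)
    {
        lock (_tasks)
        {
            _tasks.AddLast(task);
            if (_workers >= _max) return; // enough workers are already draining
            _workers++;
        }
        ThreadPool.QueueUserWorkItem(_ => Drain());
    }

    // Worker loop: execute queued tasks until the queue is empty.
    private void Drain()
    {
        _onWorkerThread = true;
        try
        {
            while (true)
            {
                Task next;
                lock (_tasks)
                {
                    if (_tasks.Count == 0) { _workers--; return; }
                    next = _tasks.First.Value;
                    _tasks.RemoveFirst();
                }
                TryExecuteTask(next);
            }
        }
        finally { _onWorkerThread = false; }
    }

    // Inlining is only allowed on a thread that already holds one of the worker slots.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        if (!_onWorkerThread) return false;
        if (taskWasPreviouslyQueued && !TryDequeue(task)) return false;
        return TryExecuteTask(task);
    }

    protected override bool TryDequeue(Task task)
    {
        lock (_tasks) return _tasks.Remove(task);
    }

    // Used by debuggers; must not block, so give up if the lock is busy.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        bool taken = false;
        try
        {
            Monitor.TryEnter(_tasks, ref taken);
            if (!taken) throw new NotSupportedException();
            return new List<Task>(_tasks);
        }
        finally { if (taken) Monitor.Exit(_tasks); }
    }
}
```

A scheduler like this is handed to a factory, for example new TaskFactory(new SimpleLimitedScheduler(7)), and every task started through that factory then shares the same limit.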
The code below is a simple example of what you are trying to achieve.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

// LimitedConcurrencyLevelTaskScheduler is the sample class from the TaskScheduler
// documentation page linked above; it has to be copied into the project.
[TestClass]
public class UnitTest1
{
    [TestMethod]
    public async Task TestMethod1()
    {
        var factoryCancellation = new CancellationTokenSource();
        var scheduler = new LimitedConcurrencyLevelTaskScheduler(maxDegreeOfParallelism: 7);
        var taskFactory = new TaskFactory(factoryCancellation.Token, TaskCreationOptions.None, TaskContinuationOptions.None, scheduler);

        // One CancellationTokenSource per task, so each task can be cancelled on its own.
        var taskCancellation1 = new CancellationTokenSource();
        var taskCancellation2 = new CancellationTokenSource();
        var token1 = taskCancellation1.Token;
        var token2 = taskCancellation2.Token;

        // StartNew with an async lambda returns Task<Task>; Unwrap() exposes the inner task.
        var myTask1 = taskFactory.StartNew(async () => await Execute(0, token1), token1).Unwrap();
        var myTask2 = taskFactory.StartNew(async () => await Execute(1, token2), token2).Unwrap();

        // Cancel only the first task, 500 ms from now.
        taskCancellation1.CancelAfter(500);
        try
        {
            await Task.WhenAll(myTask1, myTask2);
        }
        catch (OperationCanceledException)
        {
            // Raised by ThrowIfCancellationRequested in the cancelled task.
        }
    }

    private async Task Execute(int i, CancellationToken token)
    {
        Console.WriteLine($"Running Task {i} : Before Delay 1");
        await Task.Delay(1000);
        token.ThrowIfCancellationRequested();
        Console.WriteLine($"Running Task {i} : Before Delay 2");
        await Task.Delay(1000);
        token.ThrowIfCancellationRequested();
        Console.WriteLine($"Running Task {i} : Before Delay 3");
        await Task.Delay(1000);
        token.ThrowIfCancellationRequested();
    }
}
This produces the following log:
QueueTask 1 WaitingToRun
QueueTask 2 WaitingToRun
TryExecuteTask Start 1 WaitingToRun CreationOptions None
TryExecuteTask Start 2 WaitingToRun CreationOptions None
Running Task 1 : Before Delay 1
Running Task 0 : Before Delay 1
TryExecuteTask End 2 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTask End 1 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTaskInline Start 5 taskWasPreviouslyQueued False
TryExecuteTaskInline End 5 WaitingToRun IsCanceled False IsCompleted False IsFaulted False
QueueTask 5 System.Action WaitingToRun
TryExecuteTask Start 5 System.Action WaitingToRun CreationOptions None
Running Task 1 : Before Delay 2
TryExecuteTask End 5 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTaskInline Start 6 taskWasPreviouslyQueued False
TryExecuteTaskInline End 6 WaitingToRun IsCanceled False IsCompleted False IsFaulted False
QueueTask 6 System.Action WaitingToRun
TryExecuteTask Start 6 System.Action WaitingToRun CreationOptions None
TryExecuteTaskInline Start 8 taskWasPreviouslyQueued False
TryExecuteTaskInline End 8 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTask End 6 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTaskInline Start 12 taskWasPreviouslyQueued False
TryExecuteTaskInline End 12 WaitingToRun IsCanceled False IsCompleted False IsFaulted False
QueueTask 12 System.Action WaitingToRun
TryExecuteTask Start 12 System.Action WaitingToRun CreationOptions None
Running Task 1 : Before Delay 3
TryExecuteTask End 12 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTaskInline Start 14 taskWasPreviouslyQueued False
TryExecuteTaskInline End 14 WaitingToRun IsCanceled False IsCompleted False IsFaulted False
QueueTask 14 System.Action WaitingToRun
TryExecuteTask Start 14 System.Action WaitingToRun CreationOptions None
TryExecuteTaskInline Start 16 taskWasPreviouslyQueued False
TryExecuteTaskInline End 16 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTask End 14 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
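As you can see, Task 0 is cancelled as soon as possible (at its next ThrowIfCancellationRequested check), while Task 1 carries on. The example does not demonstrate it, but the scheduler never runs more than 7 tasks concurrently.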
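The log also shows why the test calls Unwrap(): tasks 1 and 2 are reported as RanToCompletion right after the first "Before Delay 1" lines, because StartNew with an async lambda returns a Task<Task> whose outer task finishes as soon as the lambda reaches its first await. A minimal sketch of that behavior follows; UnwrapDemo and RunAsync are illustrative names, and the 500 ms delay is arbitrary.

```csharp
using System;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    // Returns (was the inner task already done when the outer one finished?, unwrapped result).
    public static async Task<(bool, int)> RunAsync()
    {
        // StartNew receives a Func<Task<int>>, so it returns Task<Task<int>>.
        Task<Task<int>> outer = Task.Factory.StartNew(async () =>
        {
            await Task.Delay(500);
            return 42;
        });

        // The outer task completes once the lambda hits its first await,
        // so at this point the inner task is normally still running.
        Task<int> inner = await outer;
        bool innerDoneEarly = inner.IsCompleted;

        // Unwrap() yields a proxy task that completes only when the inner task does.
        int result = await outer.Unwrap();
        return (innerDoneEarly, result);
    }
}
```

Awaiting or cancelling the outer task alone would therefore miss most of the work, which is why the test keeps the unwrapped tasks and passes those to Task.WhenAll.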
Unfortunately, the TAP pattern does not work with the AttachedToParent option; if it did, this code could be cleaner. See: TaskCreationOptions.AttachedToParent is not waiting for child task
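To manage the CancellationTokens, you could create a specialized TaskFactory that allows something like this:
taskFactory.StartNew(0, () => {...});
taskFactory.Cancel(0);
None of the TaskFactory methods are virtual, so you will have to write your own overloads.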
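One way this could look is sketched below, using composition instead of inheritance because the TaskFactory methods cannot be overridden. KeyedTaskFactory, the int key, and the Func<CancellationToken, Task> delegate shape are all hypothetical choices made for this sketch, not an existing API.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: remembers one CancellationTokenSource per key.
public sealed class KeyedTaskFactory
{
    private readonly TaskFactory _factory;
    private readonly ConcurrentDictionary<int, CancellationTokenSource> _sources =
        new ConcurrentDictionary<int, CancellationTokenSource>();

    public KeyedTaskFactory(TaskScheduler scheduler)
    {
        _factory = new TaskFactory(scheduler);
    }

    // Starts `work` on the wrapped factory and registers its token source under `key`.
    public Task StartNew(int key, Func<CancellationToken, Task> work)
    {
        var cts = new CancellationTokenSource();
        if (!_sources.TryAdd(key, cts))
        {
            cts.Dispose();
            throw new ArgumentException($"Key {key} is already in use.", nameof(key));
        }

        // Same StartNew(...).Unwrap() pattern as in the test above.
        Task task = _factory.StartNew(() => work(cts.Token), cts.Token).Unwrap();

        // Forget the source once the work has ended, whatever the outcome.
        task.ContinueWith(_ =>
        {
            if (_sources.TryRemove(key, out var done)) done.Dispose();
        }, TaskScheduler.Default);
        return task;
    }

    // Requests cancellation for `key`; returns false if nothing is registered under it.
    public bool Cancel(int key)
    {
        if (!_sources.TryGetValue(key, out var cts)) return false;
        try { cts.Cancel(); }
        catch (ObjectDisposedException) { return false; } // the task finished in the meantime
        return true;
    }
}
```

With a wrapper like this, the calls become factory.StartNew(0, token => Execute(0, token)) followed by factory.Cancel(0), and the caller no longer has to keep track of the individual CancellationTokenSource instances.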