【问题标题】:Cancel Specific Task using CancellationToken使用 CancellationToken 取消特定任务
【发布时间】:2016-03-05 15:47:06
【问题描述】:

我有以下循环,它可能会或可能不会动态创建一系列任务:

While(CreateNewTask == true)
{
 if(isWorkerFree() && isValidJob() && isExecutable())
 {
   CancellationTokenSource cs = new  CancellationTokenSource();
   var myTask = Task.Run(() => Execute(cs.token);
 }
}

现在由于这些任务是动态创建的,我如何跟踪它们并取消特定任务或向特定任务发送取消令牌?随时可能有 6-7 个任务在运行,我需要知道哪些任务正在运行并取消特定任务的功能。

【问题讨论】:

  • 您如何确定要取消的哪个?什么会将CreateNewTask 设置为false?
  • @JonSkeet while 循环正在查看队列,如果队列中有新任务,它将“CreateNewTask”设置为 true。我希望同时运行最多 7 个任务(工人)。对于队列中的每个项目,我们创建一个任务,并执行该任务。该任务可能需要几分钟到几小时才能完成。所以我想要取消正在运行的任务的能力。现在正如您概述的那样,我面临两个挑战:a)随时获取正在运行的任务列表,以及b)取消特定任务。我真的很适合你的帮助。
  • 您还没有说如何指定要取消哪个。听起来您确实应该查看 TPL 数据流:msdn.microsoft.com/en-us/library/hh228603%28v=vs.110%29.aspx
  • @JonSkeet 谢谢乔恩,理想情况下,管理员将通过服务发送请求以取消任务。
  • 这并不能真正回答我的问题 - 您正在开始任务,并且您想取消其中一项...您希望如何知道 哪个 任务取消?

标签: c# .net task


【解决方案1】:

您可以使用 DTO 跟踪每个此类任务:class Item { Task Task; CancellationTokenSource CTS; }。保留这些项目的清单。然后您可以随意取消它们。

【讨论】:

  • @user2972161 一个“数据传输对象”。一个只包含一些字段的愚蠢类。不确定您对 ActionBlock 的具体想法。
  • 我有一个最多可以同时运行 16 个任务的 ActionBlock,但我希望能够以某种方式附加一个取消令牌列表,以便可以取消这 16 个任务中的任何一个。 Action 块是我的 TPL Dataflow 管道的一部分。似乎没有我想做什么的例子,但我不能成为第一个想这样做的人。
  • @user2972161 您可以将 CancellationToken 与您要处理的数据一起传递到操作块中。然后,该 ActionBlock 中的代码需要遵守 CancellationToken 并自行取消。
【解决方案2】:

TL;DR;
我认为 TPL 数据流 (https://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx) 是一个更好的选择,但我会使用 TPL 回答

回答
要限制并发,你需要一个限制并发的调度器。我建议您查看https://msdn.microsoft.com/library/system.threading.tasks.taskscheduler.aspx 并搜索LimitedConcurrencyLevelTask​​Scheduler。

下面的代码是您尝试实现的一个简单示例。

[TestClass]
public class UnitTest1
{
    [TestMethod]
    public async Task TestMethod1()
    {
        var factoryCancellation = new CancellationTokenSource();
        var scheduler = new LimitedConcurrencyLevelTaskScheduler(maxDegreeOfParallelism: 7);
        var taskFactory = new TaskFactory(factoryCancellation.Token, TaskCreationOptions.None, TaskContinuationOptions.None, scheduler);

        var taskCancellation1 = new CancellationTokenSource();
        var taskCancellation2 = new CancellationTokenSource();
        var token1 = taskCancellation1.Token;
        var token2 = taskCancellation2.Token;

        var myTask1 = taskFactory.StartNew(async () => await Execute(0, token1), token1).Unwrap();
        var myTask2 = taskFactory.StartNew(async () => await Execute(1, token2), token2).Unwrap();

        taskCancellation1.CancelAfter(500);

        try
        {
            await Task.WhenAll(myTask1, myTask2);
        }
        catch
        {
            //ThrowIfCancellationRequested Exception
        }
    }

    private async Task Execute(int i, CancellationToken token)
    {
        Console.WriteLine($"Running Task {i} : Before Delay 1");
        await Task.Delay(1000);
        token.ThrowIfCancellationRequested();
        Console.WriteLine($"Running Task {i} : Before Delay 2");
        await Task.Delay(1000);
        token.ThrowIfCancellationRequested();
        Console.WriteLine($"Running Task {i} : Before Delay 3");
        await Task.Delay(1000);
        token.ThrowIfCancellationRequested();
    }
}

这将生成以下日志

QueueTask 1  WaitingToRun
QueueTask 2  WaitingToRun
TryExecuteTask Start 1  WaitingToRun CreationOptions None
TryExecuteTask Start 2  WaitingToRun CreationOptions None
Running Task 1 : Before Delay 1
Running Task 0 : Before Delay 1
TryExecuteTask End 2 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTask End 1 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTaskInline Start 5 taskWasPreviouslyQueued False
TryExecuteTaskInline End 5 WaitingToRun IsCanceled False IsCompleted False IsFaulted False
QueueTask 5 System.Action WaitingToRun
TryExecuteTask Start 5 System.Action WaitingToRun CreationOptions None
Running Task 1 : Before Delay 2
TryExecuteTask End 5 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTaskInline Start 6 taskWasPreviouslyQueued False
TryExecuteTaskInline End 6 WaitingToRun IsCanceled False IsCompleted False IsFaulted False
QueueTask 6 System.Action WaitingToRun
TryExecuteTask Start 6 System.Action WaitingToRun CreationOptions None
TryExecuteTaskInline Start 8 taskWasPreviouslyQueued False
TryExecuteTaskInline End 8 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTask End 6 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTaskInline Start 12 taskWasPreviouslyQueued False
TryExecuteTaskInline End 12 WaitingToRun IsCanceled False IsCompleted False IsFaulted False
QueueTask 12 System.Action WaitingToRun
TryExecuteTask Start 12 System.Action WaitingToRun CreationOptions None
Running Task 1 : Before Delay 3
TryExecuteTask End 12 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTaskInline Start 14 taskWasPreviouslyQueued False
TryExecuteTaskInline End 14 WaitingToRun IsCanceled False IsCompleted False IsFaulted False
QueueTask 14 System.Action WaitingToRun
TryExecuteTask Start 14 System.Action WaitingToRun CreationOptions None
TryExecuteTaskInline Start 16 taskWasPreviouslyQueued False
TryExecuteTaskInline End 16 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTask End 14 RanToCompletion IsCanceled False IsCompleted True IsFaulted False

可以看到,Task 0 被尽快取消,Task 1 继续处理。此示例未显示但绝不会超过 7 个并发任务。

不幸的是,TAP 模式不适用于调度程序的 AttachToParent 选项,或者此代码可能更干净。见:TaskCreationOptions.AttachedToParent is not waiting for child task

要管理 CancellationTokens,您可以创建一个特定的 TaskFactory,它允许这样的事情:

taskFactory.StartNew(0 , () => {...});
taskFactory.Cancel(0);

TaskFactory 方法都不是虚拟的,因此您必须创建重载方法。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-12-01
    • 1970-01-01
    • 1970-01-01
    • 2013-09-30
    • 1970-01-01
    相关资源
    最近更新 更多