【问题标题】:Cancel a TPL task from within a TaskScheduler从 TaskScheduler 中取消 TPL 任务
【发布时间】:2015-06-16 20:04:19
【问题描述】:

我正在尝试创建一个TaskScheduler,它按顺序运行所有任务,但只会“完成”最近安排的任务。例如,如果我使用它来调度任务 A,那么在它完成调度任务 B 和 C 之前,我希望只有 C 被认为是成功的。 A 可以继续其工作,但在完成时应被视为“已取消”,而 B 应在开始之前标记为已取消。

我已经有了在线程池上按顺序执行委托的现有代码,并管理最多有 2 个排队任务的想法 - 一个当前正在执行,另一个正在执行。缺少的部分是能够将任务的结果状态设置为取消

不幸的是,在TaskScheduler 中,您实际上几乎无法访问Task 或任何CancellationToken 的状态。

我试图通过跟踪最后排队的任务来解决这个问题,并且在执行任务时,如果它不等于最后排队的任务,则抛出 TaskCancelledException,但这不t 似乎工作。我猜这是因为任务的委托内部没有抛出异常,所有的“魔法”实际上都是在TryExecuteTask()内部处理的。

这是我得到的:

public class CurrentPendingTaskScheduler : TaskScheduler
        {
        private readonly ThreadSafeCurrentPendingQueueProcessor<Task> _Processor;
        private Task _LastTask;

        public CurrentPendingTaskScheduler()
            {
            _Processor = new ThreadSafeCurrentPendingQueueProcessor<Task>();
            _Processor.Process += _Processor_Process;
            }

        private void _Processor_Process(Task obj)
            {
            // If there's a newer task already, cancel this one before starting
            if (obj != _LastTask)
                throw new TaskCanceledException(obj);

            TryExecuteTask(obj);

            // If a newer task was added whilst we worked, cancel this one
            if (obj != _LastTask)
                throw new TaskCanceledException(obj);
            }

        protected override void QueueTask(Task task)
            {
            _LastTask = task;
            _Processor.Enqueue(task);
            }

        protected override Boolean TryExecuteTaskInline(Task task, Boolean taskWasPreviouslyQueued)
            {
            return false;
            }

        protected override IEnumerable<Task> GetScheduledTasks()
            {
            throw new NotImplementedException();
            }
        }

ThreadSafeCurrentPendingQueueProcessor&lt;&gt; 类是一个助手,它通过事件回调,以便在单个后台线程上处理排队的项目,只允许一个活动项目和一个待处理项目。

如果“最后一个任务”在处理器回调之前发生了变化,异常只会阻止任务运行(但不会影响其状态)。如果回调确实开始运行,但在此期间“最后一个任务”发生了变化,则异常抛出太晚了,在任何延续已经开始之后。

我也不确定这是为什么,但在第一次使用调度程序时(我为每个 UI 元素点击安排一个任务),QueueTask 被调用一次,新任务。但是对于每个后续调度,它会被调用两次。这会让事情变得更糟,因为_LastTask 会被覆盖。

我觉得TaskCompletionSource&lt;&gt; 可能有一些用处,但不太明白怎么用。

是否可以实现如所述工作的TaskScheduler?我知道我可以在调度程序之外实现这种行为,在我创建任务的时候,但我需要在很多地方使用它,并试图通过将它放在可重用的调度程序中来让生活更轻松。

【问题讨论】:

  • 我认为你做不到。 TaskScheduler 的工作是决定 Task 将在何时何地执行,而不是其结果。

标签: c# .net task-parallel-library


【解决方案1】:

我会创建一个辅助类,它接受一个输入操作,启动它,取消现有的,并覆盖它的内部变量。由于您不能直接在Task&lt;T&gt; 上直接执行Cancel(),因此您需要将自己的TaskCancellationSource 放在手边。如果你想提供一个外部令牌,你可以将它们与CancellationTokenSource.CreateLinkedTokenSource(...)结合起来。如果你需要关注结果,那将为TaskCompletionSource提供一个很好的机会。

public class OverwriteTaskHandler<T>
{
    private Task<T> _task;
    private TaskCompletionSource<T> _tcs;
    private CancellationTokenSource _cts;

    public OverwriteTaskHandler(Func<T> operation)
    {
        _tcs = new TaskCompletionSource<T>();
        _cts = new CancellationTokenSource();
        TryPushTask(operation);
    }

    public bool TryPushTask(Func<T> operation)
    {
        if (_tcs.Task.IsCompleted)
            return false;   //It would be unsafe to use this instance as it is already "finished"
        _cts.Cancel();
        _cts = new CancellationTokenSource();
        _task = Task.Run(operation, _cts.Token);
        _task.ContinueWith(task => _tcs.SetResult(task.Result));
        return true;
    }

    public void Cancel()
    {
        _cts.Cancel();
    }

    public Task<T> WrappedTask { get { return _tcs.Task; } }
}

Discalimer:我没有测试过,所以请仔细检查!

【讨论】:

    【解决方案2】:

    AFAIK TaskScheduler 不能用来完成这项工作。你真的需要别的东西。例如,您可以为自己编写一个具有以下用法的辅助类:

    static CurrentPendingTaskContext ctx = ...;
    
    async Task MyAsyncFunc() {
     await ctx.RegisterAndMaybeCancel();
     try {
      //rest of method
     }
     finally {
      ctx.NotifyCompletion();
     }
    }
    

    RegisterAndMaybeCancel 会等到当前运行的任务完成。如果此特定任务已被另一个任务取代,它将引发取消异常。

    我现在没有时间来实现那个类(虽然它很诱人)。但我认为这种语法模式很简单,你可以在很多地方使用它。

    您也可以使用IDisposable 模式来摆脱finally:

    async Task MyAsyncFunc() {
     using (await ctx.RegisterAndMaybeCancel())
     {
          //rest of method
     }
    }
    

    【讨论】: