【问题标题】:How to abort or terminate a task of TPL when cancellation token is unreachable?当取消令牌无法访问时,如何中止或终止 TPL 的任务?
【发布时间】:2014-04-03 17:20:57
【问题描述】:

让我们考虑方法:

Task Foo(IEnumerable items, CancellationToken token)
{
    return Task.Run(() =>
    {
        foreach (var i in items)
            token.ThrowIfCancellationRequested();

    }, token);
}

那么我有一个消费者:

var cts = new CancellationTokenSource();
var task = Foo(Items, cts.token);
task.Wait();

还有Items的例子:

IEnumerable Items
{
    get
    {
        yield return 0;
        Task.Delay(Timeout.InfiniteTimeSpan).Wait();
        yield return 1;
    }
}

task.Wait 呢? 我无法将我的取消令牌放入项目集合中

如何杀死没有响应的任务或解决这个问题?

【问题讨论】:

  • 为什么不能将令牌传递给 Items()?
  • @DaxFohl 我把它们作为论据。在我的问题中,这些项目只是对外面发生的事情的可视化。
  • 你能把 cts 设为成员变量并从Items 那样使用它吗?
  • @DaxFohl:不幸的是没有。我不能对项目做任何事情。只需遍历它们。
  • 您无法取消不可取消的操作,但您可以停止等待。您想要什么,停止等待或取消 Items 的迭代?

标签: c# task-parallel-library cancellation-token


【解决方案1】:

我找到了一种允许将取消令牌放入来自第三方的项目的解决方案:

public static IEnumerable<T> ToCancellable<T>(this IEnumerable<T> @this, CancellationToken token)
{
    var enumerator = @this.GetEnumerator();

    for (; ; )
    {
        var task = Task.Run(() => enumerator.MoveNext(), token);
        task.Wait(token);

        if (!task.Result)
            yield break;

        yield return enumerator.Current;
    }
}

现在我需要使用:

Items.ToCancellable(cts.token)

并且在取消请求后不会挂起。

【讨论】:

  • 这似乎是一个很好的解决方案。我测试了它,它似乎工作得很好。
  • 我同意,它比我的任何一个都更灵活。请注意,它仍然会使任务保持打开状态(传递令牌 task.Wait 实际上并不会终止任务;它只是取消等待)。要解决这个问题,您需要Thread.Abort
  • @DaxFohl 你是对的。任务仍在运行。最重要的是,由于先前的取消异常,它对 ToCancellable 线程没有更多影响。即使我们将无限等待替换为某个较小的值,也只会执行下一个收益返回,其结果将被忽略。
【解决方案2】:

您无法真正取消不可取消的操作。 Stephen Toub 在 Parallel FX 团队的博客上的“How do I cancel non-cancelable async operations?”中进行了详细介绍,但本质是您需要了解您真正想要做什么?

  1. 停止异步/长时间运行的操作本身?如果您无法发出操作信号,则无法以合作方式进行操作
  2. 停止等待操作完成,忽略任何结果?这是可行的,但由于显而易见的原因可能导致不可靠性。您可以使用传递取消令牌的长操作启动任务,或使用 Stephen Toub 描述的 TaskCompletionSource。

您需要确定要找到正确解决方案的行为

【讨论】:

    【解决方案3】:

    为什么不能将 CancellationToken 传递给Items()

    IEnumerable Items(CancellationToken ct)
    {
        yield return 0;
        Task.Delay(Timeout.InfiniteTimeSpan, ct).Wait();
        yield return 1;
    }
    

    当然,您必须将相同的令牌传递给 Items(),就像传递给 Foo() 一样。

    【讨论】:

      【解决方案4】:

      尝试使用 TaskCompletionSource 并返回它。然后,您可以将TaskCompletionSource 设置为内部任务运行完成(或错误)的结果(或错误)。但如果CancellationToken 被触发,您可以将其设置为立即取消。

      Task<int> Foo(IEnumerable<int> items, CancellationToken token)
      {
          var tcs = new TaskCompletionSource<int>();
          token.Register(() => tcs.TrySetCanceled());
          var innerTask = Task.Factory.StartNew(() =>
          {
              foreach (var i in items)
                  token.ThrowIfCancellationRequested();
              return 7;
          }, token);
          innerTask.ContinueWith(task => tcs.TrySetResult(task.Result), TaskContinuationOptions.OnlyOnRanToCompletion);
          innerTask.ContinueWith(task => tcs.TrySetException(task.Exception), TaskContinuationOptions.OnlyOnFaulted);
          return tcs.Task;
      }
      

      这实际上不会杀死内部任务,但它会给你一个任务,你可以在取消后立即继续。要杀死内部任务,因为它在无限超时中挂出,我相信你唯一能做的就是在你开始任务的地方获取对Thread.CurrentThread 的引用,然后从Foo 中调用taskThread.Abort(),这当然是不好的做法。但在这种情况下,您的问题实际上归结为“如何在无法访问代码的情况下终止长时间运行的函数”,这只能通过Thread.Abort 实现。

      【讨论】:

        【解决方案5】:

        您可以将项目设置为 IEnumerable&lt;Task&lt;int&gt;&gt; 而不是 IEnumerable&lt;int&gt; 吗?那你就可以了

        return Task.Run(() =>
        {
            foreach (var task in tasks)
            {
                task.Wait(token);
                token.ThrowIfCancellationRequested();
                var i = task.Result;
            }
        }, token);
        

        虽然这样的事情使用 Reactive Framework 和 items.ToObservable 可能更直接。看起来像这样:

        static Task<int> Foo(IEnumerable<int> items, CancellationToken token)
        {
            var sum = 0;
            var tcs = new TaskCompletionSource<int>();
            var obs = items.ToObservable(ThreadPoolScheduler.Instance);
            token.Register(() => tcs.TrySetCanceled());
            obs.Subscribe(i => sum += i, tcs.SetException, () => tcs.TrySetResult(sum), token);
            return tcs.Task;
        }
        

        【讨论】:

          【解决方案6】:

          如何围绕可在项目之间取消的枚举创建一个包装器?

          IEnumerable<T> CancellableEnum<T>(IEnumerable<T> items, CancellationToken ct) {
              foreach (var item in items) {
                  ct.ThrowIfCancellationRequested();
                  yield return item;
              }
          }
          

          ...尽管这似乎是 Foo() 已经在做的事情。如果你在某个地方这个可枚举的块实际上是无限的(而且它不仅非常慢),那么你要做的就是在消费者端的 task.Wait() 中添加超时和/或取消令牌。

          【讨论】:

          • ...因此是“如果它没有无限地阻塞”部分。 ;)
          【解决方案7】:

          我的previous solution 是基于一个乐观的假设,即枚举可能不会挂起并且速度非常快。因此我们有时可以牺牲系统线程池中的一个线程?正如Dax Fohl 指出的那样,即使其父任务已被取消异常杀死,该任务仍将处于活动状态。在这方面,如果多个集合被无限期冻结,这可能会阻塞默认任务调度程序使用的底层 ThreadPool。

          因此我重构了 ToCancellable 方法:

          public static IEnumerable<T> ToCancellable<T>(this IEnumerable<T> @this, CancellationToken token)
          {
              var enumerator = @this.GetEnumerator();
              var state = new State();
          
              for (; ; )
              {
                  token.ThrowIfCancellationRequested();
          
                  var thread = new Thread(s => { ((State)s).Result = enumerator.MoveNext(); }) { IsBackground = true, Priority = ThreadPriority.Lowest };
                  thread.Start(state);
          
                  try
                  {
                      while (!thread.Join(10))
                          token.ThrowIfCancellationRequested();
                  }
                  catch (OperationCanceledException)
                  {
                      thread.Abort();
                      throw;
                  }
          
                  if (!state.Result)
                      yield break;
          
                  yield return enumerator.Current;
              }
          }
          

          还有一个帮助类来管理结果:

          class State
          {
              public bool Result { get; set; }
          }
          

          中止分离的线程是安全的。

          我在这里看到的痛苦是线程创建很重。这可以通过使用自定义线程池以及生产者-消费者模式来解决,该模式将能够处理中止异常,以便从池中删除损坏的线程。

          另一个问题是在加入线。这里最好的停顿是什么?也许这应该由用户负责并作为方法参数发送。

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 1970-01-01
            • 2019-09-10
            • 1970-01-01
            • 1970-01-01
            • 2017-08-12
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多