【问题标题】:How to chain async tasks with a CancellationToken?如何使用 CancellationToken 链接异步任务?
【发布时间】:2019-10-18 18:27:52
【问题描述】:

如果我的CancellationToken 没有被解雇,我想链接一些任务,但有条件地继续执行。

我想要达到的目标相当于

var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
var t = Task.Run(async () => {
    if (cancellationToken.IsCancellationRequested) return;
    await t1();
    if (cancellationToken.IsCancellationRequested) return;
    await t2();
    if (cancellationToken.IsCancellationRequested) return;
    await t3();
    if (cancellationToken.IsCancellationRequested) return;
    await t4();
});
var timeout = Task.Delay(TimeSpan.FromSeconds(4));
var completedTask = await Task.WhenAny(t, timeout);
if (completedTask != t)
{
    cts.Cancel();
    await t;
}

这就是我现在所拥有的,它正在工作,虽然它也很冗长。

【问题讨论】:

    标签: c# asynchronous task


    【解决方案1】:
    var cts = new CancellationTokenSource();
    var t = Task.Run(async () => {
         await t1();
         await t2();
         await t3();
         await t4();
     }, cts.Token);
    
    cts.CancelAfter(TimeSpan.FromSeconds(4));
    
    try
    {
         await t;
    }
    catch (OperationCanceledException)
    {
         // The cancellation token was triggered, add logic for that
         ....
    }
    

    【讨论】:

    • CancellationToken 作为Task.Run 的参数传递具有取消Task 的效果,前提是它仍处于计划中且尚未开始。因此,您的代码将允许所有任务运行,或者都不允许运行。这不是 OP 想要的。
    • 对不起朋友,这不符合我的要求。
    【解决方案2】:

    您的原始代码是正确的——它假定您始终希望单个任务运行完成,并且如果取消,您希望整个任务成功完成。这些东西都不是惯用的。

    更正常的做法是:

    var cts = new CancellationTokenSource();
    var cancellationToken = cts.Token;
    var t = Task.Run(async () => {
        cancellationToken.ThrowIfCancellationRequested();
        await t1(cancellationToken);
        cancellationToken.ThrowIfCancellationRequested();
        await t2(cancellationToken);
        cancellationToken.ThrowIfCancellationRequested();
        await t3(cancellationToken);
        cancellationToken.ThrowIfCancellationRequested();
        await t4(cancellationToken);
    }, cancellationToken);
    

    然后,在其他地方:

    cts.Cancel();
    

    您可以在此处省略对ThrowIfCancellationRequested 的调用,假设各个任务在进入后很快就会对其进行检查,但核心思想是您应该将令牌传递到正在执行工作的最内层循环,并且他们应该通过调用它来引发取消,这最终将任务设置为取消状态而不是成功状态。

    (所以实际上你只需要实际上调用ThrowIfCancellationRequested,如果你遇到一个不接受CancellationToken参数的函数——这就是所有异步方法都应该这样做的原因,因为否则它的任务将是不可取消的。)

    【讨论】:

    • 您的解决方案可能会向调用者传播一个令人惊讶的OperationCancelledException。仅当调用者是取消的发起者时,此异常才适用。这不是这里的情况。取消是由于操作的内部运作而发生的。传播发生超时的信息的正确方法是抛出 TimeoutException 恕我直言。我同意将令牌传递给各个异步方法。
    • @TheodorZoulias 是的,如果保留基于时间的取消,那么应该包装整个事情并将其转换为 TimeoutException。但是,从原始问题的上下文来看,超时并不是实际取消的预期方式,它只是用于示例目的。我只是用那个例子滚动。
    • 在 OP 的示例中没有抛出异常。在超时的情况下,整个任务将成功完成。您声称这是非惯用语,这是我不同意您的回答的另一点(我认为这是惯用语和不好的做法)。
    • 这就是为什么我说它应该通过取消来完成整个任务,我们都同意这比成功更好(这是当前接受的答案)。同样,我假设“超时取消”的具体示例不是最终的预期取消方法。因此,无论取消了任务,任务确实被取消了,这应该通知任何观察任务的人——因此不足为奇。
    • 我同意取消胜于成功,就像受伤胜于死亡一样。两者都是不好的做法。没有人传达真正发生的事情,即:超时
    【解决方案3】:

    您的目标似乎是在整个操作花费超过 4 秒时停止执行。

    如果您将CancellationToken 传递给您的t1/t2/等。方法,我想说你不能做得比你所拥有的更好。但正如你所拥有的,你可以只使用Stopwatch 而不是CancellationToken

    var timeout = TimeSpan.FromSeconds(4);
    var stopwatch = new Stopwatch();
    stopwatch.Start();
    
    await t1();
    if (stopwatch.Elapsed > timeout) return;
    await t2();
    if (stopwatch.Elapsed > timeout) return;
    await t3();
    if (stopwatch.Elapsed > timeout) return;
    await t4();
    stopwatch.Stop();
    

    我假设这是在某个方法中,您可以在其中使用return,但您可以根据需要进行修改(返回值、抛出异常等)。

    【讨论】:

    • 在这种情况下,我必须使用CancellationToken
    • @TiagoDall'Oca 为什么?这是否会从需要传递CancellationToken 的其他代码中调用?
    • 是的。这是我们在开发一些启用取消的功能时可能会采用的一些模式。这里的超时实际上只是一个特定的用例。
    【解决方案4】:

    您的代码在功能上是正常的,但是一目了然地阅读它并不清楚它在做什么。所以我建议你将这个逻辑封装在一个带有描述性名称和参数的实用方法中:

    public static async Task RunSequentially(IEnumerable<Func<Task>> taskFactories,
        int timeout = Timeout.Infinite, bool onTimeoutAwaitIncompleteTask = false)
    {
        using (var cts = new CancellationTokenSource(timeout))
        {
            if (onTimeoutAwaitIncompleteTask)
            {
                await Task.Run(async () =>
                {
                    foreach (var taskFactory in taskFactories)
                    {
                        if (cts.IsCancellationRequested) throw new TimeoutException();
                        await taskFactory();
                    }
                });
            }
            else // On timeout return immediately
            {
                var allSequentially = Task.Run(async () =>
                {
                    foreach (var taskFactory in taskFactories)
                    {
                        cts.Token.ThrowIfCancellationRequested();
                        var task = taskFactory(); // Synchronous part of task
                        cts.Token.ThrowIfCancellationRequested();
                        await task; // Asynchronous part of task
                    }
                }, cts.Token);
                var timeoutTask = new Task(() => {}, cts.Token);
                var completedTask = await Task.WhenAny(allSequentially, timeoutTask);
                if (completedTask.IsCanceled) throw new TimeoutException();
                await completedTask; // Propagate any exception
            }
        }
    }
    

    此代码与您的代码不同,因为它会在超时时抛出 TimeoutException。我认为最好强制调用者明确处理这个异常,而不是隐藏操作超时的事实。调用者可以通过将 catch 块留空来忽略异常:

    try
    {
        await RunSequentially(new[] { t1, t2, t3, t4 },
            timeout: 4000,
            onTimeoutAwaitIncompleteTask: true);
    }
    catch (TimeoutException)
    {
        // Do nothing
    }
    

    【讨论】:

    • 你在这里所做的似乎有点矫枉过正,但我​​喜欢将任务放在集合中的想法。
    • @TiagoDall'Oca 我没有使用集合,因为每个Task 都是单独等待的,并且没有理由跟踪已完成的Tasks。我可以使用一个集合来保存Tasks 的results,如果我不得不使用Task&lt;TResult&gt;s,将这些结果返回给调用者。在 C# 8 中,这甚至不是必需的,因为我可以返回一个 IAsyncEnumerable&lt;TResult&gt;,并将结果一一发送给调用者。
    【解决方案5】:

    您应该考虑使用 Microsoft 的响应式框架(又名 Rx) - NuGet System.Reactive 并添加 using System.Reactive.Linq; - 然后您可以这样做:

    IObservable<Unit> tasks =
        from x1 in Observable.FromAsync(() => t1())
        from x2 in Observable.FromAsync(() => t2())
        from x3 in Observable.FromAsync(() => t3())
        from x4 in Observable.FromAsync(() => t4())
        select Unit.Default;
    
    IObservable<Unit> timer =
        Observable
            .Timer(TimeSpan.FromSeconds(4.0))
            .Select(x => Unit.Default)
    
    IDisposable subscription =
        Observable
            .Amb(tasks, timer)
            .Subscribe();
    

    如果计时器 observable 在任务完成之前触发,则整个管道将被取消。不会运行不必要的任务。

    如果您想手动取消,只需致电subscription.Dispose()

    代码简洁美观。

    【讨论】:

    • 是的,它对我来说看起来不错,但我真的在寻找一种在我的具体情况下使用 CancellationToken 的方法。非常简洁的代码!谢谢
    • @TiagoDall'Oca - 为什么是CancellationToken?您在问题中显示的任务中没有任何内容可以取消。在任何情况下,都可以将 Rx 与CancellationToken 一起使用。我需要更多地了解你在做什么。
    【解决方案6】:

    由于这里没有人给出更好的答案,我会自己做。

    在我看来,我在问题中提供的代码最有凝聚力,也足够通用,可以在其他情况下回收。

    【讨论】:

      猜你喜欢
      • 2014-08-06
      • 1970-01-01
      • 2013-02-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-09-24
      相关资源
      最近更新 更多