【问题标题】:Identifying when long running async tasks have completed识别长时间运行的异步任务何时完成
【发布时间】:2019-04-09 18:31:10
【问题描述】:

[2018 年 4 月 18 日更新,使用 LinqPad 示例 - 见结尾]

我的应用程序收到一份工作列表:

var jobs = await myDB.GetWorkItems();

(注意:我们在任何地方都使用 .ConfigureAwait(false),我只是没有在这些伪代码 sn-ps 中显示它。)

对于每个作业,我们创建一个长时间运行的任务。但是,我们不想等待这个长时间运行的任务完成。

jobs.ForEach(job =>
{
    var module = Factory.GetModule(job.Type);
    var task = Task.Run(() => module.ExecuteAsync(job.Data));
    this.NonAwaitedTasks.Add(task, module);
};

任务及其相关模块实例都被添加到 ConcurrentDictionary 中,这样它们就不会超出范围。

在其他地方,我有另一个偶尔调用的方法,其中包含以下内容:

foreach (var entry in this.NonAwaitedTasks.Where(e => e.Key.IsCompleted))
{
    var module = entry.Value as IDisposable;
    module?.Dispose();
    this.NonAwaitedTasks.Remove(entry.Key);
}

(注意,NonAwaitedTasks 还使用 SemaphoreSlim 锁定...)

所以,这个想法是这个方法会找到所有已经完成的任务,然后处理它们的相关模块,并将它们从字典中删除。

但是....

在 Visual Studio 2017 中进行调试时,我从数据库中提取了一个作业,并且在我花时间在已实例化的单个模块中进行调试时,在该模块上调用了 Dispose。查看Callstack,我可以看到在上面的方法中已经调用了Dispose,那是因为任务有IsCompleted == true。但显然无法完成,因为我还在调试中。

  • .IsCompleted 属性是否是要检查的错误属性?
  • 这只是在 Visual Studio 中调试的产物吗?
  • 我是不是走错了路?

其他信息

在下面的 cmets 中,我被要求提供一些关于流程的额外信息,因为我所描述的似乎不可能(事实上,我希望它不可能)。下面是我的代码的精简版本(我已经删除了对取消标记和防御性编码的检查,但没有任何影响流程)。

应用程序入口点

这是一个 Windows 服务。在 OnStart() 中是以下行:

this.RunApplicationTask = 
Task.Run(() => myApp.DoWorkAsync().ConfigureAwait(false), myService.CancelSource.Token);

“RunApplicationTask”只是一个属性,用于在服务的生命周期内将执行任务保持在范围内。

DoWorkAsync()

public async Task DoWorkAsync()
{
    do
    {
        await this.ExecuteSingleIterationAsync().ConfigureAwait(false);
        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
    }
    while (myApp.ServiceCancellationToken.IsCancellationRequested == false);

    await Task.WhenAll(this.NonAwaitedTasks.Keys).ConfigureAwait(false);
    await this.ClearCompletedTasksAsync().ConfigureAwait(false);
    this.WorkItemsTaskCompletionSource.SetResult(true);

    return;
}

所以当我在调试时,这是在迭代 DO-LOOP,它没有到达 Task.WhenAll(....)。

还要注意,在调用 Cancellation 请求并且所有任务都完成后,我会调用 ClearCompletedTasksAsync()。稍后会详细介绍....

ExecuteSingleIterationAsync

private async Task ExecuteSingleIterationAsync()
{
    var getJobsResponse = await DB.GetJobsAsync().ConfigureAwait(false);
    await this.ProcessWorkLoadAsync(getJobsResponse.Jobs).ConfigureAwait(false);
    await this.ClearCompletedTasksAsync().ConfigureAwait(false);
}

ProcessWorkLoadAsync

private async Task ProcessWorkLoadAsync(IList<Job> jobs)
{
    if (jobs.NoItems())
    {
        return ;
    }

    jobs.ForEach(job =>
    {
        // The processor instance is disposed of when removed from the NonAwaitedTasks collection.
        IJobProcessor processor = ProcessorFactory.GetProcessor(workItem, myApp.ServiceCancellationToken);
        try
        {
            var task = Task.Run(() => processor.ExecuteAsync(job).ConfigureAwait(false), myApp.ServiceCancellationToken);
            this.NonAwaitedTasks.Add(task, processor);
        }
        catch (Exception e)
        {
            ...
        }
    });

    return;
}

每个处理器都实现以下接口方法: Task ExecuteAsync(Job job);

当我在 ExecuteAsync 中时,.Dispose() 在我正在使用的处理器实例上被调用。

ProcessorFactory.GetProcessor()

public static IJobProcessor GetProcessor(Job job, CancellationToken token)
{
    .....
    switch (someParamCalculatedAbove)
    {
        case X:
            {
                return new XProcessor(...);
            }

        case Y:
            {
                return new YProcessor(...);
            }

        default:
            {
                return null;
            }
    }
}

所以我们在这里得到了一个新的实例。

ClearCompletedTasksAsync()

private async Task ClearCompletedTasksAsync()
{
    await myStatic.NonAwaitedTasksPadlock.WaitAsync().ConfigureAwait(false);
    try
    {
        foreach (var taskEntry in this.NonAwaitedTasks.Where(entry => entry.Key.IsCompleted).ToArray())
        {
            var processorInstance = taskEntry.Value as IDisposable;
            processorInstance?.Dispose();
            this.NonAwaitedTasks.Remove(taskEntry.Key);
        }
    }
    finally
    {
        myStatic.NonAwaitedTasksPadlock.Release();
    }
}

这被称为 Do-Loop 的每次迭代。其目的是确保非等待任务列表保持较小。

就是这样...... Dispose 似乎只在调试时被调用。

LinqPad 示例

async Task Main()
{
    SetProcessorRunning();

    await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);

    do
    {
        foreach (var entry in NonAwaitedTasks.Where(e => e.Key.IsCompleted).ToArray())
        {
            "Task is completed, so will dispose of the Task's processor...".Dump();
            var p = entry.Value as IDisposable;
            p?.Dispose();
            NonAwaitedTasks.Remove(entry.Key);
        }
    }
    while (NonAwaitedTasks.Count > 0);
}

// Define other methods and classes here

public void SetProcessorRunning()
{
    var p = new Processor();

    var task = Task.Run(() => p.DoWorkAsync().ConfigureAwait(false));
    NonAwaitedTasks.Add(task, p);
}

public interface IProcessor
{
    Task DoWorkAsync();
}

public static Dictionary<Task, IProcessor> NonAwaitedTasks = new Dictionary<Task, IProcessor>();

public class Processor : IProcessor, IDisposable
{
    bool isDisposed = false;
    public void Dispose()
    {
        this.isDisposed = true;
        "I have been disposed of".Dump();
    }

    public async Task DoWorkAsync()
    {
        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);

        if (this.isDisposed)
        {
            $"I have been disposed of (isDispose = {this.isDisposed}) but I've not finished work yet...".Dump();
        }

        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
    }
}

输出:

任务已完成,所以将处置任务的处理器...

我已经被处理掉了

我已经被处理掉了 (isDispose = True) 但我还没有完成工作 还是……

【问题讨论】:

  • 旁注:用...Async 后缀命名可能是同步方法ExecuteAsync 非常混乱。请确保至少对于您在此处显示的公共示例(又名minimal reproducible example)不要这样做。
  • @AlexeiLevenkov 的观点也可能指向实际问题。 Module.ExecuteAsync() 实际上是异步方法吗?如果是这种情况,那么 Task.Run() 在 ExecuteAsync() 完成之前完成(假设您在 ExecuteAsync() 中有任何等待)。
  • 该任务在完成之前不会被标记为完成,因此您所描述的内容是不可能的。肯定有其他事情发生,例如,另一个工作具有相同的Type(因此是相同的模块),或async void,或即发即弃代码。
  • 斯蒂芬是正确的;我(错误地)想到了 Task 构造函数,你的 lambda 可能看起来像一个函数,但实际上是一个动作,所以忽略我上面的评论。我认为您需要向我们展示ExecuteAsync() 的签名。
  • 尝试摆脱所有Task.Runs,只使用您的方法返回的Tasks。大多数好的异步代码只包含很少的Task.Runs,尤其是调用已经返回Task的方法。

标签: c# async-await


【解决方案1】:

你的问题出在这一行:

var task = Task.Run(() => p.DoWorkAsync().ConfigureAwait(false));

将鼠标悬停在var 上,看看它是什么类型。

Task.Run 通过为Func&lt;Task&lt;Task&gt;&gt; 和朋友制定特殊的“任务展开”规则来理解async 委托。但它不会对Func&lt;ConfiguredTaskAwaitable&gt; 进行任何特殊的展开。

你可以这样想;使用上面的代码:

  1. p.DoWorkAsync() 返回 Task
  2. Task.ConfigureAwait(false) 返回 ConfiguredTaskAwaitable
  3. 因此要求Task.Run 运行此函数,该函数在线程池线程上创建ConfiguredTaskAwaitable
  4. 因此,Task.Run 的返回类型是 Task&lt;ConfiguredTaskAwaitable&gt; - 一旦创建 ConfiguredTaskAwaitable,任务就会完成。当它创建时 - 不是当它完成时。

在这种情况下,ConfigureAwait(false) 无论如何都不会做任何事情,因为没有要配置的 await。所以你可以删除它:

var task = Task.Run(() => p.DoWorkAsync());

另外,正如 Servy 所提到的,如果您不需要在线程池线程上运行DoWorkAsync,您也可以跳过Task.Run

var task = p.DoWorkAsync();

【讨论】:

  • Stephen Cleary 和 Servy 两种方法,如果我不需要在线程池线程上运行我的任务,我可以放弃 Task.Run。 Mine 是一个 Windows 服务,它实现了一个循环,该循环首先从数据库中获取 n 个工作项,然后以即发即弃的方式设置每个工作项处理(因为它们涉及 i/o,因此是异步的),然后执行另一个循环的迭代。在这种情况下,我认为异步即发即弃应该在线程便便线程上。你同意吗?
  • 我几乎从不推荐即发即弃。您确定要默默吞下异常吗?
  • 明白。在这种情况下,虽然以“即发即弃”的方式调用的异步方法是自包含的;它执行各种操作,处理异常并将状态记录到数据库,因此没有任何内容被默默吞下。没有什么要“报告”的——我们只想在任务上保留标签,以便服务在完成之前不能停止。因此我认为这应该是一个线程池线程...
  • “这样服务在完成之前不能停止”——这是我不推荐即发即弃的另一个原因。如果您保留Tasks,那么您可以在关机期间使用await。这对我来说似乎比“保留标签”更干净。换句话说,您已经有一个信号(Task)在后台任务完成时通知您;为什么要建造另一个?
  • 当然,没关系。 Task.Run 非常适合将所有内容卸载到线程池,并且根本不会阻塞调用代码。如果你不忽略来自Task.Run 的任务,那么它根本就不是一劳永逸。
猜你喜欢
  • 1970-01-01
  • 2016-05-23
  • 2022-01-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-01-04
  • 1970-01-01
相关资源
最近更新 更多