【发布时间】:2019-04-09 18:31:10
【问题描述】:
[2018 年 4 月 18 日更新,使用 LinqPad 示例 - 见结尾]
我的应用程序收到一份工作列表:
var jobs = await myDB.GetWorkItems();
(注意:我们在任何地方都使用 .ConfigureAwait(false),我只是没有在这些伪代码 sn-ps 中显示它。)
对于每个作业,我们创建一个长时间运行的任务。但是,我们不想等待这个长时间运行的任务完成。
jobs.ForEach(job =>
{
var module = Factory.GetModule(job.Type);
var task = Task.Run(() => module.ExecuteAsync(job.Data));
this.NonAwaitedTasks.Add(task, module);
};
任务及其相关模块实例都被添加到 ConcurrentDictionary 中,这样它们就不会超出范围。
在其他地方,我有另一个偶尔调用的方法,其中包含以下内容:
foreach (var entry in this.NonAwaitedTasks.Where(e => e.Key.IsCompleted))
{
var module = entry.Value as IDisposable;
module?.Dispose();
this.NonAwaitedTasks.Remove(entry.Key);
}
(注意,NonAwaitedTasks 还使用 SemaphoreSlim 锁定...)
所以,这个想法是这个方法会找到所有已经完成的任务,然后处理它们的相关模块,并将它们从字典中删除。
但是....
在 Visual Studio 2017 中进行调试时,我从数据库中提取了一个作业,并且在我花时间在已实例化的单个模块中进行调试时,在该模块上调用了 Dispose。查看Callstack,我可以看到在上面的方法中已经调用了Dispose,那是因为任务有IsCompleted == true。但显然无法完成,因为我还在调试中。
- .IsCompleted 属性是否是要检查的错误属性?
- 这只是在 Visual Studio 中调试的产物吗?
- 我是不是走错了路?
其他信息
在下面的 cmets 中,我被要求提供一些关于流程的额外信息,因为我所描述的似乎不可能(事实上,我希望它不可能)。下面是我的代码的精简版本(我已经删除了对取消标记和防御性编码的检查,但没有任何影响流程)。
应用程序入口点
这是一个 Windows 服务。在 OnStart() 中是以下行:
this.RunApplicationTask =
Task.Run(() => myApp.DoWorkAsync().ConfigureAwait(false), myService.CancelSource.Token);
“RunApplicationTask”只是一个属性,用于在服务的生命周期内将执行任务保持在范围内。
DoWorkAsync()
public async Task DoWorkAsync()
{
do
{
await this.ExecuteSingleIterationAsync().ConfigureAwait(false);
await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
}
while (myApp.ServiceCancellationToken.IsCancellationRequested == false);
await Task.WhenAll(this.NonAwaitedTasks.Keys).ConfigureAwait(false);
await this.ClearCompletedTasksAsync().ConfigureAwait(false);
this.WorkItemsTaskCompletionSource.SetResult(true);
return;
}
所以当我在调试时,这是在迭代 DO-LOOP,它没有到达 Task.WhenAll(....)。
还要注意,在调用 Cancellation 请求并且所有任务都完成后,我会调用 ClearCompletedTasksAsync()。稍后会详细介绍....
ExecuteSingleIterationAsync
private async Task ExecuteSingleIterationAsync()
{
var getJobsResponse = await DB.GetJobsAsync().ConfigureAwait(false);
await this.ProcessWorkLoadAsync(getJobsResponse.Jobs).ConfigureAwait(false);
await this.ClearCompletedTasksAsync().ConfigureAwait(false);
}
ProcessWorkLoadAsync
private async Task ProcessWorkLoadAsync(IList<Job> jobs)
{
if (jobs.NoItems())
{
return ;
}
jobs.ForEach(job =>
{
// The processor instance is disposed of when removed from the NonAwaitedTasks collection.
IJobProcessor processor = ProcessorFactory.GetProcessor(workItem, myApp.ServiceCancellationToken);
try
{
var task = Task.Run(() => processor.ExecuteAsync(job).ConfigureAwait(false), myApp.ServiceCancellationToken);
this.NonAwaitedTasks.Add(task, processor);
}
catch (Exception e)
{
...
}
});
return;
}
每个处理器都实现以下接口方法: Task ExecuteAsync(Job job);
当我在 ExecuteAsync 中时,.Dispose() 在我正在使用的处理器实例上被调用。
ProcessorFactory.GetProcessor()
public static IJobProcessor GetProcessor(Job job, CancellationToken token)
{
.....
switch (someParamCalculatedAbove)
{
case X:
{
return new XProcessor(...);
}
case Y:
{
return new YProcessor(...);
}
default:
{
return null;
}
}
}
所以我们在这里得到了一个新的实例。
ClearCompletedTasksAsync()
private async Task ClearCompletedTasksAsync()
{
await myStatic.NonAwaitedTasksPadlock.WaitAsync().ConfigureAwait(false);
try
{
foreach (var taskEntry in this.NonAwaitedTasks.Where(entry => entry.Key.IsCompleted).ToArray())
{
var processorInstance = taskEntry.Value as IDisposable;
processorInstance?.Dispose();
this.NonAwaitedTasks.Remove(taskEntry.Key);
}
}
finally
{
myStatic.NonAwaitedTasksPadlock.Release();
}
}
这被称为 Do-Loop 的每次迭代。其目的是确保非等待任务列表保持较小。
就是这样...... Dispose 似乎只在调试时被调用。
LinqPad 示例
async Task Main()
{
SetProcessorRunning();
await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
do
{
foreach (var entry in NonAwaitedTasks.Where(e => e.Key.IsCompleted).ToArray())
{
"Task is completed, so will dispose of the Task's processor...".Dump();
var p = entry.Value as IDisposable;
p?.Dispose();
NonAwaitedTasks.Remove(entry.Key);
}
}
while (NonAwaitedTasks.Count > 0);
}
// Define other methods and classes here
public void SetProcessorRunning()
{
var p = new Processor();
var task = Task.Run(() => p.DoWorkAsync().ConfigureAwait(false));
NonAwaitedTasks.Add(task, p);
}
public interface IProcessor
{
Task DoWorkAsync();
}
public static Dictionary<Task, IProcessor> NonAwaitedTasks = new Dictionary<Task, IProcessor>();
public class Processor : IProcessor, IDisposable
{
bool isDisposed = false;
public void Dispose()
{
this.isDisposed = true;
"I have been disposed of".Dump();
}
public async Task DoWorkAsync()
{
await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
if (this.isDisposed)
{
$"I have been disposed of (isDispose = {this.isDisposed}) but I've not finished work yet...".Dump();
}
await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
}
}
输出:
任务已完成,所以将处置任务的处理器...
我已经被处理掉了
我已经被处理掉了 (isDispose = True) 但我还没有完成工作 还是……
【问题讨论】:
-
旁注:用
...Async后缀命名可能是同步方法ExecuteAsync非常混乱。请确保至少对于您在此处显示的公共示例(又名minimal reproducible example)不要这样做。 -
@AlexeiLevenkov 的观点也可能指向实际问题。 Module.ExecuteAsync() 实际上是异步方法吗?如果是这种情况,那么 Task.Run() 将在 ExecuteAsync() 完成之前完成(假设您在 ExecuteAsync() 中有任何等待)。
-
该任务在完成之前不会被标记为完成,因此您所描述的内容是不可能的。肯定有其他事情发生,例如,另一个工作具有相同的
Type(因此是相同的模块),或async void,或即发即弃代码。 -
斯蒂芬是正确的;我(错误地)想到了
Task构造函数,你的 lambda 可能看起来像一个函数,但实际上是一个动作,所以忽略我上面的评论。我认为您需要向我们展示ExecuteAsync()的签名。 -
尝试摆脱所有
Task.Runs,只使用您的方法返回的Tasks。大多数好的异步代码只包含很少的Task.Runs,尤其是调用已经返回Task的方法。
标签: c# async-await