【问题标题】:C# Queuing async Task using BlockingCollection and process queue only after value returned for previous getter task in queueC# 使用 BlockingCollection 排队异步任务并仅在为队列中的前一个 getter 任务返回值后处理队列
【发布时间】:2020-04-09 05:55:49
【问题描述】:

最近,我需要对异步任务进行排队,并且在此链接中向我介绍了 BlockingCollection Queuing asynchronous task in C# 它有效,我的要求略有变化,需要您的指导。我正在使用 @Stephen Cleary 回答中的 BlockingCollection

这是来自该链接的 BlockingCollection

public sealed class ExecutionQueue
{
  //private readonly BlockingCollection<Func<Task>> _queue = new BlockingCollection<Func<Task>>();//commented this
  private readonly BlockingCollection<Task> _queue = new BlockingCollection<Task>();

  public ExecutionQueue() => Complete = Task.Run(() => ProcessQueueAsync());

  public Task Completion { get; }

  public void Complete() => _queue.CompleteAdding();

  private async Task ProcessQueueAsync()
  {
    foreach (var value in _queue.GetConsumingEnumerable())
      await value();
  }
}

//public Task Run(Func<Task> lambda)
public Task Run(<Task> lambda)
{
  var tcs = new TaskCompletionSource<object>();
  _queue.Add(lamda);
  return tcs.Task;
}
  1. 我需要对常规 void 方法中的某些数据库任务进行排队。我可能无法更改此方法的签名。我该怎么做?
 public static ExecutionQueue taskQueue = new ExecutionQueue();

 private void SaveValesToDB(...)
 {
    var item = GetID(...);
    ...
    taskQueue.Run(Task.Run(() =>
    {
       DBInstance.DBSaveValue1(...); // is it correct to wrap with Task.Run and add to queue? it should be queued and run asynchronously
     });
    ...
 }
  1. 我们打开和关闭从数据库中保存和检索数据。因此,当我们排队一个返回类似 getter 的 DB 调用时,我们希望确保在收到返回值之前我们不会处理其他排队的项目。
private void SaveValesToDB(...)
{
 ...
 taskQueue.Run(Task.Run(() =>
 {
    DBInstance.DBSaveValue1(...); // is this correct? it should be queued and run asynchronously
  });
 ...
 taskQueue.Run(Task.Run(() =>
 {
    var result1 = DBInstance.DBGetValue2(...); // should be queued and run asynchronously; 
    LogData(result1);// not a DB call but believe it should be wrapped in here for the result1, correct?
 });

 /*so in above Task.Run,  i want to ensure that until i receive result1 
 i don't process other items in the queue even 
 if they are added. how can i do that ? 
 The main thread should continue. */
 ...
 var result 2 = DBInstance.DBGetValue3(...); // should be queued and run asynchronously

 UpdateAdvancedLod(result1 +" "+result2);// here, should i block main thread until i get result1 ?
}

  1. 如何处理错误?

请指导我。

已编辑:

if using Func<Task> in public Task Run(Func<Task> lambda) then is the below correct?

taskQueue.Run(async () =>
                {
                    await Task.Run(() =>
                    {
                        DBInstance.DBSaveValue1(...);//is this correct
                    });
                }
                );

【问题讨论】:

  • 为什么将Run方法的签名从public Task Run(Func&lt;Task&gt; lambda)改为public Task Run(Task task)?此更改完全破坏了 ExecutionQueue 类的预期功能。这个类的重点是安排任务在正确的时刻运行。当你给它一个已经创建的任务时,这个任务已经在运行,ExecutionQueue 类就失去了它存在的理由。
  • @TheodorZoulias:好的,我明白你的意思了。在我的用例中,如果不想更改 SaveValesToDB() 的方法签名,我该如何将它们添加到队列中?

标签: c# asynchronous async-await queue blockingcollection


【解决方案1】:

您可以将此方法添加到 Stephen Cleary 的 ExecutionQueue 类中:

public Task Run(Action action)
{
    return Run(() => Task.Run(action));
}

这是现有public Task Run(Func&lt;Task&gt; lambda) 方法的重载。这将提供的action 的执行委托给ThreadPool 线程。

使用示例:

var id = GetID();
var task = taskQueue.Run(() => DBInstance.DBSaveValue1(id));
await task; // Optional

更新:要将错误通知传播到主线程,您可以使用 Error 事件增强 ExecutionQueue 类,该事件将在捕获的上下文中调用(在实例已创建)。

private readonly SynchronizationContext _capturedContext;

public event EventHandler<Exception> Error;

public ExecutionQueue() // Constructor
{
    _capturedContext = SynchronizationContext.Current ?? new SynchronizationContext();
    Completion = Task.Run(() => ProcessQueueAsync());
}

private void OnError(Exception ex)
{
    var handler = Error; if (handler == null) return;
    _capturedContext.Post(_ => handler.Invoke(this, ex), null);
}

应该从catch (Exception ex) 块内部调用OnError。这将适用于 Windows 窗体应用程序和 WPF 应用程序,因为它们的 UI 线程配备了 SynchronizationContext。它不适用于控制台应用程序,因为那里没有 SynchronizationContextError 事件 将在随机的ThreadPool 线程中引发)。

【讨论】:

  • 谢谢。我的 Q2 怎么样,如果我遇到 DB Getter 方法,我需要等待它返回结果,然后才处理队列中的其他项目?是否可以?我仍然应该能够将项目添加到队列中,这样我的主线程就不会被阻塞。
  • @jamilia ExecutionQueue 按顺序处理队列。当一个任务完成后,它会立即运行下一个任务。防止下一个运行的唯一方法是延迟前一个的完成。所以你只需要在同一个Func&lt;Task&gt;Action中包含你想要发生的所有事情,然后将它传递给ExecutionQueue.Run
  • 只有当一个任务完成后才会运行下一个任务是真的吗?我们正在异步运行它们,对吗?那么,一旦队列中的第一个任务开始处理,队列中的下一个任务就会开始处理,而不需要等待前一个任务完成,对吗?
  • @jamilia 没有。看看ProcessQueueAsync。里面有一个awaitforeach (var value in _queue.GetConsumingEnumerable()) await value();。创建每个排队的任务,然后等待,然后循环继续。
  • 哦,好的。是的,将它们排队的整个想法是按顺序运行它们但异步运行。我很困惑。所以,我们在单独的消费者线程中运行,它将运行队列中的所有任务,使其异步。