【问题标题】:Pausable & resumable async task queue可暂停和可恢复的异步任务队列
【发布时间】:2023-08-06 05:38:01
【问题描述】:

我已经根据我在此处找到的内容实现了一个工作队列 > Task queue for wp8? ...但在实现附加功能时遇到了麻烦。

我取出了 Func<Task> 并用 ICommands 替换它们(持有它们自己的 CancellationTokens),并打算添加 Pause()Resume()Save()Restore() 方法。就是这样OnFormClose() 我可以暂停队列处理并提示用户决定他是要“等待队列完成”(即恢复)还是“立即退出”(即保存并退出)。

public class WqController
{
    private readonly Queue<ICommand> _queue = new Queue<ICommand>();
    private Task _queueProcessor;
    private ICommand _curCommand;

    public void Enqueue(ICommand command)
    {
        _queue.Enqueue(command);

        if (_queueProcessor == null) _queueProcessor = ProcessQueue();
    }

    private async Task ProcessQueue()
    {
        try
        {
            while (_queue.Count != 0)
            {
                _curCommand = _queue.Peek();

                try
                {
                    await Task.Run(() => _curCommand.Execute());
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("QUEUE PAUSED");
                    return;
                }
                catch (Exception)
                {
                    Console.WriteLine("FAILED TO EXECUTE COMMAND");
                }
                _queue.Dequeue();
            }
        }
        finally
        {
            _queueProcessor = null;
            _curCommand = null;
        }
    }

    public async Task Cancel()
    {
        _curCommand.Cts.Cancel(true);
        await _queueProcessor;
    }

    public void Resume()
    {
        _queueProcessor = ProcessQueue();
    }
}

Save()Restore() 工作正常,所以我没有将它们包括在这里。 Cancel() 间歇性/不可靠地工作,Restore() 似乎根本不起作用(令我困惑,因为我基本上尝试与 Enqueue() 方法中的工作相同的重启)。

【问题讨论】:

  • 很难看出这个类如何支持任何不变量。例如,如果Status != WqStatus.Paused 然后Resume 只是默默地失败。添加断言确保仅在状态为Paused 时调用Resume。此外,您假设_queueProcessor == null。也为此添加一个断言。以此类推。
  • 我不认为 Status 或 Count 与我看到的问题有关。恢复代码肯定正在运行。想我会在这方面调整代码。
  • 当我在Resume() 上休息时,_queueProcessor 的状态显示为“WaitingForActivation”,是否有帮助?
  • 在我看来,我们缺少确定发生了什么的代码。例如,您谈论取消,但在哪里真正检查代码中的CancellationToken?我没看到那部分。
  • @YuvalItzchakov:我正在执行_curCommand.Cts.Token.ThrowIfCancellationRequested(),在通过此队列的命令调用的任何代码中的任何长时间运行的任务之前。您的评论刚刚帮助我解释了Cancel() 的间歇性。如果在最后一次取消检查通过后调用取消,则不会引发异常,并且会加载下一个命令(以及新的取消令牌)。因此,我在_queue.Dequeue 之后立即添加了if (_curCommand.Cts.Token.IsCancellationRequested) return;

标签: c# .net async-await task-queue cancellation


【解决方案1】:

我得到了这个工作,并认为我应该在这里概述我的解决方案。

事实证明,我对取消令牌的使用有点随意,这导致此类无法按预期运行。例如,以下问题是相关的:

  1. 如果在命令中通过最后一次取消检查后调用 Cancel,则将加载一个新命令(连同它自己的新取消令牌),因此取消调用将丢失/忽略。在_queue.Dequeue(); 之后,if (_curCommand.Cts.Token.IsCancellationRequested) return; 解决了这个问题。

  2. 调用取消后,如果稍后要恢复该命令,则需要一个新的取消令牌(否则取消 = true 的现有取消令牌仍将处于活动状态)。 _curCommand.InvalidateCancellationToken(); 行通过将令牌设置为 null 来实现这一点,然后我的命令在下次调用时刷新令牌。

我使用的完整代码:

public class WqController
{
    private readonly Queue<ICommand> _queue = new Queue<ICommand>();
    private Task _queueProcessor;
    private ICommand _curCommand;

    public void Enqueue(ICommand command)
    {
        _queue.Enqueue(command);

        if (_queueProcessor == null) _queueProcessor = ProcessQueue();
    }

    private async Task ProcessQueue()
    {
        try
        {
            while (_queue.Count != 0)
            {
                _curCommand = _queue.Peek();

                try
                {
                    await Task.Run(() => _curCommand.Execute());
                }
                catch (OperationCanceledException)
                {
                    _curCommand.InvalidateCancellationToken();
                    Console.WriteLine("QUEUE PAUSED");
                    return;
                }
                catch (Exception)
                {
                    Console.WriteLine("FAILED TO EXECUTE COMMAND");
                }
                _queue.Dequeue();
                if (_curCommand.Cts.Token.IsCancellationRequested) return;
            }
        }
        finally
        {
            _queueProcessor = null;
            _curCommand = null;
        }
    }

    public async Task Cancel()
    {
        _curCommand.Cts.Cancel(true);
        await _queueProcessor;
    }

    public void Resume()
    {
        _queueProcessor = ProcessQueue();
    }
}

这一切现在似乎运行得非常顺利,并且是对我之前使用的后台工作队列实现的重大改进。

【讨论】: