【问题标题】:How to re-enqueue a failed background async task如何重新排队失败的后台异步任务
【发布时间】:2020-04-10 07:26:11
【问题描述】:

您好,我正在尝试使用BackgroundService 创建一个在后台执行的通用任务队列服务。我避免使用Delegate 函数Func<T1,T2, OuT> 作为EnqueueTask(Task<ResponseHelper> newTask) 方法的输入,因为我想要一个通用的解决方案,所以我选择传递Task<ResponseHelper>。但是这个解决方案不允许我在ExecuteAsync(CancellationToken stoppingToken) 中重新排队失败的任务,因为该任务返回一个ResponseHelper 的实例,它不是失败任务的副本。请帮助我更正代码,我必须从 DequeueTaskAsync(CancellationToken cancellationToken) 函数返回出队任务,而不是 ResponseHelper 的实例。

public interface ITaskQueueHelper
{
    void EnqueueTask(Task<ResponseHelper> newTask);

    Task<ResponseHelper> DequeueTaskAsync(CancellationToken cancellationToken);
}

public class TaskQueueHelper : ITaskQueueHelper
{
    private readonly SemaphoreSlim signal;

    private readonly ConcurrentQueue<Task<ResponseHelper>> taskQueue;

    public TaskQueueHelper()
    {
        signal = new SemaphoreSlim(0);

        taskQueue = new ConcurrentQueue<Task<ResponseHelper>>();
    }

    public void EnqueueTask(Task<ResponseHelper> newTask)
    {
        if (newTask == null)
        {
            throw new ArgumentNullException(nameof(newTask));
        }

        taskQueue.Enqueue(newTask);

        signal.Release();
    }

    public async Task<ResponseHelper> DequeueTaskAsync(CancellationToken cancellationToken)
    {
        await signal.WaitAsync(cancellationToken);

        taskQueue.TryDequeue(out var currentTask);

        /*I need to return currentTask here, instead of an instance of ResponseHelper*/

        return await currentTask;
    }
}

public class TaskQueueService : BackgroundService
{
    private readonly ITaskQueueHelper taskQueue;

    private readonly ILogger<TaskQueueService> logger;

    public TaskQueueService(
        ITaskQueueHelper _taskQueue,
        ILogger<TaskQueueService> _logger)
    {
        logger = _logger;

        taskQueue = _taskQueue;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            ResponseHelper response = await taskQueue.DequeueTaskAsync(stoppingToken);

            try
            {
                if (!response.Status.Equals(ResultCode.Success))
                {
                    /*I need to re-enqueue a failed task here*/

                    //taskQueue.EnqueueTask(currentTask);
                }
            }
            catch (Exception e)
            {
                logger.LogError(e, $"Error occurred executing {nameof(TaskQueueService)}");
            }
        }
    }
}

【问题讨论】:

  • 只是提醒一下,如果你需要工作,我已经成功地使用了hangfire。
  • @sommmen 感谢您的提醒,如果我无法从其他软件工程师那里得到解决,我会考虑这一点。
  • 我们很难理解你在这里做什么。我的意思是,如果您想重新排队任务,则需要保留对它的引用,因此需要将其添加到您的响应中,或者您的调度服务应该保留对它的引用。没有别的办法
  • @sommmen 我正在尝试使用EnqueueTask(Task&lt;ResponseHelper&gt; newTask) 将任务添加到队列中,而不是通过Delegate 添加它们,任务返回ResponseHelper 类型的实例,但是当DequeueTaskAsync(CancellationToken cancellationToken)函数运行 return await currentTask; 它返回 ResponseHelper 的实例,而不是当前的出队任务。我希望这个函数返回 currentTask。这就是我需要解决的问题。
  • Because integerTask is a Task&lt;TResult&gt;, it contains a Result property of type TResult. In this case, TResult represents an integer type. When await is applied to integerTask, the await expression evaluates to the contents of the Result property of integerTask. The value is assigned to the ret variable. 来自 msdn。你看这不是这样的..

标签: c# asp.net-core asp.net-core-mvc asp.net-core-2.2


【解决方案1】:

对于重试,您可以这样做:


    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            ResponseHelper response = await taskQueue.DequeueTaskAsync(stoppingToken);

            try
            {
                if (!response.Status.Equals(ResultCode.Success))
                {
                    // Retry the task.
                    response = await taskQueue.DequeueTaskAsync(stoppingToken);

                }
            }
            catch (Exception e)
            {
                logger.LogError(e, $"Error occurred executing {nameof(TaskQueueService)}");
            }
        }
    }

【讨论】:

  • 这会重试一次任务,你可以做一个while循环来做几次
  • 还可以查看 polly github.com/App-vNext/Polly 以进行重试。它是一个受欢迎的库,也是 .net 基金会的一部分。
  • ITaskQueueHelper interface 中添加属性的想法听起来如何? public Task&lt;ResponseHelper&gt; ExecutingTask { set; get; } 并分配出队任务的副本。然后我可以做这样的事情taskQueue.EnqueueTask(taskQueue.ExecutingTask);。您对此有何看法?
  • @yaddly 我想这也可以,但也许更好地保护它,并选择将其放入派生类中,所以只有你的后台服务可以摆弄它。您不希望任何其他实施者摆弄我猜想的任务。然后,您可以使用您可能需要的一些函数来扩展派生/接口类,例如 IsActiveTaskRunning 等。
  • 这将遵循开放的扩展封闭的修改原则。
猜你喜欢
  • 1970-01-01
  • 2019-02-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-09-25
相关资源
最近更新 更多