【问题标题】:Cancelling a task from a list of tasks从任务列表中取消任务
【发布时间】:2021-03-28 02:31:59
【问题描述】:

我需要轮询数据库并为每个表创建 parquet 文件。这些任务将并行运行,同时对要创建的线程数有一些可配置的限制。
添加线程并且任务开始运行后,我需要设置一个计时器,以便如果任何查询花费的时间超过指定时间,则可以取消任务并关闭该线程。如果集合中还有任务,则将调用下一个线程。
我正在使用CancellationTokenSource,但它没有按预期工作。

public async static Task CreateExtractionAndUpload(ConfigurationDetails config)
{
    CancellationTokenSource cts = new CancellationTokenSource();
    cts.CancelAfter(10000);
    
    List<Task> getData = new List<Task>();
    foreach (var query in config.Root.Queries)
    {
        getData.Add(Task.Run(() => DoExtractionFromQueries(query, dbManager, cts.Token)));
    }

     await Task.WhenAll(getData);
}

private async static void DoExtractionFromQueries(ExtractionQueries query, DBManager dBManager, CancellationToken cancelToken)
{
    try
    {
        while (!cancelToken.IsCancellationRequested)
        {
            Thread.Sleep(20000);
            var dataTable = dBManager.GetDataTable(query.Query, System.Data.CommandType.Text);
            //ParaquetFileHandler.GenerateParquetFile(dataTable);
        }
    }
    catch (TimeoutException ex)
    {
        Logger.Error("Query taking longer than expected time!", ex);
    }
    catch (Exception ex)
    {
        Logger.Error("Exception in running query!", ex);
    }
}

我做错了什么以及如何纠正它?
如何限制线程数?
我可以限制Parallel.Foreach 中的线程,但我可以在超时后取消任务吗?

【问题讨论】:

  • 取消令牌仅在cancelToken.IsCancellationRequested 此处检查,因此Thread.Sleep(); var dataTable = dBManager.GetDataTable 将阻塞数秒,直到执行完这两行后才会再次检查令牌。
  • 代码看起来很奇怪。您将任务设置为 10 秒后取消,但您要做的第一件事是睡眠 20 秒。所以每个任务应该只调用一次数据库。取消是合作的,即GetDataTable 将始终运行到完成。
  • @Ben 是的,这就是正在发生的事情。我想退出那个线程。我怎样才能退出它。
  • @JonasH 所以在实际情况下,如果执行查询需要时间,我想取消线程。为了复制这个问题,我把这个睡眠认为它会出来。但它不起作用。我该怎么做才能解决这个问题。

标签: c# windows multithreading service task-parallel-library


【解决方案1】:

我正在使用 CancellationTokenSource,但它没有按预期工作。

.NET 中的取消是合作的。所以代码必须检查是否被取消。如果不检查,则不会取消。

在这种情况下,您可能需要修改 dBManager.GetDataTable 以使其能够感知取消:将 CancellationToken 参数添加到该方法,并将其传递给它调用的任何长时间运行的方法。

【讨论】:

    【解决方案2】:

    如果您无法使 GetDataTable 异步(并接受取消令牌),实现此目的的一种方法是配置您的数据库连接的超时,然后您可以:

     Parallel.ForEach(
        source: config.Root.Queries,
        parallelOptions: new ParallelOptions { MaxDegreeOfParallelism = NUMBER },
        body: (query) => DoExtractionFromQueries(query, dBManager)
    

    请注意,如果您采用推荐的方式使 GetDataTable 异步,那么您将从 并行 转到 并发。使用任务时,没有简单的方法可以设置“最大并发级别”。您需要进行一两次搜索才能找到 Task.WhenAll 的替代方法,它允许设置并发任务数量的限制。

    【讨论】:

    • 虽然我倾向于同意 Parallel.Foreach 而不是任务,但这个答案并没有解决问题的核心,即如果花费的时间超过某个超时,则中止。
    • @JonasH 你是 100% 正确的,但是 Stephen-Cleary 的回答没有解决“要创建的线程数的可配置限制”方面 :) 我已经重写了解决问题的答案超时并添加了有关并行性/并发性的注释。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多