【问题标题】:Properly creating a Task to poll and dispatch child tasks with cancellation for each正确创建一个任务来轮询和分派子任务,并为每个任务取消
【发布时间】:2011-08-24 15:09:35
【问题描述】:

我正在创建一个调度程序类——它本身就是一个长时间运行的任务,用户可以随时取消它。此任务将轮询数据库以查看是否有任何工作需要完成,并运行多达 X [5] # 个子任务。

据我所知 - 它运行良好,但我对代码有一些问题/疑虑。或多或少——因为我找不到另一个例子——我做得对吗?有什么我可以改进的吗?

  1. 我正在使用 ConcurrentDictionary 来跟踪正在运行的子任务。此字典存储正在处理的 RequestKey,以及该任务的 CancellationTokenSource。

问:这是最好的方法吗?在 StartDownloadProcess(这是子任务)中,我正在创建 CancellationTokenSource 并将其添加到字典中,然后启动任务。我添加了一个 Continuation 到它,然后在处理完成时从 Dictionary 中删除该项目,这样它就不会在 Cancel 方法中被调用。

  1. 在子任务中,我将取消令牌传递给实际执行工作的方法。然后,该进程将通过定期检查该令牌来检查它是否需要中止。这是正确的吗?

  2. 在 Cancel 方法中 - 我正在创建字典中键的副本,对其进行迭代并尝试从字典中访问和删除项目并发出取消请求。

    李>

问:这是最好的方法吗?我是否需要等待以查看任务是否实际取消?可以吗?
问:我应该处置 CTS 吗?

  1. 我正在主要任务中执行 Thread.Sleep.. 好/坏?我应该改用 SpinWait 吗?是否有另一种方法/更好的方法可以让主轮询器进入睡眠状态并以特定的时间间隔重新运行?

注意:在 StartDownloadProcess 中,我使用 while(true) 循环直到任务完成,或者被取消以迭代直到 j > requestKey。在实际代码中不会有 while 循环。它只会启动新任务并运行实际的下载过程。

--

/// <summary>
/// Primary dispatcher token source
/// </summary>
CancellationTokenSource primaryTokenSource;
/// <summary>
/// A collection of Worker Tokens which can be used to cancel worker tasks and keep track of how many
/// there are.
/// </summary>
ConcurrentDictionary<int, CancellationTokenSource> workerTokens = new ConcurrentDictionary<int, CancellationTokenSource>();

/// <summary>
/// Runs this instance.
/// </summary>
public void Run() {
  //  Only one dispatcher can be running
  if (IsRunning)
    return;

  //  Create a new token source
  primaryTokenSource = new CancellationTokenSource();
  //  Create the cancellation token to pass into the Task
  CancellationToken token = primaryTokenSource.Token;

  //  Set flag on
  IsRunning = true;

  //  Fire off the dispatcher
  Task.Factory.StartNew(
    () => {
      //  Loop forever
      while (true) {
        //  If there are more than 5 threads running, don't add a new one
        if (workerTokens.Count < 5) {
          //  Check to see if we've been cancelled
          if (token.IsCancellationRequested)
            return;

          //  Check to see if there are pending requests
          int? requestKey = null;

          //  Query database (removed)
          requestKey = new Random().Next(1550);

          //  If we got a request, start processing it
          if (requestKey != null) {
            //  Check to see if we've been cancelled before running the child task
            if (token.IsCancellationRequested)
              return;

            //  Start the child downloader task
            StartDownloadProcess(requestKey.Value);
          }
        } else {
          //  Do nothing, we've exceeded our max tasks
          Console.WriteLine("MAX TASKS RUNNING, NOT STARTING NEW");
        }

        //  Sleep for the alloted time
        Thread.Sleep(Properties.Settings.Default.PollingInterval);
    }
  }, token)
  //  Turn running flag off
  .ContinueWith((t) => IsRunning = false)
  //  Notify that we've finished
  .ContinueWith(OnDispatcherStopped);
}

/// <summary>
/// Starts the download process.
/// </summary>
/// <param name="requestKey">The request key.</param>
private void StartDownloadProcess(int requestKey) {
  CancellationTokenSource workerTokenSource = new CancellationTokenSource();
  CancellationToken token = workerTokenSource.Token;

  //  Add the token source to the queue
  workerTokens.GetOrAdd(requestKey, workerTokenSource);

  //  Start the child downloader task
  Task.Factory.StartNew(
    () => {
      int j = 0;
      while (true) {
        if (token.IsCancellationRequested) {
          Console.WriteLine("Sub-Task Cancelled {0}", requestKey);
          return;
        }

        //  Create a new downloader, pass it the RequestKey and token
        //var downloader = new Downloader(requestKey, token);
        //downloader.Run();

        //  Simulate work
        Thread.Sleep(250);
        Console.WriteLine("SUB-Task {0} is RUNNING! - #{1}", requestKey, j);

        //  Simulate - automatically end task when j > requestkey
        if (j++ > requestKey) {
          Console.WriteLine("SUB TASK {0} IS ENDING!", requestKey);
          return;
        }
      }
    },
    token
  ).ContinueWith((t) => {
    //  If we ended naturally, the cancellationtoken will need to be removed from the dictionary
    CancellationTokenSource source = null;
    workerTokens.TryRemove(requestKey, out source);
  });
}

/// <summary>
/// Cancels this instance.
/// </summary>
public void Cancel() {
  //  Cancel the primary task first so new new child tasks are created
  if (primaryTokenSource != null)
    primaryTokenSource.Cancel();

  //  Iterate over running cancellation sources and terminate them
  foreach (var item in workerTokens.Keys.ToList()) {
    CancellationTokenSource source = null;
    if (workerTokens.TryRemove(item, out source)) {
      source.Cancel();
    }
  }
}

此外,上面的示例中没有显示。在任务中也可以使用几个事件来引发...这些事件都如下所示:

public event EventHandler DispatcherStarted;
private void OnDispatcherStarted() {
  EventHandler handler = DispatcherStarted;
  if (handler != null) 
    Task.Factory.StartNew(() => handler(this, EventArgs.Empty), CancellationToken.None, TaskCreationOptions.None, taskScheduler).Wait();      
}

在 Run() 方法中 - 在不同的点它会调用 OnDispatcher*();引发事件,以便调用者可以订阅并得到通知。事件创建的那些任务将在主线程上运行。

  • 奖励问题:我正在考虑使调度程序通用并传入检查数据库的“轮询器”对象。如果成功,则创建一个子任务并传入它需要的参数。我遇到了一些问题,比如..如何传递数据,传递什么对象.. Interfaces/Classes/Func/Action 等。我怎样才能把它变成一个运行的通用调度程序/轮询器A 返回参数(我在想一个字典),然后创建一个使用这些参数并支持取消和事件通知的子任务 B?

【问题讨论】:

标签: c# winforms multithreading task-parallel-library task


【解决方案1】:

我赶紧看了一下扔代码,cmet很少:

  • IsRunning 标志的使用不是线程安全的,多个线程可以将其读取为 false,然后将其同时设置为 true,您将拥有多个调度程序线程!,以避免您必须使用 Interlocked.CompareExchange 来设置它,您还需要将其标记为 voaltile。
  • 我建议不要使用 Sleep,SpinWait 在这里也无济于事,您可以使用 Timer 对象来汇集数据库,并将请求添加到调度程序 clas 从中使用请求的 BlockingCollection。
  • 即使父任务被取消,子任务延续也将始终执行,您可以通过传递此 TaskContinuationOptions.NotOnCanceled 来避免这种情况

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-04-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多