【发布时间】:2011-08-24 15:09:35
【问题描述】:
我正在创建一个调度程序类——它本身就是一个长时间运行的任务,用户可以随时取消它。此任务将轮询数据库以查看是否有任何工作需要完成,并运行多达 X [5] # 个子任务。
据我所知 - 它运行良好,但我对代码有一些问题/疑虑。或多或少——因为我找不到另一个例子——我做得对吗?有什么我可以改进的吗?
- 我正在使用 ConcurrentDictionary 来跟踪正在运行的子任务。此字典存储正在处理的 RequestKey,以及该任务的 CancellationTokenSource。
问:这是最好的方法吗?在 StartDownloadProcess(这是子任务)中,我正在创建 CancellationTokenSource 并将其添加到字典中,然后启动任务。我添加了一个 Continuation 到它,然后在处理完成时从 Dictionary 中删除该项目,这样它就不会在 Cancel 方法中被调用。
在子任务中,我将取消令牌传递给实际执行工作的方法。然后,该进程将通过定期检查该令牌来检查它是否需要中止。这是正确的吗?
-
在 Cancel 方法中 - 我正在创建字典中键的副本,对其进行迭代并尝试从字典中访问和删除项目并发出取消请求。
李>
问:这是最好的方法吗?我是否需要等待以查看任务是否实际取消?可以吗?
问:我应该处置 CTS 吗?
- 我正在主要任务中执行 Thread.Sleep.. 好/坏?我应该改用 SpinWait 吗?是否有另一种方法/更好的方法可以让主轮询器进入睡眠状态并以特定的时间间隔重新运行?
注意:在 StartDownloadProcess 中,我使用 while(true) 循环直到任务完成,或者被取消以迭代直到 j > requestKey。在实际代码中不会有 while 循环。它只会启动新任务并运行实际的下载过程。
--
/// <summary>
/// Primary dispatcher token source
/// </summary>
CancellationTokenSource primaryTokenSource;
/// <summary>
/// A collection of Worker Tokens which can be used to cancel worker tasks and keep track of how many
/// there are.
/// </summary>
ConcurrentDictionary<int, CancellationTokenSource> workerTokens = new ConcurrentDictionary<int, CancellationTokenSource>();
/// <summary>
/// Runs this instance.
/// </summary>
public void Run() {
// Only one dispatcher can be running
if (IsRunning)
return;
// Create a new token source
primaryTokenSource = new CancellationTokenSource();
// Create the cancellation token to pass into the Task
CancellationToken token = primaryTokenSource.Token;
// Set flag on
IsRunning = true;
// Fire off the dispatcher
Task.Factory.StartNew(
() => {
// Loop forever
while (true) {
// If there are more than 5 threads running, don't add a new one
if (workerTokens.Count < 5) {
// Check to see if we've been cancelled
if (token.IsCancellationRequested)
return;
// Check to see if there are pending requests
int? requestKey = null;
// Query database (removed)
requestKey = new Random().Next(1550);
// If we got a request, start processing it
if (requestKey != null) {
// Check to see if we've been cancelled before running the child task
if (token.IsCancellationRequested)
return;
// Start the child downloader task
StartDownloadProcess(requestKey.Value);
}
} else {
// Do nothing, we've exceeded our max tasks
Console.WriteLine("MAX TASKS RUNNING, NOT STARTING NEW");
}
// Sleep for the alloted time
Thread.Sleep(Properties.Settings.Default.PollingInterval);
}
}, token)
// Turn running flag off
.ContinueWith((t) => IsRunning = false)
// Notify that we've finished
.ContinueWith(OnDispatcherStopped);
}
/// <summary>
/// Starts the download process.
/// </summary>
/// <param name="requestKey">The request key.</param>
private void StartDownloadProcess(int requestKey) {
CancellationTokenSource workerTokenSource = new CancellationTokenSource();
CancellationToken token = workerTokenSource.Token;
// Add the token source to the queue
workerTokens.GetOrAdd(requestKey, workerTokenSource);
// Start the child downloader task
Task.Factory.StartNew(
() => {
int j = 0;
while (true) {
if (token.IsCancellationRequested) {
Console.WriteLine("Sub-Task Cancelled {0}", requestKey);
return;
}
// Create a new downloader, pass it the RequestKey and token
//var downloader = new Downloader(requestKey, token);
//downloader.Run();
// Simulate work
Thread.Sleep(250);
Console.WriteLine("SUB-Task {0} is RUNNING! - #{1}", requestKey, j);
// Simulate - automatically end task when j > requestkey
if (j++ > requestKey) {
Console.WriteLine("SUB TASK {0} IS ENDING!", requestKey);
return;
}
}
},
token
).ContinueWith((t) => {
// If we ended naturally, the cancellationtoken will need to be removed from the dictionary
CancellationTokenSource source = null;
workerTokens.TryRemove(requestKey, out source);
});
}
/// <summary>
/// Cancels this instance.
/// </summary>
public void Cancel() {
// Cancel the primary task first so new new child tasks are created
if (primaryTokenSource != null)
primaryTokenSource.Cancel();
// Iterate over running cancellation sources and terminate them
foreach (var item in workerTokens.Keys.ToList()) {
CancellationTokenSource source = null;
if (workerTokens.TryRemove(item, out source)) {
source.Cancel();
}
}
}
此外,上面的示例中没有显示。在任务中也可以使用几个事件来引发...这些事件都如下所示:
public event EventHandler DispatcherStarted;
private void OnDispatcherStarted() {
EventHandler handler = DispatcherStarted;
if (handler != null)
Task.Factory.StartNew(() => handler(this, EventArgs.Empty), CancellationToken.None, TaskCreationOptions.None, taskScheduler).Wait();
}
在 Run() 方法中 - 在不同的点它会调用 OnDispatcher*();引发事件,以便调用者可以订阅并得到通知。事件创建的那些任务将在主线程上运行。
- 奖励问题:我正在考虑使调度程序通用并传入检查数据库的“轮询器”对象。如果成功,则创建一个子任务并传入它需要的参数。我遇到了一些问题,比如..如何传递数据,传递什么对象.. Interfaces/Classes/Func/Action 等。我怎样才能把它变成一个运行的通用调度程序/轮询器A 返回参数(我在想一个字典),然后创建一个使用这些参数并支持取消和事件通知的子任务 B?
【问题讨论】:
-
不,我当然没有! - 我什至不知道它的存在。谢谢!绝对更合适。
标签: c# winforms multithreading task-parallel-library task