一、概述
上一篇讲述的是CCR内真正执行任务的实体Dispatcher,主要是说明了常用的:构造函数、释放函数的使用和实现。这次从我开始讲述Dispatcher的唯一交互渠道:DispatcherQueue。
DispatcherQueue是整个CCR中,唯一能让用户与Dispatcher交互的地方,所有外界想与Dispatcher交互都只能直接或间接通过DispatcherQueu来做,可见DispatcherQueue在整个CCR中处于“咽喉”之地。
二、预览
DispatcherQueue类内,主要实现了5部分的功能:
1、构造实例对象;
public DispatcherQueue(string name);
public DispatcherQueue(string name, Dispatcher dispatcher);
public DispatcherQueue(string name, Dispatcher dispatcher,
TaskExecutionPolicy policy, double schedulingRate);
public DispatcherQueue(string name, Dispatcher dispatcher,
TaskExecutionPolicy policy, int maximumQueueDepth);
2、销毁相关资源;
protected virtual void Dispose(bool disposing);
3、任务进队、出队操作;
public virtual bool TryDequeue(out ITask task);
4、队列状态控制;
public virtual void Resume();
5、因果关系相关控制;
这5个功能中,常用的是前面4个,因此本基础篇就只讲这4个功能相关的函数的使用和实现原理,最后一个功能不常用,计划与Dispatcher内的因果关系部分放在后面作为高级篇细说。
三、构造函数
DispatcherQueue类内根据所使用的线程池的种类不同,而分为2类:
1、一类是使用CLR的线程池的构造函数:
/// Default constructor
/// 默认构造函数
/// </summary>
public DispatcherQueue()
: this("Unnamed queue using CLR Threadpool")
{
}
/// <summary>
/// Constructs an instance of the dispatcher port using the CLR thread pool for task execution
/// 构建一个不使用CCR的线程池,而是使用CLR线程池执行任务的实例
/// </summary>
/// <param name="name">
/// 名称
/// </param>
public DispatcherQueue(string name)
{
this._taskQueue = new Store<ITask>();
this._timescale = 1.0;
this._timerTable = new Dictionary<long, Timer>();
this._name = name;
}
2、一类是使用CCR线程池(也即操作系统线程池)的构造函数:
: this(name, dispatcher, TaskExecutionPolicy.Unconstrained, 0, 1.0)
{
}
/// <summary>
/// Constructs an instance of the dispatcher port using the specified CCR dispatcher
/// </summary>
/// <param name="name">Friendly name</param>
/// <param name="dispatcher">
/// Dispatcher instance for executing tasks
/// 执行任务的Dispatcher实例
/// </param>
/// <param name="policy">
/// Task scheduling policy
/// 任务调度策略
/// </param>
/// <param name="schedulingRate">
/// Average desired scheduling rate, in tasks per second.
/// 期望的任务平均调度速率(每秒执行几个任务)
/// Only valid when appropriate policy is specified
/// 仅当指定对应策略的时候才生效
/// </param>
public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, double schedulingRate)
: this(name, dispatcher, policy, 0, schedulingRate)
{
}
/// <summary>
/// Constructs an instance of the dispatcher port using the specified CCR dispatcher
/// </summary>
/// <param name="name">
/// Friendly name
/// </param>
/// <param name="dispatcher">
/// Dispatcher instance for executing tasks
/// </param>
/// <param name="policy">
/// Task scheduling policy
/// </param>
/// <param name="maximumQueueDepth">
/// Maximum number of pending tasks.
/// 最大待处理任务数
/// Only valid when appropriate policy is specified
/// 仅当指定对应策略的时候才生效
/// </param>
public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, int maximumQueueDepth)
: this(name, dispatcher, policy, maximumQueueDepth, 0.0)
{
}
/// <summary>
/// Constructs an instance of the dispatcher port using the specified CCR dispatcher
/// </summary>
/// <param name="name">
/// Friendly name
/// </param>
/// <param name="dispatcher">
/// Dispatcher instance for executing tasks
/// </param>
/// <param name="policy">
/// Task scheduling policy
/// </param>
/// <param name="maximumQueueDepth">
/// Maximum number of pending tasks.
/// Only used when appropriate policy is specified
/// </param>
/// <param name="schedulingRate">
/// Average desired scheduling rate, in tasks per second.
/// Only used when appropriate policy is specified
/// </param>
private DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy,
int maximumQueueDepth, double schedulingRate)
{
// 1.初始化 任务队列、时间刻度、定时器表
this._taskQueue = new Store<ITask>();
this._timescale = 1.0;
this._timerTable = new Dictionary<long, Timer>();
// 2.初始化 任务调度策略、
if (dispatcher == null)
{
throw new ArgumentNullException("dispatcher");
}
if (((policy == TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks) ||
(policy == TaskExecutionPolicy.ConstrainQueueDepthThrottleExecution)) &&
(maximumQueueDepth <= 0))
{
throw new ArgumentOutOfRangeException("maximumQueueDepth");
}
if (((policy == TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks) ||
(policy == TaskExecutionPolicy.ConstrainSchedulingRateThrottleExecution)) &&
(schedulingRate <= 0.0))
{
throw new ArgumentOutOfRangeException("schedulingRate");
}
this._dispatcher = dispatcher;
this._name = name;
this._policy = policy;
this._maximumQueueDepth = maximumQueueDepth;
this._maximumSchedulingRate = schedulingRate;
// 3.把DispatcherQueue关联到指定的Dispatcher上
dispatcher.AddQueue(name, this);
// 4.判断是否需要开启CCR秒表
if (policy >= TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks)
{
this._watch = CcrStopwatch.StartNew();
}
}
四、资源释放函数
DispatcherQueue内含任务队列,因此也需要做资源的释放,而且改函数的调用应该在Diapatcher的Dispose函数调用之前,详细原因看代码:
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// Implementation of dispose
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
// 移除对应Dispatcher与当前DispatcherQueue的关联
if ((disposing &&
(this._dispatcher != null)) &&
!this._dispatcher.RemoveQueue(this._name))
{
// 释放任务队列内未处理的任务
int elementCount = 0;
lock (this._taskQueue)
{
elementCount = this._taskQueue.ElementCount;
this._taskQueue = null;
}
// 调整对应Dispatcher内的未处理任务数
this._dispatcher.AdjustPendingCount(-elementCount);
}
}
五、任务操作函数
用户除了可以通过Port向DispatcherQueue推入任务外,还可以自己生产ITask任务,然后把它推入DiapatcherQueue内调度执行。为此DispatcherQueue具备了进队、出队2个功能。而且前面第三篇所提到的任务调度策略,也是在进队这个函数内实现的。
六、运行状态控制函数
DispatcherQueue提供了挂起、恢复的操作,以调度线程池对任务的运行,不过要注意的是,挂起状态下,用户仍然可以向DispatcherQueue提交任务。
{
lock (this._taskQueue)
{
this._isSuspended = true;
}
}
/// <summary>
/// Resumes execution of tasks, including any tasks queued while in paused state
/// 恢复任务的执行,包含所有处于暂停状态的排队任务
/// </summary>
public virtual void Resume()
{
lock (this._taskQueue)
{
this._isSuspended = false;
}
this._dispatcher.Signal();
}
七、附录
1、本系列其他文章:
我的CCR之旅(4):倾听CCR的心跳声--Dispatcher(基础篇)