一、概述

    上一篇讲述的是CCR内真正执行任务的实体Dispatcher,主要是说明了常用的:构造函数、释放函数的使用和实现。这次从我开始讲述Dispatcher的唯一交互渠道:DispatcherQueue。
    DispatcherQueue是整个CCR中,唯一能让用户与Dispatcher交互的地方,所有外界想与Dispatcher交互都只能直接或间接通过DispatcherQueu来做,可见DispatcherQueue在整
个CCR中处于“咽喉”之地。

二、预览
    DispatcherQueue类内,主要实现了5部分的功能:
1、构造实例对象;

public DispatcherQueue();
public DispatcherQueue(string name);
public DispatcherQueue(string name, Dispatcher dispatcher);
public DispatcherQueue(string name, Dispatcher dispatcher, 
            TaskExecutionPolicy policy, 
double schedulingRate);
public DispatcherQueue(string name, Dispatcher dispatcher, 
            TaskExecutionPolicy policy, 
int maximumQueueDepth);

2、销毁相关资源;

public void Dispose();
protected virtual void Dispose(bool disposing);

 

3、任务进队、出队操作;

public virtual bool Enqueue(ITask task);
public virtual bool TryDequeue(out ITask task);

4、队列状态控制;

public virtual void Suspend();
public virtual void Resume();

5、因果关系相关控制;

public virtual Timer EnqueueTimer(TimeSpan timeSpan, Port<DateTime> timerPort);


    这5个功能中,常用的是前面4个,因此本基础篇就只讲这4个功能相关的函数的使用和实现原理,最后一个功能不常用,计划与Dispatcher内的因果关系部分放在后面作为高级篇细说。

三、构造函数
    DispatcherQueue类内根据所使用的线程池的种类不同,而分为2类:
1、一类是使用CLR的线程池的构造函数:

/// <summary>
/// Default constructor
/// 默认构造函数
/// </summary>
public DispatcherQueue()
    : 
this("Unnamed queue using CLR Threadpool")
{
}

/// <summary>
/// Constructs an instance of the dispatcher port using the CLR thread pool for task execution
/// 构建一个不使用CCR的线程池,而是使用CLR线程池执行任务的实例
/// </summary>
/// <param name="name">
/// 名称
/// </param>
public DispatcherQueue(string name)
{
    
this._taskQueue = new Store<ITask>();
    
this._timescale = 1.0;
    
this._timerTable = new Dictionary<long, Timer>();
    
this._name = name;
}

2、一类是使用CCR线程池(也即操作系统线程池)的构造函数:

 name, Dispatcher dispatcher)
    : this(name, dispatcher, TaskExecutionPolicy.Unconstrained, 01.0)
{
}

/// <summary>
/// Constructs an instance of the dispatcher port using the specified CCR dispatcher
/// </summary>
/// <param name="name">Friendly name</param>
/// <param name="dispatcher">
/// Dispatcher instance for executing tasks
/// 执行任务的Dispatcher实例
/// </param>
/// <param name="policy">
/// Task scheduling policy
/// 任务调度策略
/// </param>
/// <param name="schedulingRate">
/// Average desired scheduling rate, in tasks per second. 
/// 期望的任务平均调度速率(每秒执行几个任务)
/// Only valid when appropriate policy is specified
/// 仅当指定对应策略的时候才生效
/// </param>
public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, double schedulingRate)
    : 
this(name, dispatcher, policy, 0, schedulingRate)
{
}

/// <summary>
/// Constructs an instance of the dispatcher port using the specified CCR dispatcher
/// </summary>
/// <param name="name">
/// Friendly name
/// </param>
/// <param name="dispatcher">
/// Dispatcher instance for executing tasks
/// </param>
/// <param name="policy">
/// Task scheduling policy
/// </param>
/// <param name="maximumQueueDepth">
/// Maximum number of pending tasks. 
/// 最大待处理任务数
/// Only valid when appropriate policy is specified
/// 仅当指定对应策略的时候才生效
/// </param>
public DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy, int maximumQueueDepth)
    : 
this(name, dispatcher, policy, maximumQueueDepth, 0.0)
{
}

/// <summary>
/// Constructs an instance of the dispatcher port using the specified CCR dispatcher
/// </summary>
/// <param name="name">
/// Friendly name
/// </param>
/// <param name="dispatcher">
/// Dispatcher instance for executing tasks
/// </param>
/// <param name="policy">
/// Task scheduling policy
/// </param>
/// <param name="maximumQueueDepth">
/// Maximum number of pending tasks. 
/// Only used when appropriate policy is specified
/// </param>
/// <param name="schedulingRate">
/// Average desired scheduling rate, in tasks per second. 
/// Only used when appropriate policy is specified
/// </param>
private DispatcherQueue(string name, Dispatcher dispatcher, TaskExecutionPolicy policy,
                        
int maximumQueueDepth, double schedulingRate)
{
    
// 1.初始化 任务队列、时间刻度、定时器表
    this._taskQueue = new Store<ITask>();
    
this._timescale = 1.0;
    
this._timerTable = new Dictionary<long, Timer>();
    
// 2.初始化 任务调度策略、
    if (dispatcher == null)
    {
        
throw new ArgumentNullException("dispatcher");
    }
    
if (((policy == TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks) ||
        (policy 
== TaskExecutionPolicy.ConstrainQueueDepthThrottleExecution)) &&
        (maximumQueueDepth 
<= 0))
    {
        
throw new ArgumentOutOfRangeException("maximumQueueDepth");
    }
    
if (((policy == TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks) ||
        (policy 
== TaskExecutionPolicy.ConstrainSchedulingRateThrottleExecution)) &&
        (schedulingRate 
<= 0.0))
    {
        
throw new ArgumentOutOfRangeException("schedulingRate");
    }
    
this._dispatcher = dispatcher;
    
this._name = name;
    
this._policy = policy;
    
this._maximumQueueDepth = maximumQueueDepth;
    
this._maximumSchedulingRate = schedulingRate;
    
// 3.把DispatcherQueue关联到指定的Dispatcher上
    dispatcher.AddQueue(name, this);
    
// 4.判断是否需要开启CCR秒表
    if (policy >= TaskExecutionPolicy.ConstrainSchedulingRateDiscardTasks)
    {
        
this._watch = CcrStopwatch.StartNew();
    }
}

四、资源释放函数
    DispatcherQueue内含任务队列,因此也需要做资源的释放,而且改函数的调用应该在Diapatcher的Dispose函数调用之前,详细原因看代码:

 Dispose()
{
    this.Dispose(true);
    GC.SuppressFinalize(
this);
}

/// <summary>
/// Implementation of dispose
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
    
// 移除对应Dispatcher与当前DispatcherQueue的关联
    if ((disposing &&
        (
this._dispatcher != null)) &&
        
!this._dispatcher.RemoveQueue(this._name))
    {
        
// 释放任务队列内未处理的任务
        int elementCount = 0;
        
lock (this._taskQueue)
        {
            elementCount 
= this._taskQueue.ElementCount;
            
this._taskQueue = null;
        }
        
// 调整对应Dispatcher内的未处理任务数
        this._dispatcher.AdjustPendingCount(-elementCount);
    }
}

五、任务操作函数
    用户除了可以通过Port向DispatcherQueue推入任务外,还可以自己生产ITask任务,然后把它推入DiapatcherQueue内调度执行。为此DispatcherQueue具备了进队、出队2个功
能。而且前面第三篇所提到的任务调度策略,也是在进队这个函数内实现的。

}

六、运行状态控制函数
    DispatcherQueue提供了挂起、恢复的操作,以调度线程池对任务的运行,不过要注意的是,挂起状态下,用户仍然可以向DispatcherQueue提交任务。

 Suspend()
{
    lock (this._taskQueue)
    {
        
this._isSuspended = true;
    }
}

/// <summary>
/// Resumes execution of tasks, including any tasks queued while in paused state
/// 恢复任务的执行,包含所有处于暂停状态的排队任务
/// </summary>
public virtual void Resume()
{
    
lock (this._taskQueue)
    {
        
this._isSuspended = false;
    }
    
this._dispatcher.Signal();
}

七、附录
1、本系列其他文章:
我的CCR之旅(4):倾听CCR的心跳声--Dispatcher(基础篇)

相关文章:

  • 2022-01-21
  • 2021-12-31
  • 2022-03-02
  • 2021-11-06
  • 2022-01-07
  • 2021-10-13
  • 2021-06-23
  • 2021-08-09
猜你喜欢
  • 2021-12-06
  • 2021-08-23
  • 2022-12-23
  • 2021-10-02
  • 2022-12-23
  • 2021-12-18
相关资源
相似解决方案