namespace Sopaco.Lib.ServiceModel.AsyncServiceDispatcher
{
    /// <summary>
    /// Dispatches tasks to named host threads. Hosts are looked up in an
    /// <see cref="IHostThreadTable"/>; a host that does not exist yet is created
    /// through <see cref="NewHostThreadProvider"/> the first time a task is
    /// registered under its identity.
    /// </summary>
    public class AsyncServiceDispatcher : IAsyncServiceDispatcher
    {
        #region Fields

        // Host identity used by RegisterTask(ITaskWrapper) when the caller names no host.
        private static readonly string DEFAULT_THREADHOSTIDENTITY = "Sopaco.Lib.ServiceModel.AsyncServiceDispatcher.DefaultThreadHostIdentity";

        private IHostThreadTable _table;

        #endregion

        #region Constructors & Initializer

        /// <summary>
        /// Reserved for dependency injection; do not add code here.
        /// </summary>
        protected AsyncServiceDispatcher()
        {
        }

        /// <summary>
        /// Creates a dispatcher backed by a default <c>HostThreadTable</c> and a default
        /// <c>TaskHostThreadProvider</c>. Use this as the regular "default" constructor.
        /// </summary>
        /// <param name="autoInit">Not read by the body; it only distinguishes this overload
        /// from the parameterless DI constructor.</param>
        public AsyncServiceDispatcher(bool autoInit)
        {
            _table = new HostThreadTable(true);
            NewHostThreadProvider = new TaskHostThreadProvider();
        }

        /// <summary>
        /// Creates a dispatcher that uses the supplied table and host-thread provider.
        /// </summary>
        /// <param name="table">Table that stores the host threads by identity.</param>
        /// <param name="provider">Factory used to create hosts on demand.</param>
        public AsyncServiceDispatcher(IHostThreadTable table, ITaskHostThreadProvider provider)
        {
            _table = table;
            NewHostThreadProvider = provider;
        }

        #endregion

        #region IAsyncServiceDispatcher Members

        /// <summary>
        /// The table of host threads managed by this dispatcher.
        /// The private setter exists so that a DI container can inject the table.
        /// </summary>
        public IHostThreadTable Table
        {
            get { return _table; }
            private set { this._table = value; }
        }

        /// <summary>
        /// Factory used to create a host thread when a task is registered under an
        /// identity that has no host yet.
        /// </summary>
        public ITaskHostThreadProvider NewHostThreadProvider { get; set; }

        /// <summary>
        /// Registers a task on the default host thread.
        /// </summary>
        /// <param name="task">The task to queue.</param>
        public void RegisterTask(ITaskWrapper task)
        {
            RegisterTask(DEFAULT_THREADHOSTIDENTITY, task);
        }

        /// <summary>
        /// Registers a task on the named host thread, without a principal.
        /// </summary>
        /// <param name="taskHostIdentity">Identity of the host thread.</param>
        /// <param name="task">The task to queue.</param>
        public void RegisterTask(string taskHostIdentity, ITaskWrapper task)
        {
            RegisterTask(taskHostIdentity, task, null);
        }

        /// <summary>
        /// Registers a task on the named host thread, creating the host first when the
        /// table does not contain it yet.
        /// </summary>
        /// <param name="taskHostIdentity">Identity of the host thread.</param>
        /// <param name="task">The task to queue.</param>
        /// <param name="principal">Accepted for interface compatibility; this
        /// implementation does not read it.</param>
        public void RegisterTask(string taskHostIdentity, ITaskWrapper task, ITaskPrincipal principal)
        {
            // Create the host through the provider if the identity is unknown.
            if (!_table.QueryExist(taskHostIdentity))
            {
                var threadHost = NewHostThreadProvider.CreateTaskHostThread();
                _table[taskHostIdentity] = threadHost;
            }

            // Queue the task on the (now existing) host.
            _table[taskHostIdentity].AddTask(task);
        }

        /// <summary>
        /// Requests that a single task on the named host be paused.
        /// Does nothing when the host does not exist.
        /// </summary>
        public void PauseTask(string hostThreadIdentity, string taskIdentity)
        {
            // The guard was inverted: it returned for existing hosts and
            // dereferenced the lookup result for missing ones.
            if (!_table.QueryExist(hostThreadIdentity))
            {
                return;
            }

            _table[hostThreadIdentity].PauseTask(taskIdentity);
        }

        /// <summary>
        /// Removes a single task from the named host.
        /// Does nothing when the host does not exist.
        /// </summary>
        public void TerminateTask(string hostThreadIdentity, string taskIdentity)
        {
            if (!_table.QueryExist(hostThreadIdentity))
            {
                return;
            }

            _table[hostThreadIdentity].RemoveTask(taskIdentity);
        }

        /// <summary>
        /// Requests that a single paused task on the named host be resumed.
        /// Does nothing when the host does not exist.
        /// </summary>
        public void ResumeTask(string hostThreadIdentity, string taskIdentity)
        {
            if (!_table.QueryExist(hostThreadIdentity))
            {
                return;
            }

            _table[hostThreadIdentity].ResumeTask(taskIdentity);
        }

        /// <summary>
        /// Sets the expected state of the named host to <c>TaskState.Pause</c>.
        /// Does nothing when the host does not exist.
        /// </summary>
        public void PauseThreadHost(string hostThreadIdentity)
        {
            if (!_table.QueryExist(hostThreadIdentity))
            {
                return;
            }

            _table[hostThreadIdentity].ExpectedState = TaskState.Pause;
        }

        /// <summary>
        /// Asks the named host thread to exit.
        /// Does nothing when the host does not exist.
        /// </summary>
        public void TerminateThreadHost(string hostThreadIdentity)
        {
            if (!_table.QueryExist(hostThreadIdentity))
            {
                return;
            }

            _table[hostThreadIdentity].TryExitThread();
        }

        /// <summary>
        /// Resumes the named host thread.
        /// Does nothing when the host does not exist.
        /// </summary>
        public void ResumeThreadHost(string hostThreadIdentity)
        {
            if (!_table.QueryExist(hostThreadIdentity))
            {
                return;
            }

            _table[hostThreadIdentity].ResumeCurrent();
        }

        /// <summary>
        /// Asks every host thread in the table to exit.
        /// </summary>
        public void TryTerminateAllThreadHost()
        {
            var keys = _table.Keys;
            foreach (string key in keys)
            {
                _table[key].TryExitThread();
            }
        }

        #endregion
    }
}
namespace Sopaco.Lib.ServiceModel.AsyncServiceDispatcher
{
    /// <summary>
    /// Dictionary-backed table that maps a host identity to its host thread.
    /// </summary>
    public class HostThreadTable : IHostThreadTable
    {
        #region Fields

        private IDictionary<string, ITaskHostThread> _table;

        #endregion

        #region Constructors & Initializer

        /// <summary>
        /// Reserved for dependency injection; do not put any code here.
        /// </summary>
        protected HostThreadTable()
        {
        }

        /// <summary>
        /// Regular constructor for manual initialisation. By project convention the
        /// parameterless constructor is left to the IoC container, so this overload
        /// is the one that creates the backing dictionary.
        /// </summary>
        /// <param name="autoCreateContainer">Not read by the body; it only distinguishes
        /// this overload from the DI constructor.</param>
        public HostThreadTable(bool autoCreateContainer)
        {
            _table = new Dictionary<string, ITaskHostThread>();
        }

        #endregion

        #region IHostThreadTable Members

        /// <summary>
        /// Returns true when a host thread is stored under <paramref name="name"/>.
        /// </summary>
        public bool QueryExist(string name)
        {
            return _table.ContainsKey(name);
        }

        /// <summary>
        /// The identities of all stored host threads (the dictionary's key collection).
        /// </summary>
        public IEnumerable<string> Keys
        {
            get { return _table.Keys; }
        }

        /// <summary>
        /// Gets the host stored under <paramref name="name"/>, or null when there is none.
        /// Setting stores a new host and throws <see cref="InvalidOperationException"/>
        /// when the name is already taken.
        /// </summary>
        public ITaskHostThread this[string name]
        {
            get
            {
                // TryGetValue leaves the out value at null when the key is absent.
                ITaskHostThread host;
                _table.TryGetValue(name, out host);
                return host;
            }
            set
            {
                if (!queryExist(name))
                {
                    _table[name] = value;
                    return;
                }

                throw new InvalidOperationException("已经存在相同命名的TaskHostThread:" + name);
            }
        }

        #endregion

        #region private Helper Methods

        // QueryExist is the public contract; queryExist is the internal helper.
        // They are identical today but are kept separate on purpose.
        private bool queryExist(string name)
        {
            return QueryExist(name);
        }

        #endregion
    }
}
namespace Sopaco.Lib.ServiceModel.AsyncServiceDispatcher
{
    /// <summary>
    /// A worker thread that owns a queue of tasks and executes them one at a time.
    /// The thread parks when the queue is empty or when the host is paused, and leaves
    /// its loop when <see cref="ExpectedState"/> becomes <c>TaskState.Abort</c>
    /// or <c>TaskState.Complete</c>.
    /// </summary>
    public class TaskHostThread : ITaskHostThread
    {
        #region Fields

        private Thread _thread;
        private List<ITaskWrapper> _task;
        private TaskState _currentState = TaskState.Ready;
        private AutoResetEvent _waitTaskEvent;
        private AutoResetEvent _waitResumeEvent;

        // Guards every access to _task (callers and the worker thread share the list).
        private readonly object _syncRoot = new object();

        #endregion

        #region Constructors & Initializer

        /// <summary>
        /// Reserved for dependency injection; leaves the instance uninitialised.
        /// </summary>
        protected TaskHostThread()
        {
        }

        /// <summary>
        /// Creates a host whose worker thread starts immediately.
        /// </summary>
        /// <param name="autoInit">Not read by the body; it only distinguishes this overload
        /// from the DI constructor.</param>
        public TaskHostThread(bool autoInit)
            : this(TaskState.Running)
        {
        }

        /// <summary>
        /// Creates a host with the given expected state.
        /// </summary>
        /// <param name="state">Controls whether the worker starts right away: the thread
        /// is started only when this is <c>TaskState.Running</c>.</param>
        public TaskHostThread(TaskState state)
        {
            ExpectedState = state;
            Init();
            if (state == TaskState.Running)
            {
                StartCurrent();
            }
        }

        private void Init()
        {
            // Both events start unsignalled; the wait helpers re-check their condition
            // in a loop, so no initial signal is needed.
            _waitTaskEvent = new AutoResetEvent(false);
            _waitResumeEvent = new AutoResetEvent(false);
            _thread = new Thread(this.runBody);
            _task = new List<ITaskWrapper>();
        }

        #endregion

        #region ITaskHostThread Members

        /// <summary>
        /// The state last recorded by the worker loop.
        /// </summary>
        public TaskState CurrentState
        {
            get { return _currentState; }
        }

        /// <summary>
        /// The state callers want the host to reach; the worker loop polls it.
        /// </summary>
        public TaskState ExpectedState { get; set; }

        /// <summary>
        /// Requests that the worker leave its loop, and wakes it if it is parked.
        /// </summary>
        public void TryExitThread()
        {
            ExpectedState = TaskState.Abort;

            // Release both waits so a parked worker observes the abort request.
            _waitResumeEvent.Set();
            _waitTaskEvent.Set();
        }

        /// <summary>
        /// Starts the worker thread if it has not been started yet. If
        /// <see cref="ExpectedState"/> is <c>TaskState.Pause</c> at that point, the
        /// worker parks until <see cref="ResumeCurrent"/> is called.
        /// </summary>
        public void StartCurrent()
        {
            if (_thread.ThreadState == ThreadState.Unstarted)
            {
                _thread.Start();
            }
        }

        /// <summary>Not supported in this version.</summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public void PauseCurrentAndNext()
        {
            throw new NotImplementedException("此版本无法对任务进行细力度的控制");
        }

        /// <summary>
        /// Requests that the worker pause before it picks up its next task.
        /// </summary>
        public void PauseCurrent()
        {
            ExpectedState = TaskState.Pause;
        }

        /// <summary>
        /// Resumes a paused worker.
        /// </summary>
        public void ResumeCurrent()
        {
            // Publish the new state before signalling; otherwise the worker can wake,
            // still see Pause, and park again with nothing left to wake it.
            ExpectedState = TaskState.Running;
            _waitResumeEvent.Set();
        }

        /// <summary>Not supported in this version.</summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public void TerminateCurrentAndNext()
        {
            throw new NotImplementedException("此版本无法终止正在任务");
        }

        /// <summary>Not supported in this version.</summary>
        /// <exception cref="NotImplementedException">Always thrown.</exception>
        public void TerminateCurrent()
        {
            throw new NotImplementedException("此版本无法终止正在任务");
        }

        /// <summary>
        /// Marks every queued task as expected to abort.
        /// </summary>
        public void TerminateAll()
        {
            lock (_syncRoot)
            {
                foreach (var tk in _task)
                {
                    tk.ExpectedState = TaskState.Abort;
                }
            }
        }

        /// <summary>
        /// Appends a task to the queue and wakes the worker if the queue was empty.
        /// </summary>
        /// <param name="task">The task to queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        /// <exception cref="InvalidOperationException">A task with the same identity is
        /// already queued.</exception>
        public void AddTask(ITaskWrapper task)
        {
            if (task == null)
            {
                throw new ArgumentNullException("task");
            }

            lock (_syncRoot)
            {
                if (_task.Exists(p => string.Equals(p.Identify, task.Identify)))
                {
                    throw new InvalidOperationException("任务已经存在:" + task.Identify);
                }

                _task.Add(task);
                if (_task.Count == 1)
                {
                    _waitTaskEvent.Set();
                }
            }
        }

        /// <summary>
        /// Appends a task, re-sorts the queue and wakes the worker.
        /// Unlike <see cref="AddTask"/>, no duplicate-identity check is made.
        /// </summary>
        /// <param name="task">The task to queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        public void AddTaskAndRefresh(ITaskWrapper task)
        {
            if (task == null)
            {
                throw new ArgumentNullException("task");
            }

            lock (_syncRoot)
            {
                _task.Add(task);
                resortTask();
                _waitTaskEvent.Set();
            }
        }

        /// <summary>
        /// Removes the queued task with the given identity, if there is one.
        /// </summary>
        public void RemoveTask(string identity)
        {
            lock (_syncRoot)
            {
                ITaskWrapper task = findTask(identity);
                if (task != null)
                {
                    _task.Remove(task);
                }
            }
        }

        /// <summary>
        /// Sets the expected state of the identified task to <c>TaskState.Pause</c>.
        /// Does nothing when no such task is queued.
        /// </summary>
        public void PauseTask(string identity)
        {
            lock (_syncRoot)
            {
                var task = findTask(identity);
                if (task == null)
                {
                    return;
                }

                // NOTE(review): the worker loop tests task.State, not ExpectedState;
                // presumably the task copies one to the other - confirm.
                task.ExpectedState = TaskState.Pause;
            }
        }

        /// <summary>
        /// Sets the expected state of the identified task to <c>TaskState.Running</c>.
        /// Does nothing when no such task is queued.
        /// </summary>
        public void ResumeTask(string identity)
        {
            lock (_syncRoot)
            {
                var task = findTask(identity);
                if (task == null)
                {
                    return;
                }

                task.ExpectedState = TaskState.Running;
            }
        }

        /// <summary>
        /// The first queued task whose state is <c>TaskState.Running</c>,
        /// or null when there is none.
        /// </summary>
        public ITaskWrapper RunningTask
        {
            get
            {
                lock (_syncRoot)
                {
                    // FirstOrDefault, not First: "nothing running" must yield null, not throw.
                    return _task.FirstOrDefault(p => p.State == TaskState.Running);
                }
            }
        }

        /// <summary>
        /// A snapshot of the queued tasks whose state is <c>TaskState.Pause</c>.
        /// </summary>
        public IList<ITaskWrapper> PausingTask
        {
            get
            {
                lock (_syncRoot)
                {
                    return _task.Where(p => p.State == TaskState.Pause).ToList();
                }
            }
        }

        #endregion

        #region private Helper Methods

        // Worker loop: honour pause and abort requests, wait for work, run one task per pass.
        private void runBody()
        {
            _currentState = TaskState.Ready;
            while (ExpectedState != TaskState.Complete)
            {
                // Respond to a pause request. This test used to compare against Abort,
                // so a pause was never honoured.
                if (ExpectedState == TaskState.Pause)
                {
                    waitUntilResume();
                    _currentState = TaskState.Running;
                }
                if (ExpectedState == TaskState.Abort)
                {
                    break;
                }

                // With no queued task, park until one is added.
                if (!hasQueuedTask())
                {
                    waitUntilHasTask();
                    _currentState = TaskState.Running;
                }
                if (ExpectedState == TaskState.Abort)
                {
                    break;
                }

                // Take the next task; a paused task goes back to the end of the queue.
                ITaskWrapper task = null;
                lock (_syncRoot)
                {
                    if (_task.Count != 0)
                    {
                        task = dequeueTask();
                        if (task.State == TaskState.Pause)
                        {
                            enqueueTask(task);
                            task = null;
                        }
                    }
                }
                if (ExpectedState == TaskState.Abort)
                {
                    break;
                }

                if (task != null)
                {
                    task.Call(); // run it outside the lock
                }
            }
            _currentState = TaskState.Complete;
        }

        // True when at least one task is queued.
        private bool hasQueuedTask()
        {
            lock (_syncRoot)
            {
                return _task.Count != 0;
            }
        }

        // Parks the worker until a task is queued or an abort is requested.
        // The event is not Reset() first, so a signal raised between the emptiness
        // check and the wait is kept; the loop absorbs stale signals.
        private void waitUntilHasTask()
        {
            _currentState = TaskState.Free;
            while (ExpectedState != TaskState.Abort && !hasQueuedTask())
            {
                _waitTaskEvent.WaitOne();
            }
        }

        // Parks the worker for as long as the expected state stays Pause.
        private void waitUntilResume()
        {
            _currentState = TaskState.Pause;
            while (ExpectedState == TaskState.Pause)
            {
                _waitResumeEvent.WaitOne();
            }
        }

        // Finds a queued task by identity; the caller must hold _syncRoot.
        private ITaskWrapper findTask(string identity)
        {
            return _task.FirstOrDefault(p => p.Identify == identity);
        }

        // Removes and returns the head of the queue; the caller must hold _syncRoot.
        private ITaskWrapper dequeueTask()
        {
            var task = _task[0];
            _task.RemoveAt(0);
            return task;
        }

        // Appends to the tail of the queue; the caller must hold _syncRoot.
        private void enqueueTask(ITaskWrapper task)
        {
            _task.Add(task);
        }

        // Sorts paused tasks first, then the rest by descending priority.
        // The caller must hold _syncRoot.
        // NOTE(review): paused-first ordering is kept from the original - confirm intent.
        private void resortTask()
        {
            _task.Sort((p, q) =>
            {
                bool pPaused = p.State == TaskState.Pause;
                bool qPaused = q.State == TaskState.Pause;
                if (pPaused != qPaused)
                {
                    return pPaused ? -1 : 1;
                }
                if (pPaused)
                {
                    // Both paused: must compare equal, or the comparison is inconsistent.
                    return 0;
                }

                // CompareTo avoids the overflow that "q.Priority - p.Priority" can hit.
                return q.Priority.CompareTo(p.Priority);
            });
        }

        #endregion
    }
}
/// <summary>
/// Default <see cref="ITaskHostThreadProvider"/>: hands out new
/// <see cref="TaskHostThread"/> instances built with the <c>bool</c> constructor.
/// </summary>
public class TaskHostThreadProvider : ITaskHostThreadProvider
{
    #region ITaskHostThreadProvider Members

    /// <summary>
    /// Creates a new host thread.
    /// </summary>
    /// <returns>A freshly constructed <see cref="TaskHostThread"/>.</returns>
    public ITaskHostThread CreateTaskHostThread()
    {
        ITaskHostThread host = new TaskHostThread(true);
        return host;
    }

    #endregion
}
/// <summary>
/// Outcome reported by a task call.
/// </summary>
public enum TaskResult
{
    /// <summary>The only outcome defined so far.</summary>
    OK = 0
}
/// <summary>
/// States used for both the current and the expected state of tasks and host threads.
/// The numeric values follow the declaration order.
/// </summary>
public enum TaskState
{
    /// <summary>Created but not yet running.</summary>
    Ready = 0,

    /// <summary>Running.</summary>
    Running = 1,

    /// <summary>Paused, or asked to pause.</summary>
    Pause = 2,

    /// <summary>Aborted, or asked to abort.</summary>
    Abort = 3,

    /// <summary>Finished.</summary>
    Complete = 4,

    /// <summary>Idle: a host thread waiting for a task to be queued.</summary>
    Free = 5
}
/// <summary>
/// Basic <see cref="ITaskWrapper"/>: wraps an <see cref="ITask"/> and carries its
/// identity, priority and state alongside it.
/// </summary>
public class TaskWrapperBase : ITaskWrapper
{
    #region Fields

    // Identity given to wrappers created through the (bool, ITask) constructor.
    private static readonly string DEFAULT_TASKIDENTITY = "Sopaco.Lib.ServiceModel.AsyncServiceDispatcher.TaskWrapperBaseIdentity";

    private ITask _innerTask;

    #endregion

    #region Constructors & Initializer

    private TaskWrapperBase()
    {
    }

    /// <summary>
    /// Wraps <paramref name="innerTask"/> with the default identity, priority 0 and
    /// the <c>TaskState.Ready</c> state.
    /// </summary>
    /// <param name="autoInit">Not read by the body; it only distinguishes this overload.</param>
    /// <param name="innerTask">The task to wrap.</param>
    public TaskWrapperBase(bool autoInit, ITask innerTask)
        : this(innerTask, DEFAULT_TASKIDENTITY, 0, TaskState.Ready)
    {
    }

    /// <summary>
    /// Wraps <paramref name="innerTask"/> with explicit metadata.
    /// </summary>
    /// <param name="innerTask">The task to wrap.</param>
    /// <param name="identity">Value stored in <see cref="Identify"/>.</param>
    /// <param name="priority">Value stored in <see cref="Priority"/>.</param>
    /// <param name="state">Initial value of <see cref="State"/>.</param>
    public TaskWrapperBase(ITask innerTask, string identity, int priority, TaskState state)
    {
        Identify = identity;
        Priority = priority;
        State = state;
        _innerTask = innerTask;
    }

    #endregion

    #region ITaskWrapper Members

    /// <summary>The wrapped task.</summary>
    public ITask InnerTask
    {
        get { return _innerTask; }
    }

    /// <summary>The task's current state.</summary>
    public TaskState State { get; set; }

    /// <summary>The state callers have requested for the task.</summary>
    public TaskState ExpectedState { get; set; }

    /// <summary>The task's priority.</summary>
    public int Priority { get; set; }

    /// <summary>The task's identity string.</summary>
    public string Identify { get; set; }

    #endregion

    #region ITask Members

    /// <summary>
    /// Invokes the wrapped task.
    /// </summary>
    /// <returns>Always <c>TaskResult.OK</c>.</returns>
    public TaskResult Call()
    {
        _innerTask.Call();
        return TaskResult.OK;
    }

    #endregion
}
|