交流

    个人博客交流群:580749909,顺便推广一下自己和伙伴一起建立的 WPF 交流群:130108655。

    **>
    **>
    简要

因为在偶然的一次机会下,公司让我着手开发一个基于 socket 通讯的数据分发端中间件,主要用来解决向客户端分发数据的问题。后来多了一个需求,就是未付费的用户拿到的数据是有延迟的,

而付费用户则是正常的。这个时候在网上搜了很久没有找到合适的解决方案。其实能解决这个问题的方案有很多,比如说用一些大厂贡献的 xxMQ 中间件之类的,确实能解决问题,但是目前项目比较小,

根本用不上这么重的框架;然后又搜索了半天,暂时没有发现有人用 C# 来实现,所以才动手写了这个方案。

附上 GitHub 源码地址

    思路

这个方案借鉴了另一位博主的开发思路,受到这位博主的启发,然后根据自己的理解写了这个方案。附上该博主的链接地址:《1分钟实现“延迟消息”功能》

在此我就不多赘述里面的内容了。

    代码

首先,写一个方案要理清楚自己的项目结构,我做了如下分层。

<img src="https://img2018.cnblogs.com/blog/1214710/201809/1214710-20180923151631092-669553263.png" alt="" />

&nbsp;

    Interfaces,这层里主要约束延迟消息队列的队列和消息任务行。
 /// <summary>
 /// Contract for a ring-shaped delay queue: callbacks are registered with a delay
 /// and are invoked once that delay has elapsed.
 /// </summary>
 /// <typeparam name="T">Type of the payload handed to the callback.</typeparam>
 public interface IRingQueue<T>
 {
     /// <summary>
     /// Adds a task without payload and without an explicit task ID.
     /// The slot location and the number of cycles are derived from <paramref name="delayTime"/>.
     /// </summary>
     /// <param name="delayTime">The specified task is executed after N seconds.</param>
     /// <param name="action">Definition of the callback.</param>
     void Add(long delayTime, Action<T> action);

     /// <summary>
     /// Adds a task with a payload but without an explicit task ID.
     /// The slot location and the number of cycles are derived from <paramref name="delayTime"/>.
     /// </summary>
     /// <param name="delayTime">The specified task is executed after N seconds.</param>
     /// <param name="action">Definition of the callback.</param>
     /// <param name="data">Parameter passed to the callback function.</param>
     void Add(long delayTime, Action<T> action, T data);

     /// <summary>
     /// Adds a task with a payload and a caller-supplied task ID.
     /// The slot location and the number of cycles are derived from <paramref name="delayTime"/>.
     /// </summary>
     /// <param name="delayTime">The specified task is executed after N seconds.</param>
     /// <param name="action">Definition of the callback.</param>
     /// <param name="data">Parameter passed to the callback function.</param>
     /// <param name="id">Task ID, used when deleting tasks.</param>
     void Add(long delayTime, Action<T> action, T data, long id);

     /// <summary>
     /// Removes a task by its ID.
     /// </summary>
     /// <param name="id">Task ID that was supplied when the task was added.</param>
     void Remove(long id);

     /// <summary>
     /// Launches the queue.
     /// </summary>
     void Start();
 }
*v>
/// <summary>
/// Marker interface for tasks stored in a ring queue; it declares no members.
/// </summary>
public interface ITask
{
}
*v>
    Achieves,这层里实现之前定义的接口,这里写成抽象类是为了后面方便扩展。
<img id="code_img_closed_41bba8a5-767e-44e1-adae-d0d820aa4313" class="code_img_closed" src="http://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif" alt="" /><img id="code_img_opened_41bba8a5-767e-44e1-adae-d0d820aa4313" class="code_img_opened" style="display: none;" onclick="cnblogs_code_hide('41bba8a5-767e-44e1-adae-d0d820aa4313',event)" src="http://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif" alt="" />
  /// <summary>
  /// Base implementation of <see cref="IRingQueue{T}"/>: a fixed-length ring of slots and a
  /// pointer that <see cref="Start"/> advances by one slot per second. Every task is stored in
  /// the slot the pointer will be on when the task is due, together with the number of times
  /// the pointer has to land on that slot before the task fires.
  /// </summary>
  /// <typeparam name="T">Type of the payload handed to the task callback.</typeparam>
  public abstract class BaseQueue<T> : IRingQueue<T>
  {
      // Guards the pointer and every structural change of the slots, so that Add/Remove
      // and the ticking thread always see a consistent pointer/slot pair.
      private readonly object _gate = new object();
      private readonly ConcurrentBag<BaseTask<T>>[] _arraySlot;
      private readonly int _arrayMax;
      private long _pointer;

      /// <summary>
      /// Ring queue. Every slot holds a bag from construction on (never <c>null</c>).
      /// Changing the bags directly bypasses the internal lock.
      /// </summary>
      public ConcurrentBag<BaseTask<T>>[] ArraySlot
      {
          get { return _arraySlot; }
      }

      /// <summary>
      /// Creates the ring with <paramref name="arrayMax"/> slots.
      /// </summary>
      /// <param name="arrayMax">Number of slots; at least 60 and a multiple of 60.</param>
      /// <exception cref="ArgumentOutOfRangeException">
      /// <paramref name="arrayMax"/> is below 60 or not a multiple of 60.
      /// </exception>
      public BaseQueue(int arrayMax)
      {
          // The original guard combined both conditions with "&&" and "== 0",
          // which let every invalid positive length through.
          if (arrayMax < 60 || arrayMax % 60 != 0)
          {
              throw new ArgumentOutOfRangeException(
                  nameof(arrayMax),
                  arrayMax,
                  "Ring queue length cannot be less than 60 and is a multiple of 60 .");
          }

          _arrayMax = arrayMax;

          // Allocate all bags up front: lazy creation raced between Add and the ticker,
          // and left null slots behind that Remove then dereferenced.
          _arraySlot = new ConcurrentBag<BaseTask<T>>[arrayMax];
          for (var i = 0; i < arrayMax; i++)
          {
              _arraySlot[i] = new ConcurrentBag<BaseTask<T>>();
          }
      }

      /// <summary>Adds a task without payload; the task ID is 0.</summary>
      /// <param name="delayTime">Delay in ticks (one tick per second while <see cref="Start"/> runs).</param>
      /// <param name="action">Callback to invoke.</param>
      public void Add(long delayTime, Action<T> action)
      {
          Add(delayTime, action, default(T));
      }

      /// <summary>Adds a task with a payload; the task ID is 0.</summary>
      /// <param name="delayTime">Delay in ticks (one tick per second while <see cref="Start"/> runs).</param>
      /// <param name="action">Callback to invoke.</param>
      /// <param name="data">Payload passed to <paramref name="action"/>.</param>
      public void Add(long delayTime, Action<T> action, T data)
      {
          Add(delayTime, action, data, 0);
      }

      /// <summary>Adds a task with a payload and an explicit ID.</summary>
      /// <param name="delayTime">
      /// Delay in ticks (one tick per second while <see cref="Start"/> runs). A delay of 0
      /// lands on the current slot and therefore fires after one full revolution.
      /// </param>
      /// <param name="action">Callback to invoke.</param>
      /// <param name="data">Payload passed to <paramref name="action"/>.</param>
      /// <param name="id">Task ID, used by <see cref="Remove"/>.</param>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
      /// <exception cref="ArgumentOutOfRangeException"><paramref name="delayTime"/> is negative.</exception>
      public void Add(long delayTime, Action<T> action, T data, long id)
      {
          if (action == null)
          {
              throw new ArgumentNullException(nameof(action));
          }

          if (delayTime < 0)
          {
              throw new ArgumentOutOfRangeException(nameof(delayTime), delayTime, "Delay cannot be negative.");
          }

          lock (_gate)
          {
              NextSlot(delayTime, out long cycle, out long index);
              _arraySlot[index].Add(new BaseTask<T>(cycle, action, data, id));
          }
      }

      /// <summary>
      /// Removes the first task found with the given ID; does nothing when no task matches.
      /// </summary>
      /// <param name="id">ID that was supplied when the task was added.</param>
      public void Remove(long id)
      {
          lock (_gate)
          {
              foreach (var slot in _arraySlot)
              {
                  if (!slot.Any(task => task.Id == id))
                  {
                      continue;
                  }

                  // ConcurrentBag cannot remove a specific element (TryTake hands back an
                  // arbitrary one), so drain the slot and put back everything except the match.
                  var kept = new System.Collections.Generic.List<BaseTask<T>>();
                  var removed = false;
                  while (slot.TryTake(out BaseTask<T> candidate))
                  {
                      if (!removed && candidate.Id == id)
                      {
                          removed = true;
                          continue;
                      }

                      kept.Add(candidate);
                  }

                  foreach (var survivor in kept)
                  {
                      slot.Add(survivor);
                  }

                  return;
              }
          }
      }

      /// <summary>
      /// Runs the ticker: advances the pointer, then sleeps one second, forever.
      /// Blocks the calling thread; an exception thrown by a task callback ends the loop.
      /// </summary>
      public void Start()
      {
          while (true)
          {
              RightMovePointer();
              Thread.Sleep(1000);
              Console.WriteLine(DateTime.Now.ToString());
          }
      }

      /// <summary>
      /// Calculates where a new task goes. Must be called while holding the lock.
      /// </summary>
      /// <param name="delayTime">Delayed execution time in ticks (non-negative).</param>
      /// <param name="cycle">
      /// Number of times the pointer must land on the slot before the task fires.
      /// </param>
      /// <param name="index">Slot the task is stored in.</param>
      private void NextSlot(long delayTime, out long cycle, out long index)
      {
          var current = _pointer;

          // Split the delay first so that current + delayTime cannot overflow.
          var offset = current + delayTime % _arrayMax;
          var wraps = delayTime / _arrayMax + offset / _arrayMax;
          index = offset % _arrayMax;

          // A slot ahead of the pointer is reached once before the first wrap-around, so it
          // needs one visit more than the number of wraps; the original omitted that visit
          // and fired such tasks a full revolution early (e.g. delay 65 on a 60-slot ring).
          cycle = index > current ? wraps + 1 : wraps;
      }

      /// <summary>
      /// Moves the pointer one slot clockwise (wrapping to 0 after the last slot) and runs
      /// every task in the new slot whose cycle count is exhausted.
      /// </summary>
      private void RightMovePointer()
      {
          var due = new System.Collections.Generic.List<BaseTask<T>>();

          lock (_gate)
          {
              _pointer = _pointer >= _arrayMax - 1 ? 0 : _pointer + 1;

              var slot = _arraySlot[_pointer];
              var waiting = new System.Collections.Generic.List<BaseTask<T>>();

              // Drain the slot so that exactly the due tasks leave it; taking "some" element
              // with TryTake for each due task could pull out and run a task that is not due.
              while (slot.TryTake(out BaseTask<T> candidate))
              {
                  if (candidate.Cycle > 0)
                  {
                      candidate.SubCycleNumber();
                  }

                  if (candidate.Cycle <= 0)
                  {
                      due.Add(candidate);
                  }
                  else
                  {
                      waiting.Add(candidate);
                  }
              }

              foreach (var pending in waiting)
              {
                  slot.Add(pending);
              }
          }

          if (due.Count == 0)
          {
              return;
          }

          try
          {
              // Callbacks run outside the lock so they may call Add/Remove themselves.
              Parallel.ForEach(due, ready => ready.TaskAction(ready.Data));
          }
          catch (Exception e)
          {
              Console.WriteLine(e);
              throw;
          }
      }
  }
*v> BaseQueue*>
<img id="code_img_closed_ff60babe-8bc1-4bc0-882c-88cb2a24207c" class="code_img_closed" src="http://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif" alt="" /><img id="code_img_opened_ff60babe-8bc1-4bc0-882c-88cb2a24207c" class="code_img_opened" style="display: none;" onclick="cnblogs_code_hide('ff60babe-8bc1-4bc0-882c-88cb2a24207c',event)" src="http://images.cnblogs.com/OutliningIndicators/ExpandedBlockStart.gif" alt="" />
 /// <summary>
 /// A delayed task: the callback, its payload, an ID and the remaining cycle count.
 /// The cycle count is read and written through <see cref="Interlocked"/> operations.
 /// </summary>
 /// <typeparam name="T">Type of the payload handed to the callback.</typeparam>
 public class BaseTask<T> : ITask
 {
     private long _cycle;
     private long _id;
     private T _data;

     /// <summary>Callback to invoke with <see cref="Data"/>.</summary>
     public Action<T> TaskAction { get; set; }

     /// <summary>Remaining cycle count.</summary>
     public long Cycle
     {
         get { return Interlocked.Read(ref _cycle); }
         set { Interlocked.Exchange(ref _cycle, value); }
     }

     /// <summary>Task ID; 0 when the constructor used does not take one.</summary>
     public long Id
     {
         get { return _id; }
         set { _id = value; }
     }

     /// <summary>Payload passed to <see cref="TaskAction"/>.</summary>
     public T Data
     {
         get { return _data; }
         set { _data = value; }
     }

     /// <summary>Creates a task with payload and ID.</summary>
     /// <param name="cycle">Initial cycle count.</param>
     /// <param name="action">Callback to invoke.</param>
     /// <param name="data">Payload for the callback.</param>
     /// <param name="id">Task ID.</param>
     public BaseTask(long cycle, Action<T> action, T data, long id)
     {
         Cycle = cycle;
         TaskAction = action;
         Data = data;
         Id = id;
     }

     /// <summary>Creates a task with payload; the ID stays 0.</summary>
     /// <param name="cycle">Initial cycle count.</param>
     /// <param name="action">Callback to invoke.</param>
     /// <param name="data">Payload for the callback.</param>
     public BaseTask(long cycle, Action<T> action, T data)
         : this(cycle, action, data, 0)
     {
     }

     /// <summary>Creates a task without payload; data stays default and the ID stays 0.</summary>
     /// <param name="cycle">Initial cycle count.</param>
     /// <param name="action">Callback to invoke.</param>
     public BaseTask(long cycle, Action<T> action)
         : this(cycle, action, default(T), 0)
     {
     }

     /// <summary>Atomically decrements the cycle count by one.</summary>
     public void SubCycleNumber()
     {
         Interlocked.Decrement(ref _cycle);
     }
 }
*v> BaseTask*>
*v>
    Logic,这层主要实现调用逻辑,调用者最终只需要关心把任务放进队列并指定什么时候执行就行了,根本不需要关心其它的任何信息。
 /// <summary>
 /// Demo: starts a <c>MinuteQueue</c> on a background task and schedules four news items
 /// with delays of 5, 10, 60 and 120. Blocks on the queue task, which loops forever.
 /// </summary>
 public static void Start()
 {
     // 1. Initialize queues of different granularity.
     IRingQueue<NewsModel> minuteRingQueue = new MinuteQueue<NewsModel>();

     // 2. Open thread. The ticker never returns, so ask for a dedicated thread
     //    instead of occupying a thread-pool worker.
     var lstTasks = new List<Task>
     {
         Task.Factory.StartNew(minuteRingQueue.Start, TaskCreationOptions.LongRunning)
     };

     // 3. Add tasks performed in different periods.
     Action<NewsModel> printNews = newsObj => Console.WriteLine(newsObj.News);

     minuteRingQueue.Add(5, printNews, new NewsModel() { News = "Trump's visit to China!" });

     minuteRingQueue.Add(10, printNews, new NewsModel() { News = "Putin Pu's visit to China!" });

     minuteRingQueue.Add(60, printNews, new NewsModel() { News = "Eisenhower's visit to China!" });

     minuteRingQueue.Add(120, printNews, new NewsModel() { News = "Xi Jinping's visit to the US!" });

     // 4. Waiting for all tasks to complete is usually not completed. Because there is an infinite loop.
     //    F5 Run the program and see the effect.
     Task.WaitAll(lstTasks.ToArray());
     Console.Read();
 }
*v>
    Models,这层就是用来在延迟任务中带入的数据模型类而已。自己用的时候换成任意自定义类型都可以。

&nbsp;

    截图

<img src="https://img2018.cnblogs.com/blog/1214710/201809/1214710-20180923152944919-1577847621.png" alt="" />

分类:

技术点:

相关文章: