&nbsp;
/// <summary>
/// Contract of a ring queue that runs callbacks after a delay.
/// </summary>
/// <typeparam name="T">Type of the data handed to each callback.</typeparam>
public interface IRingQueue<T>
{
    /// <summary>
    /// Adds a task. The task Id, slot location and number of task cycles are generated automatically.
    /// </summary>
    /// <param name="delayTime">The task is executed after this many seconds.</param>
    /// <param name="action">Callback to execute.</param>
    void Add(long delayTime, Action<T> action);

    /// <summary>
    /// Adds a task together with the data passed to its callback. The task Id, slot location
    /// and number of task cycles are generated automatically.
    /// </summary>
    /// <param name="delayTime">The task is executed after this many seconds.</param>
    /// <param name="action">Callback to execute.</param>
    /// <param name="data">Argument handed to the callback.</param>
    void Add(long delayTime, Action<T> action, T data);

    /// <summary>
    /// Adds a task with caller-supplied data and Id. The slot location and number of task
    /// cycles are generated automatically.
    /// </summary>
    /// <param name="delayTime">The task is executed after this many seconds.</param>
    /// <param name="action">Callback to execute.</param>
    /// <param name="data">Argument handed to the callback.</param>
    /// <param name="id">Task ID, used when deleting the task.</param>
    void Add(long delayTime, Action<T> action, T data, long id);

    /// <summary>
    /// Removes a task identified by its ID.
    /// </summary>
    /// <param name="id">Task ID that was supplied when the task was added.</param>
    void Remove(long id);

    /// <summary>
    /// Launches the queue.
    /// </summary>
    void Start();
}
/// <summary>
/// Marker interface for tasks; it declares no members.
/// </summary>
public interface ITask
{
}
/// <summary>
/// Ring queue built on an array of slots and a pointer that moves one slot per tick.
/// A task is stored in the slot the pointer will reach when its delay has elapsed, together
/// with the number of full turns the pointer still has to complete before the task is due.
/// </summary>
/// <typeparam name="T">Type of the data handed to each task callback.</typeparam>
public abstract class BaseQueue<T> : IRingQueue<T>
{
    // Serializes pointer movement and every slot mutation, so Add/Remove calls made from
    // other threads cannot interleave with the tick performed by Start().
    private readonly object _syncRoot = new object();

    private long _pointer = 0L;
    private readonly ConcurrentBag<BaseTask<T>>[] _arraySlot;
    private readonly int ArrayMax;

    /// <summary>
    /// Ring queue: one (possibly null) bag of pending tasks per slot.
    /// </summary>
    public ConcurrentBag<BaseTask<T>>[] ArraySlot
    {
        get { return _arraySlot; }
    }

    /// <summary>
    /// Creates a ring with <paramref name="arrayMax"/> slots.
    /// </summary>
    /// <param name="arrayMax">Number of slots; must be at least 60 and a multiple of 60.</param>
    /// <exception cref="ArgumentException">The length is below 60 or not a multiple of 60.</exception>
    public BaseQueue(int arrayMax)
    {
        // Either violation alone must be rejected. The previous "&& ... == 0" test only
        // rejected 0 and negative multiples of 60.
        if (arrayMax < 60 || arrayMax % 60 != 0)
            throw new ArgumentException("Ring queue length cannot be less than 60 and is a multiple of 60 .", nameof(arrayMax));

        ArrayMax = arrayMax;
        _arraySlot = new ConcurrentBag<BaseTask<T>>[arrayMax];
    }

    /// <summary>
    /// Adds a task whose callback receives <c>default(T)</c> and whose Id is 0.
    /// </summary>
    /// <param name="delayTime">Delay in ticks before the callback runs.</param>
    /// <param name="action">Callback to execute.</param>
    public void Add(long delayTime, Action<T> action)
    {
        Add(delayTime, action, default(T));
    }

    /// <summary>
    /// Adds a task with callback data and Id 0.
    /// </summary>
    /// <param name="delayTime">Delay in ticks before the callback runs.</param>
    /// <param name="action">Callback to execute.</param>
    /// <param name="data">Argument handed to the callback.</param>
    public void Add(long delayTime, Action<T> action, T data)
    {
        Add(delayTime, action, data, 0);
    }

    /// <summary>
    /// Adds a task with callback data and a caller-supplied Id.
    /// </summary>
    /// <param name="delayTime">Delay in ticks before the callback runs; values below 1 are treated as 1.</param>
    /// <param name="action">Callback to execute.</param>
    /// <param name="data">Argument handed to the callback.</param>
    /// <param name="id">Task ID, used by <see cref="Remove"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public void Add(long delayTime, Action<T> action, T data, long id)
    {
        // Fail at the call site instead of with a NullReferenceException inside the tick loop.
        if (action == null)
            throw new ArgumentNullException(nameof(action));

        lock (_syncRoot)
        {
            // The slot is computed and filled under the same lock as the tick, so the
            // pointer cannot move between the two steps.
            NextSlot(delayTime, out long cycle, out long pointer);
            var slot = _arraySlot[pointer] ?? (_arraySlot[pointer] = new ConcurrentBag<BaseTask<T>>());
            slot.Add(new BaseTask<T>(cycle, action, data, id));
        }
    }

    /// <summary>
    /// Removes the first pending task found with the given ID. Errors are written to the
    /// console and not propagated.
    /// </summary>
    /// <param name="id">ID of the task to remove.</param>
    public void Remove(long id)
    {
        try
        {
            lock (_syncRoot)
            {
                for (var i = 0; i < _arraySlot.Length; i++)
                {
                    var bag = _arraySlot[i];
                    if (bag == null || bag.IsEmpty)
                        continue;

                    // ConcurrentBag cannot remove a specific item (TryTake returns an
                    // arbitrary one), so the slot is rebuilt without the matching task.
                    var kept = new ConcurrentBag<BaseTask<T>>();
                    var removed = false;
                    foreach (var task in bag)
                    {
                        if (!removed && task.Id == id)
                        {
                            removed = true;
                            continue;
                        }
                        kept.Add(task);
                    }

                    if (removed)
                    {
                        _arraySlot[i] = kept;
                        return;
                    }
                }
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
        }
    }

    /// <summary>
    /// Runs the queue: moves the pointer one slot, sleeps 1000 ms, prints the current time,
    /// and repeats forever. This call does not return.
    /// </summary>
    public void Start()
    {
        while (true)
        {
            RightMovePointer();
            Thread.Sleep(1000);
            Console.WriteLine(DateTime.Now.ToString());
        }
    }

    /// <summary>
    /// Calculates where a new task has to be stored.
    /// </summary>
    /// <param name="delayTime">Delayed execution time in ticks; values below 1 are treated as 1.</param>
    /// <param name="cycle">Number of full turns the pointer must complete before the task is due.</param>
    /// <param name="index">Slot in which the task is stored.</param>
    private void NextSlot(long delayTime, out long cycle, out long index)
    {
        // The pointer moves before a slot is processed, so the earliest possible run is
        // the next tick.
        var ticks = Math.Max(1L, delayTime);
        var currentPointer = GetPointer();

        // The target slot is first visited after ((ticks - 1) % ArrayMax) + 1 ticks, so
        // (ticks - 1) / ArrayMax earlier visits have to be skipped.
        cycle = (ticks - 1) / ArrayMax;

        // Reduce ticks first so the sum cannot overflow for very large delays.
        index = (currentPointer + ticks % ArrayMax) % ArrayMax;
    }

    /// <summary>
    /// Gets the current location of the pointer.
    /// </summary>
    /// <returns>Index of the slot the pointer is on.</returns>
    private long GetPointer()
    {
        return Interlocked.Read(ref _pointer);
    }

    /// <summary>
    /// Resets the pointer position to slot 0.
    /// </summary>
    private void ReSetPointer()
    {
        Interlocked.Exchange(ref _pointer, 0);
    }

    /// <summary>
    /// Moves the pointer clockwise by one slot and runs the tasks of that slot that are due.
    /// Tasks with turns remaining stay in the slot with one turn fewer. Exceptions are
    /// written to the console and rethrown.
    /// </summary>
    private void RightMovePointer()
    {
        try
        {
            System.Collections.Generic.List<BaseTask<T>> dueTasks;

            lock (_syncRoot)
            {
                if (GetPointer() >= ArrayMax - 1)
                {
                    ReSetPointer();
                }
                else
                {
                    Interlocked.Increment(ref _pointer);
                }

                var pointer = GetPointer();
                var taskCollection = _arraySlot[pointer];
                if (taskCollection == null || taskCollection.IsEmpty) return;

                // Split the slot explicitly. TryTake would hand back an arbitrary task,
                // which is not necessarily one that is due.
                dueTasks = new System.Collections.Generic.List<BaseTask<T>>();
                var waiting = new ConcurrentBag<BaseTask<T>>();
                foreach (var task in taskCollection)
                {
                    if (task.Cycle > 0)
                    {
                        task.SubCycleNumber();
                        waiting.Add(task);
                    }
                    else
                    {
                        dueTasks.Add(task);
                    }
                }
                _arraySlot[pointer] = waiting;
            }

            // Callbacks run outside the lock, so they may call Add/Remove without blocking.
            if (dueTasks.Count > 0)
            {
                Parallel.ForEach(dueTasks, task => task.TaskAction(task.Data));
            }
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
            throw;
        }
    }
}
/// <summary>
/// A unit of work: a callback, the data passed to it, an Id and a cycle counter.
/// </summary>
/// <typeparam name="T">Type of the data handed to the callback.</typeparam>
public class BaseTask<T> : ITask
{
    // Backing store of Cycle; only touched through Interlocked operations.
    private long _remainingCycles;

    /// <summary>
    /// Creates a task without data; Data stays default(T) and Id stays 0.
    /// </summary>
    public BaseTask(long cycle, Action<T> action)
        : this(cycle, action, default(T))
    {
    }

    /// <summary>
    /// Creates a task with callback data; Id stays 0.
    /// </summary>
    public BaseTask(long cycle, Action<T> action, T data)
        : this(cycle, action, data, 0L)
    {
    }

    /// <summary>
    /// Creates a task with callback data and an explicit Id.
    /// </summary>
    /// <param name="cycle">Initial value of the cycle counter.</param>
    /// <param name="action">Callback stored in <see cref="TaskAction"/>.</param>
    /// <param name="data">Value stored in <see cref="Data"/>.</param>
    /// <param name="id">Value stored in <see cref="Id"/>.</param>
    public BaseTask(long cycle, Action<T> action, T data, long id)
    {
        Cycle = cycle;
        TaskAction = action;
        Data = data;
        Id = id;
    }

    /// <summary>Callback of the task.</summary>
    public Action<T> TaskAction { get; set; }

    /// <summary>Cycle counter, read and written atomically.</summary>
    public long Cycle
    {
        get => Interlocked.Read(ref _remainingCycles);
        set => Interlocked.Exchange(ref _remainingCycles, value);
    }

    /// <summary>Identifier of the task.</summary>
    public long Id { get; set; }

    /// <summary>Data handed to the callback.</summary>
    public T Data { get; set; }

    /// <summary>
    /// Atomically decrements the cycle counter by one.
    /// </summary>
    public void SubCycleNumber() => Interlocked.Decrement(ref _remainingCycles);
}
/// <summary>
/// Demo: creates a minute-granularity ring queue, runs it on a background task and
/// schedules four news items that are printed after 5, 10, 60 and 120 units of delay.
/// </summary>
public static void Start()
{
    // 1. Initialize queues of different granularity.
    IRingQueue<NewsModel> minuteRingQueue = new MinuteQueue<NewsModel>();

    // 2. Open thread. The queue loop never returns, so request a dedicated thread
    //    (LongRunning) instead of occupying a thread-pool worker indefinitely.
    var lstTasks = new List<Task>
    {
        Task.Factory.StartNew(minuteRingQueue.Start, TaskCreationOptions.LongRunning)
    };

    // 3. Add tasks performed in different periods. Every task uses the same callback,
    //    which prints the news text of the model it receives.
    Action<NewsModel> printNews = newsObj => Console.WriteLine(newsObj.News);

    minuteRingQueue.Add(5, printNews, new NewsModel { News = "Trump's visit to China!" });
    minuteRingQueue.Add(10, printNews, new NewsModel { News = "Putin Pu's visit to China!" });
    minuteRingQueue.Add(60, printNews, new NewsModel { News = "Eisenhower's visit to China!" });
    minuteRingQueue.Add(120, printNews, new NewsModel { News = "Xi Jinping's visit to the US!" });

    // 4. Waiting for all tasks to complete usually never finishes, because the queue
    //    runs an infinite loop. Press F5 to run the program and see the effect.
    Task.WaitAll(lstTasks.ToArray());
    Console.Read();
}
&nbsp;
<img src="https://img2018.cnblogs.com/blog/1214710/201809/1214710-20180923152944919-1577847621.png" alt="" />