本文开始我们通过做几个简单的例子,来逐步认识CCR,看看她到底有多神奇。
一、 第一个神奇:不用任何与创建线程、资源互斥有关系的API写多线程程序
这次的例子,是一个很简单的控制台,她将面对瞬间提交的百万的数据,而面不改色(CPU、内存非常平稳),队列中始终只保存最新的数据,每次只处理cpu个数据(我的机器是双核的,所以,在我这里,就是每个CPU一个线程,真正的并行运行哦....),OK不废话,进入正题:
呃,既然是实例,那么就直接看代码好了:
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Microsoft.Ccr.Core;
namespace CCRDemo1
{
class Program
{
static void Main(string[] args)
{
int maxiQueueDepth = 10;
// step1: 创建一个Dispatcher对象
Dispatcher dispatcher = new Dispatcher(0, "调度器名称");
// step2: 创建一个与step1创建对象关联的DispatcherQueue对象
DispatcherQueue depthThrottledQueue = new DispatcherQueue(
"任务队列的名称",
// 关联到该队列的调度器
dispatcher,
// 队列保存数据的策略:保存最近消息策略
TaskExecutionPolicy.ConstrainQueueDepthDiscardTasks,
// 队列的深度
maxiQueueDepth
);
// step3: 创建一个能够接收整型数据的Port
Port<int> intPort = new Port<int>();
// step4: 把Port与处理函数关联,然后再与DispatcherQueue关联
Arbiter.Activate(depthThrottledQueue,
Arbiter.Receive(true,
intPort,
delegate(int i) // 这里用了一个匿名方法,作为处理函数
{
Thread.Sleep(2000);
Console.WriteLine("[{0}] {1}", DateTime.Now.ToString("o"), i);
}
)
);
// step5: 快速的提交大量的任务
Console.WriteLine("[{0}] 开始提交大量的任务", DateTime.Now.ToString("o"));
for (int i = 0; i < maxiQueueDepth * 100000; i++)
{
// 把数据Post到intPort内
intPort.Post(i);
}
Console.WriteLine("[{0}] 大量任务提交完毕。", DateTime.Now.ToString("o"));
Console.WriteLine("Press any key to exit");
Console.ReadKey();
dispatcher.Dispose();
}
}
}
二、原理讲解
其实在代码里面的注释,我想应该写的很明白了,不过毕竟是第一个例子,我还是稍微讲些CCR的大致原理(至于详细的实现原理,我后面会专门起文想说,这次主要的目的还是,先领略下CCR的神奇之处):
首先,多线程是以操作系统线程池的形式被Dispatcher管理的,因此创建线程的工作实际上由Dispatcher代劳了;
其次,在CCR内,他规定Dispatcher只能处理任务,而这个任务只能从任务队列DispatcherQueue内获取,因此,我们要创建任务队列,并关联上Dispatcher;
然后,我们把自己要提交给处理函数处理的数据,封装在Port<T>内,Port其实是一个FIFO队列,专门用来接收用户提交的数据的;
最后,我们要把数据、处理函数、任务队列 组合起来,这就是上面代码中的step4,这步其实做了2个工作:
1、把port和处理函数,封装为Receive关联起来;
2、把Receive和DispatcherQueue关联起来;
这样,我们就完成了,所有的工作。
总之,CCR提供了一个模式,让我们只需要把需要并发、异步处理的工作,分解为:
1、输入数据--->post到Port内;
2、处理过程--->做成委托关联到任务队列中
这种方法,是的编写多线程程序的工作大大简化了,而且也能够让后台的代码能够被编译器统一优化。
三、附录
1、实例代码下载
2、本系列前文:
我的CCR之旅(1):来自微软机器人技术的新并发、异步解决方案