【发布时间】:2012-01-11 13:52:58
【问题描述】:
[编辑:感谢 MSalters 的回答和 Raymond Chen 对InterlockedIncrement vs EnterCriticalSection/counter++/LeaveCriticalSection 的回答,问题得到解决,下面的代码工作正常。这应该提供了一个有趣的简单示例,用于在 Windows 中使用线程池]
我找不到以下任务的简单示例。例如,我的程序需要将巨大的 std::vector 中的值加一,因此我想并行执行此操作。它需要在程序的整个生命周期中多次这样做。我知道如何在每次调用例程时使用 CreateThread 来做到这一点,但我无法通过 ThreadPool 摆脱 CreateThread。
这是我的工作:
class Thread {
public:
Thread(){}
virtual void run() = 0 ; // I can inherit an "IncrementVectorThread"
};
class IncrementVectorThread: public Thread {
public:
IncrementVectorThread(int threadID, int nbThreads, std::vector<int> &vec) : id(threadID), nb(nbThreads), myvec(vec) { };
virtual void run() {
for (int i=(myvec.size()*id)/nb; i<(myvec.size()*(id+1))/nb; i++)
myvec[i]++; //and let's assume myvec is properly sized
}
int id, nb;
std::vector<int> &myvec;
};
class ThreadGroup : public std::vector<Thread*> {
public:
ThreadGroup() {
pool = CreateThreadpool(NULL);
InitializeThreadpoolEnvironment(&cbe);
cleanupGroup = CreateThreadpoolCleanupGroup();
SetThreadpoolCallbackPool(&cbe, pool);
SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
threadCount = 0;
}
~ThreadGroup() {
CloseThreadpool(pool);
}
PTP_POOL pool;
TP_CALLBACK_ENVIRON cbe;
PTP_CLEANUP_GROUP cleanupGroup;
volatile long threadCount;
} ;
static VOID CALLBACK runFunc(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context,
PTP_WORK Work) {
ThreadGroup &thread = *((ThreadGroup*) Context);
long id = InterlockedIncrement(&(thread.threadCount));
DWORD tid = (id-1)%thread.size();
thread[tid]->run();
}
void run_threads(ThreadGroup* thread_group) {
SetThreadpoolThreadMaximum(thread_group->pool, thread_group->size());
SetThreadpoolThreadMinimum(thread_group->pool, thread_group->size());
TP_WORK *worker = CreateThreadpoolWork(runFunc, (void*) thread_group, &thread_group->cbe);
thread_group->threadCount = 0;
for (int i=0; i<thread_group->size(); i++) {
SubmitThreadpoolWork(worker);
}
WaitForThreadpoolWorkCallbacks(worker,FALSE);
CloseThreadpoolWork(worker);
}
void main() {
ThreadGroup group;
std::vector<int> vec(10000, 0);
for (int i=0; i<10; i++)
group.push_back(new IncrementVectorThread(i, 10, vec));
run_threads(&group);
run_threads(&group);
run_threads(&group);
// now, vec should be == std::vector<int>(10000, 3);
}
所以,如果我理解得很好:
- CreateThreadpool 命令创建了一堆线程(因此,对 CreateThreadpoolWork 的调用很便宜,因为它不调用 CreateThread)
- 我可以拥有任意数量的线程池(如果我想为“IncrementVector”创建一个线程池,为“DecrementVector”线程创建一个线程池,我可以)。
- 如果我需要将我的“增量向量”任务分成 10 个线程,而不是调用 10 次 CreateThread,我创建一个“worker”,并使用相同的参数将其提交 10 次到 ThreadPool(因此,我需要线程回调中的 ID 以了解我的 std::vector 的哪一部分要增加)。在这里我找不到线程 ID,因为函数 GetCurrentThreadId() 返回线程的真实 ID(即,类似于 1528,而不是介于 0..nb_launched_threads 之间的东西)。
最后,我不确定我是否理解这个概念:如果我将 std::vector 拆分为 10 个线程,我真的需要一个工作人员而不是 10 个工作人员吗?
谢谢!
【问题讨论】:
标签: c++ windows multithreading threadpool