【问题标题】:Windows API Thread Pool simple exampleWindows API 线程池简单示例
【发布时间】:2012-01-11 13:52:58
【问题描述】:

[编辑:感谢 MSalters 的回答和 Raymond Chen 对InterlockedIncrement vs EnterCriticalSection/counter++/LeaveCriticalSection 的回答,问题得到解决,下面的代码工作正常。这应该提供了一个有趣的简单示例,用于在 Windows 中使用线程池]

我找不到以下任务的简单示例。例如,我的程序需要将巨大的 std::vector 中的值加一,因此我想并行执行此操作。它需要在程序的整个生命周期中多次这样做。我知道如何在每次调用例程时使用 CreateThread 来做到这一点,但我无法通过 ThreadPool 摆脱 CreateThread。

这是我的工作:

class Thread {
public:
    Thread(){}
    virtual void run() = 0 ; // I can inherit an "IncrementVectorThread"
};
class IncrementVectorThread: public Thread {
public:
   IncrementVectorThread(int threadID, int nbThreads, std::vector<int> &vec) : id(threadID), nb(nbThreads), myvec(vec) { };

   virtual void run() {
        for (int i=(myvec.size()*id)/nb; i<(myvec.size()*(id+1))/nb; i++)
          myvec[i]++; //and let's assume myvec is properly sized
    }
   int id, nb;
   std::vector<int> &myvec;
};

class ThreadGroup : public std::vector<Thread*> {
public:
    ThreadGroup() { 
         pool = CreateThreadpool(NULL);
         InitializeThreadpoolEnvironment(&cbe);
         cleanupGroup = CreateThreadpoolCleanupGroup();
         SetThreadpoolCallbackPool(&cbe, pool);
         SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
         threadCount = 0;
    }
    ~ThreadGroup() {
         CloseThreadpool(pool);
}
    PTP_POOL pool;
    TP_CALLBACK_ENVIRON cbe;
    PTP_CLEANUP_GROUP cleanupGroup;
    volatile long threadCount;
} ;


static VOID CALLBACK runFunc(
                PTP_CALLBACK_INSTANCE Instance,
                PVOID Context,
                PTP_WORK Work) {

   ThreadGroup &thread = *((ThreadGroup*) Context);
   long id = InterlockedIncrement(&(thread.threadCount));
   DWORD tid = (id-1)%thread.size();
   thread[tid]->run();
}

void run_threads(ThreadGroup* thread_group) {
    SetThreadpoolThreadMaximum(thread_group->pool, thread_group->size());
    SetThreadpoolThreadMinimum(thread_group->pool, thread_group->size());

    TP_WORK *worker = CreateThreadpoolWork(runFunc, (void*) thread_group, &thread_group->cbe);
    thread_group->threadCount = 0;
    for (int i=0; i<thread_group->size(); i++) {
        SubmitThreadpoolWork(worker);
     }  
     WaitForThreadpoolWorkCallbacks(worker,FALSE);  
     CloseThreadpoolWork(worker);   
}       

void main() {

   ThreadGroup group;
   std::vector<int> vec(10000, 0);
   for (int i=0; i<10; i++)
      group.push_back(new IncrementVectorThread(i, 10, vec));

   run_threads(&group);
   run_threads(&group);
   run_threads(&group);

   // now, vec should be == std::vector<int>(10000, 3);       
}

所以,如果我理解得很好:
- CreateThreadpool 命令创建了一堆线程(因此,对 CreateThreadpoolWork 的调用很便宜,因为它不调用 CreateThread)
- 我可以拥有任意数量的线程池(如果我想为“IncrementVector”创建一个线程池,为“DecrementVector”线程创建一个线程池,我可以)。
- 如果我需要将我的“增量向量”任务分成 10 个线程,而不是调用 10 次 CreateThread,我创建一个“worker”,并使用相同的参数将其提交 10 次到 ThreadPool(因此,我需要线程回调中的 ID 以了解我的 std::vector 的哪一部分要增加)。在这里我找不到线程 ID,因为函数 GetCurrentThreadId() 返回线程的真实 ID(即,类似于 1528,而不是介于 0..nb_launched_threads 之间的东西)。

最后,我不确定我是否理解这个概念:如果我将 std::vector 拆分为 10 个线程,我真的需要一个工作人员而不是 10 个工作人员吗?

谢谢!

【问题讨论】:

    标签: c++ windows multithreading threadpool


    【解决方案1】:

    您大概已经到了最后一点。

    关于线程池的整个想法是你不关心它有多少线程。您只需将大量工作投入线程池,并让操作系统确定如何执行每个块。 因此,如果您创建并提交 10 个块,操作系统可能会使用池中的 1 到 10 个线程。

    您不应该关心那些线程标识。不要为线程 ID、最小或最大线程数或类似的东西而烦恼。

    如果您不关心线程标识,那么您如何管理要更改的向量的哪一部分?简单的。在创建线程池之前,将一个计数器初始化为零。在回调函数中,调用InterlockedIncrement 检索并增加计数器。对于每个提交的工作项,您将获得一个连续的整数。

    【讨论】:

    • 谢谢 - 赞成!我按照您的建议做了,但是使用上面的简单代码,最后我仍然没有在向量中得到正确的值:s
    • 答案重新验证:计数器未在正确的位置增加 :) 谢谢!
    • 抱歉,我取消验证以便重新打开问题。对于 10000 次执行中的大约 7 次,上面的代码未能给出仅包含“3”的向量:s
    • 这不太可能与线程 pool 相关,但更多是因为这些线程之间缺乏同步。
    • 您错误地使用了InterlockedIncrement。有关解释,请参阅另一个问题。
    猜你喜欢
    • 1970-01-01
    • 2014-01-17
    • 2011-03-27
    • 1970-01-01
    • 2012-10-24
    • 2011-05-09
    • 1970-01-01
    • 2011-12-05
    相关资源
    最近更新 更多