【问题标题】:Dividing work between fixed number of threads with pthread使用 pthread 在固定数量的线程之间分配工作
【发布时间】:2017-02-18 10:33:35
【问题描述】:

我有n 个作业,它们之间没有共享资源,还有mthreads。我想有效地划分线程中的作业数量,以便在处理完所有内容之前没有空闲线程?

这是我的程序的原型:

class Job {
    //constructor and other stuff
    //...

    public: doWork();
};

struct JobParams{
  int threadId;
  Job job;
};

void* doWorksOnThread(void* job) {
    JobParams* j = // cast argument
    cout << "Thread #" << j->threadId << " started" << endl;
    j->job->doWork();
    return (void*)0;
}

然后在我的主文件中,我有类似的内容:

int main() {
  vector<Job> jobs; // lets say it has 17 jobs
  int numThreads = 4;

  pthread_t* threads = new pthread_t[numThreads];
  JobParams* jps = new JubParams[jobs.size()];

  for(int i = 0; i < jobs.size(); i++) {
    jps[i]->job = jobs[i];        
  }

  for(int i = 0; i < numThread; i++) {
    pthread_create(&t[i], null, doWorkOnThread, &jps[0])
  }

  //another for loop and call join on 4 threads...

  return 0;
}

如何有效地确保在所有作业完成之前没有空闲线程?

【问题讨论】:

  • 你为什么使用 pthreads 而不是std::thread?为什么要使用new 创建数组而不是使用std::vector?你让你的代码比必要的复杂。

标签: c++ multithreading pthreads


【解决方案1】:

您需要添加一个循环来识别已完成的线程,然后启动新的线程,确保始终有多达 4 个线程在运行。

这是一个非常基本的方法。按照建议使用睡眠可能是一个好的开始,并且可以完成这项工作(即使在您确定最后一个线程完成之前添加额外的延迟)。理想情况下,您应该在作业完成时使用线程通知的condition variable 来唤醒主循环(然后睡眠指令将被等待条件指令替换)。

struct JobParams{
  int threadId;
  Job job;
  std::atomic<bool> done; // flag to know when job is done, could also be an attribute of Job class!
};

void* doWorksOnThread(void* job) {
    JobParams* j = // cast argument
    cout << "Thread #" << j->threadId << " started" << endl;
    j->job->doWork();
    j->done = true; // signal job completed
    return (void*)0;
}

int main() {
    ....

    std::map<JobParams*,pthread_t*> runningThreads; // to keep track of running jobs

    for(int i = 0; i < jobs.size(); i++) {
        jps[i]->job = jobs[i];   
        jps[i]->done = false; // mark as not done yet
    }

    while ( true )
    {
        vector<JobParams*> todo;
        for( int i = 0; i < jobs.size(); i++ )
        {
            if ( !jps[i]->done )
            {
                if ( runningThreads.find(jps[i]) == runningThreads.end() )
                    todo.push_back( &jps[i] ); // job not started yet, mask as to be done
                // else, a thread is already processing the job and did not complete it yet
            }
            else
            {
                 if ( runningThreads.find(jps[i]) != runningThreads.end() )
                 {
                     // thread just completed the job!
                     // let's join to wait for the thread to end cleanly
                     // I'm not familiar with pthread, hope this is correct
                     void* res;
                     pthread_join(runningThreads[jps[i]], &res);  
                     runningThreads.erase(jps[i]); // not running anymore
                 }
                 // else, job was already done and thread joined from a previous iteration
            }
        }

        if ( todo.empty() && runningThreads.empty() )
            break; // done all jobs

        // some jobs remain undone

        if ( runningThreads.size() < numThreads && !todo.empty() )
        {
            // some new threads shall be started...

            int newThreadsToBeCreatedCount = numThreads - runningThreads.size();
            // make sure you don't end up with too many threads running
            if ( todo.size() > newThreadsToBeCreatedCount )
                todo.resize( newThreadsToBeCreatedCount ); 

            for ( auto jobParam : todo )
            {
                pthread_t* thread = runningThreads[&jobParam];
                pthread_create(thread, null, doWorkOnThread, &jobParam );
            }
        }
        // else: you already have 4 runnign jobs

        // sanity check that everything went as expected:
        assert( runningThreads.size() <= numThreads );

        msleep( 100 ); // give a chance for some jobs to complete (100ms)
                       // adjust sleep duration if necessary
    }
}

注意:我对 pthread 不是很熟悉。希望语法正确。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2010-09-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-11
    • 1970-01-01
    • 2021-11-09
    相关资源
    最近更新 更多