【问题标题】:A parallel for using std::thread?并行使用 std::thread?
【发布时间】:2012-12-26 18:22:11
【问题描述】:

我是 std::thread 的新手,我尝试编写 parallel_for。 我编写了以下代码:

// parallel_for.cpp
// compilation: g++ -O3 -std=c++0x parallel_for.cpp -o parallel_for -lpthread
// execution: time ./parallel_for 100 50000000 
// (100: number of threads, 50000000: vector size)
#include <iostream>
#include <iomanip>
#include <cstdlib>
#include <vector>
#include <thread>
#include <cmath>
#include <algorithm>
#include <numeric>
#include <utility>

// Parallel for
template<typename Iterator, class Function>
void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1, const int threshold = 1000)
{
    const unsigned int group = std::max(std::max(1, std::abs(threshold)), (last-first)/std::abs(nthreads));
    std::vector<std::thread> threads;
    for (Iterator it = first; it < last; it += group) {
        threads.push_back(std::thread([=](){std::for_each(it, std::min(it+group, last), f);}));
    }
    std::for_each(threads.begin(), threads.end(), [=](std::thread& x){x.join();});
}

// Function to apply
template<typename Type>
void f1(Type& x)
{
    x = std::sin(x)+std::exp(std::cos(x))/std::exp(std::sin(x)); 
}

// Main
int main(int argc, char* argv[]) {

    const unsigned int nthreads = (argc > 1) ? std::atol(argv[1]) : (1);
    const unsigned int n = (argc > 2) ? std::atol(argv[2]) : (100000000);
    double x = 0;
    std::vector<double> v(n);
    std::iota(v.begin(), v.end(), 0);

    parallel_for(v.begin(), v.end(), f1<double>, nthreads);

    for (unsigned int i = 0; i < n; ++i) x += v[i];
    std::cout<<std::setprecision(15)<<x<<std::endl;
    return 0;
}

但这不起作用:(来自 g++ 4.6 的错误代码)

parallel_for.cpp: In instantiation of ‘parallel_for(const Iterator&, const Iterator&, Function&&, int, int) [with Iterator = __gnu_cxx::__normal_iterator<double*, std::vector<double> >, Function = void (&)(double&)]::<lambda()>’:
parallel_for.cpp:22:9:   instantiated from ‘void parallel_for(const Iterator&, const Iterator&, Function&&, int, int) [with Iterator = __gnu_cxx::__normal_iterator<double*, std::vector<double> >, Function = void (&)(double&)]’
parallel_for.cpp:43:58:   instantiated from here
parallel_for.cpp:22:89: erreur: field ‘parallel_for(const Iterator&, const Iterator&, Function&&, int, int) [with Iterator = __gnu_cxx::__normal_iterator<double*, std::vector<double> >, Function = void (&)(double&)]::<lambda()>::__f’ invalidly declared function type

如何解决这个问题?

编辑:这个新版本编译但没有给出好的结果:

// parallel_for.cpp
// compilation: g++ -O3 -std=c++0x parallel_for.cpp -o parallel_for -lpthread
// execution: time ./parallel_for 100 50000000 
// (100: number of threads, 50000000: vector size)
#include <iostream>
#include <iomanip>
#include <cstdlib>
#include <vector>
#include <thread>
#include <cmath>
#include <algorithm>
#include <numeric>
#include <utility>

// Parallel for
template<typename Iterator, class Function>
void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1, const int threshold = 1000)
{
    const unsigned int group = std::max(std::max(1, std::abs(threshold)), (last-first)/std::abs(nthreads));
    std::vector<std::thread> threads;
    for (Iterator it = first; it < last; it += group) {
        threads.push_back(std::thread([=, &f](){std::for_each(it, std::min(it+group, last), f);}));
    }
    std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
}

// Function to apply
template<typename Type>
void f(Type& x)
{
    x = std::sin(x)+std::exp(std::cos(x))/std::exp(std::sin(x)); 
}

// Main
int main(int argc, char* argv[]) {

    const unsigned int nthreads = (argc > 1) ? std::atol(argv[1]) : (1);
    const unsigned int n = (argc > 2) ? std::atol(argv[2]) : (100000000);
    double x = 0;
    double y = 0;
    std::vector<double> v(n);

    std::iota(v.begin(), v.end(), 0);
    std::for_each(v.begin(), v.end(), f<double>);
    for (unsigned int i = 0; i < n; ++i) x += v[i];

    std::iota(v.begin(), v.end(), 0);
    parallel_for(v.begin(), v.end(), f<double>, nthreads);
    for (unsigned int i = 0; i < n; ++i) y += v[i];

    std::cout<<std::setprecision(15)<<x<<" "<<y<<std::endl;
    return 0;
}

结果是:

./parallel_for 1 100
155.524339894552 4950

并行版本返回 4950,而顺序版本返回 155..... 问题出在哪里?

【问题讨论】:

    标签: c++ c++11 parallel-processing std-function stdthread


    【解决方案1】:

    您需要在 (last-first) 进行强制转换或类型转换。原因是在模板参数推导期间从不进行类型转换。

    这很好用(也解决了 DeadMG 和 Ben Voigt 发现的问题)。 两个版本都给出 156608294.151782,n=100000000。

    template<typename Iterator, class Function>
    void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1, const int threshold = 1000)
    {
        const unsigned int group = std::max(std::max(ptrdiff_t(1), ptrdiff_t(std::abs(threshold))), ((last-first))/std::abs(nthreads));
        std::vector<std::thread> threads;
        threads.reserve(nthreads);
        Iterator it = first;
        for (; it < last-group; it += group) {
            threads.push_back(std::thread([=,&f](){std::for_each(it, std::min(it+group, last), f);}));
        }
        std::for_each(it, last, f); // last steps while we wait for other threads
    
        std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
    }
    

    由于步长for_each(it, last, f)比其他步小,我们不妨在等待其他结果的同时使用调用线程完成。

    【讨论】:

      【解决方案2】:
      • 您必须通过引用来捕获函数。

        [=, &amp;f] () { /* your code */ };

      • 查看代码。

        #include <iostream>
        
        template <class T>
        void foo(const T& t)
        {
            const int a = t;
            [&]
            {
                std::cout << a << std::endl;
            }();
        }
        
        
        int main()
        {
            foo(42);
            return 0;
        }
        

        clang 给出输出 42,但 g++ 抛出警告:‘a’ is used uninitialized in this function,并打印 0。看起来像一个错误。

        解决方法:使用const auto(用于代码中的变量group)。

        UPD:我想,就是这样。 http://gcc.gnu.org/bugzilla/show_bug.cgi?id=52026

      【讨论】:

      • 谢谢!但是现在有一个新问题。
      【解决方案3】:

      一个问题是it += group 可以合法地成为last,但在最后创建一个值是未定义的行为。仅仅检查it &lt; last 为时已晚,无法解决这个问题。

      您需要在 it 仍然有效时测试 last - it。 (it + grouplast - group 都不一定是安全的,尽管后者应该是由于 group 的计算方式造成的。)

      例如:

      template<typename Iterator, class Function>
      void parallel_for(const Iterator& first, const Iterator& last, Function f, const int nthreads = 1, const int threshold = 100)
      {
          const unsigned int group = std::max(std::max(1, std::abs(threshold)), (last-first)/std::abs(nthreads));
          std::vector<std::thread> threads;
          threads.reserve(nthreads);
          Iterator it = first;
          for (; last - it > group; it += group) {
              threads.push_back(std::thread([=, &f](){std::for_each(it, it+group, last), f);}));
          }
          threads.push_back(std::thread([=, &f](){std::for_each(it, last, f);}));
      
          std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
      }
      

      【讨论】:

      • 是的。 OP交叉发布了这个,所以在另一个方面已经有了答案。小事:it 不在第二个 for_each 的使用范围内。
      【解决方案4】:

      您将std::min(it+group, last) 赋予std::for_each,但始终在末尾添加group。这意味着如果last 不是it 上的group 的倍数,您将把it 移到last 后面,即UB。

      【讨论】:

        【解决方案5】:

        您需要通过引用进行捕获,并且需要在(后到先)进行强制转换或类型转换。 原因是在模板参数推导期间从不进行类型转换。

        另外,修复 DeadMG 发现的问题,最终得到以下代码。

        效果很好,两个版本都给出 156608294.151782,n=100000000。

        template<typename Iterator, class Function>
        void parallel_for(const Iterator& first, const Iterator& last, Function&& f, const int nthreads = 1, const int threshold = 1000)
        {
            const unsigned int group = std::max(std::max(ptrdiff_t(1), ptrdiff_t(std::abs(threshold))), ((last-first))/std::abs(nthreads));
            std::vector<std::thread> threads;
            Iterator it = first;
            for (; it < last-group; it += group) {
                threads.push_back(std::thread([=,&f](){std::for_each(it, std::min(it+group, last), f);}));
            }
            std::for_each(it, last, f); // use calling thread while we wait for the others
            std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
        }
        

        【讨论】:

        • 为什么第二个 lambda 需要通过引用捕获?它不引用任何局部变量。
        • 另外,演员阵容也完全无关紧要。如果有的话,您现在正在破坏差异大于 int 的迭代器。
        • @DeadMG,试试吧。你需要一个演员来通过模板参数推导,但我同意 int 可能太小了。
        • 说真的,只是……停下来。你没有改变任何有意义的东西,你也不知道问题是什么。去某个地方复制它,修复它,然后发布答案。
        • 我做到了...有了你的修复,这很好。
        【解决方案6】:

        vc11 解决方案,如果它不适用于 gcc,请告诉我。

        template<typename Iterator, class Function>
        void parallel_for( const Iterator& first, const Iterator& last, Function&& f, const size_t nthreads = std::thread::hardware_concurrency(), const size_t threshold = 1 )
        {
            const size_t portion = std::max( threshold, (last-first) / nthreads );
            std::vector<std::thread> threads;
            for ( Iterator it = first; it < last; it += portion )
            {
                Iterator begin = it;
                Iterator end = it + portion;
                if ( end > last )
                    end = last;
        
                threads.push_back( std::thread( [=,&f]() {
                    for ( Iterator i = begin; i != end; ++i )
                        f(i);
                }));
            }
            std::for_each(threads.begin(), threads.end(), [](std::thread& x){x.join();});
        }
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2013-11-13
          • 2017-09-07
          • 2016-10-20
          • 1970-01-01
          相关资源
          最近更新 更多