【问题标题】:Parallelizing search within vector c++在向量 C++ 中并行化搜索
【发布时间】:2016-11-20 05:52:09
【问题描述】:

我有一个可变大小的大向量。我想检查每个元素(在向量的特定索引范围 lowerRange-upperRange 内)是否满足特定条件?在下面的示例中,我的输入向量包含 9 个元素,我想检查从 2 到 6 的元素是否满足 check_if_condition()。这里,lowerRange=2 和 upperRange=6

为此,我编写了以下并行代码来执行相同的操作,但是,此代码的问题是它给出了错误:“glibc 检测 smallbin 链表已损坏”。我尝试使用 valgrind 调试代码,但我无法找到错误的确切原因。

我的实际输入向量包含 10000000 个元素,我想检查 999999(lowerRange)-9999999(upperRange) 之间的元素(此范围由用户指定,尽管我已将此范围视为代码中的常量。 ) 索引元素满足check_if_condition

#include <thread>
#include <vector>
#include <iostream>
#include <atomic>

unsigned check_if_condition(int a)
{
    //Long check here
    return 1; 
}

void doWork(std::vector<unsigned>& input, std::vector<unsigned>& results, unsigned assigned, size_t current, size_t end, std::atomic_int& totalPassed)
{
    end = std::min(end, input.size()-2);
    int numPassed = 0;    
    for(; (current) < end; ++current) {
        if(check_if_condition(input[current])) {
            results[current] = true;
            ++numPassed;
        }
    }

    totalPassed.fetch_add(numPassed);
}

int main()
{
    std::vector<unsigned> input;//(1000000);  
    input.push_back(0); input.push_back(1); input.push_back(2); input.push_back(3); input.push_back(4); input.push_back(5); input.push_back(6); input.push_back(7); input.push_back(8);
    std::vector<unsigned> results(input.size());
    std::atomic_int numPassed(0);        
    auto numThreads = std::thread::hardware_concurrency();    
    std::vector<std::thread> threads;
    unsigned assigned;

    if(numThreads> input.size())
        numThreads=input.size();
    std::cout<<"numThreads="<<numThreads<<"\n";
    auto blockSize = input.size() / numThreads;
    for(size_t i = 0; i < numThreads - 1; ++i) //check whether elements from 2 to 6 satisfy check_if_condition
        threads.emplace_back(doWork, std::ref(input), std::ref(results), assigned,((i+2) * blockSize), ((i+3) * blockSize), std::ref(numPassed));


    for(auto& thread : threads)
        thread.join();


    std::vector<int> storage;
    storage.reserve(numPassed.load());

    auto itRes = results.begin();
    auto itInput = input.begin();
    auto endRes = results.end();
    for(; itRes != endRes; ++itRes, ++itInput) {
        if(*itRes)
            storage.emplace_back(*itInput);            
    }

    std::cout<<"\n Storage:";
    for(std::vector<int>::iterator i1=storage.begin(), l1=storage.end(); i1!=l1; ++i1)
        std::cout<<" "<<(*i1)<<"\n";

    std::cout << "Done" << std::endl;
}

【问题讨论】:

  • 您需要确保在向量子范围上进行分叉工作的工作不超过对向量子范围本身的工作。使用 10 个线程和 1000 万个元素,每个线程将拥有 100 万个元素;如果工作是每个元素数十条机器指令,那么每个线程将有 ~~1 亿条指令要做,这应该足够了。如果您只有 100,000 个元素,那么您可能还不够,并且分发工作将会放缓。在现实世界的应用程序中真的有 1000 万个元素吗?

标签: c++ c++11 vector concurrency parallel-processing


【解决方案1】:

您正在检查 end 而不是 current 在您的 doWork 中,因此您在上次迭代中读取过去的向量

for(size_t i = 0; i < numThreads - 1; ++i) //check whether elements from 2 to 6 satisfy check_if_condition
        threads.emplace_back(doWork, std::ref(input), std::ref(results), assigned,((i+2) * blockSize), ((i+3) * blockSize), std::ref(numPassed));

假设你的向量有 1000 个元素,你的线程数是 8,在最后一次迭代中你会得到:

i = 7;

电流 = (7+2)*125 = 1125;

结束 = (7+3)*125 = 1250;


因此,要在给定子范围 [rangeStart, rangeEnd) 的线程之间均匀分配工作,您需要执行以下循环:

for(size_t i = 0; i < numThreads; ++i) 
{
    auto start = rangeStart + i * blockSize;
    auto end = (i == numThreads - 1) ? rangeEnd : start + (i+1) * blockSize;
    threads.emplace_back(doWork, std::ref(input), std::ref(results), assigned, start, end, std::ref(numPassed));
}

请注意,在最后一次迭代中,end 直接设置为 rangeEnd,因此最后一个线程可能需要做更多的工作

另外,应该调整块大小:

auto blockSize = (rangeEnd - rangeStart) / numThreads;

【讨论】:

  • 首先你的块大小应该是 (rangeEnd - rangeStart) / numOf Threads
  • 你当前的第二个 = range + i * blockSizeend = start + (i+1) * blockSize
  • 这取决于 start 和/或 end 是否包含在内,我认为您可以根据自己的具体需求进行调整
  • 抱歉,我在您的原始代码中发现了问题,并希望您从那里得到它。我希望你不要指望我为你完成你的任务,这不是 SO 的目的
  • 你有另一个错误,你的存储向量小于你的结果向量,但你假设它们的大小相同,所以你现在也跳过存储向量
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-12-16
  • 2016-05-08
  • 1970-01-01
  • 2014-07-19
相关资源
最近更新 更多