【问题标题】:Opening multiple threads with an object and returning a result使用一个对象打开多个线程并返回结果
【发布时间】:2019-04-09 00:46:00
【问题描述】:

我试图通过一个循环打开多个线程,其中每个线程都是一个类的实例,它的构造函数重载,这样它会自动运行所需的代码,这个函数返回一个 unordered_list,我想为此检索它然后将特定实例附加到最终的 unordered_list

我尝试过使用future 和promise,但是当我尝试时我最终搞糊涂了。这个项目旨在挑战我并帮助我学习 C++ 中的多线程。

    //class to be instantiated per thread   
    class WordCounter {
    public:
        std::unordered_map<std::string, int> thisWordCount;
        std::string word;

        WordCounter(std::string filepath) {}//will be overloaded
        ~WordCounter() {}//destructor

        std::unordered_map<std::string, int>operator()(std::string filepath) const {}//overloaded constructor signature
        std::unordered_map<std::string, int>operator()(std::string currentFile) {//overloaded constructor implementation
            fstream myReadFile;
            myReadFile.open(currentFile);
            if (!!!myReadFile) {
                cout << "Unable to open file";
                exit(1); // terminate with error
            }
            else if (myReadFile.is_open()) {
                while (!myReadFile.eof()) {
                    while (myReadFile >> word) {
                        ++thisWordCount[word];
                    }
                }
            }
            myReadFile.close();

            return thisWordCount;
        }
    };


    int main(int argc, char** argv)
    {
        std::vector<std::thread> threads;//store instantiated threads using WordCounter
        static std::unordered_map<std::string, int> finalWordCount; //append result from each thread to this unordered_list only when a particular thread finish's reading a file
        vector<string> fileName = { "input1.txt" , "input2.txt" };//filepaths to the files used

        for (int i = 0; i < fileName.size(); ++i)//loop through vector of filepaths to open a thread for each file to then be processed by that thread
        {
            std::string currentFile = DIR + fileName[i];
            std::thread _newThread(new WordCount(currentFile); //is this how the thread would be created?
            threads.emplace_back(_newThread);//store new thread in a vector

//I want to read through the vector when a particular thread finishes and append that particular threads result to finalWordCount

        }

}

【问题讨论】:

    标签: multithreading oop c++11 std unordered-map


    【解决方案1】:

    多线程代码

    让我们从编写一个多线程countWords 函数开始。这将为我们提供代码需要做什么的高级概述,然后我们将填补缺失的部分。

    写作countWords

    countWords 在文件名向量中计算每个文件中的词频。它并行执行此操作。

    步骤概述:

    • 创建线程向量
    • 提供一个存放最终结果的地方(这是finalWordCount变量)
    • WordCounter 创建一个回调函数,以便在完成时调用
    • 使用WordCounter 对象为每个文件启动一个新线程。
    • 等待广告完成
    • 返回finalWordCount

    WordCounter 对象在线程启动时将文件名作为输入。

    缺少的部分:

    • 我们还需要写一个makeWordCounter函数

    实施:

    using std::unordered_map;
    using std::string; 
    using std::vector; 
    
    unordered_map<string, int> countWords(vector<string> const& filenames) {
        // Create vector of threads
        vector<std::thread> threads;
        threads.reserve(filenames.size());
    
        // We have to have a lock because maps aren't thread safe
        std::mutex map_lock;
    
        // The final result goes here
        unordered_map<std::string, int> totalWordCount; 
    
        // Define the callback function
        // This operation is basically free
        // Internally, it just copies a reference to the mutex and a reference
        // to the totalWordCount
        auto callback = [&](unordered_map<string, int> const& partial_count) {
            // Lock the mutex so only we have access to the map
            map_lock.lock(); 
            // Update the map
            for(auto count : partial_count) {
                totalWordCount[count.first] += count.second; 
            }
            // Unlock the mutex
            map_lock.unlock(); 
        };
    
        // Create a new thread for each file
        for(auto& file : filenames) {
            auto word_counter = makeWordCounter(callback); 
            threads.push_back(std::thread(word_counter, file)); 
        }
    
        // Wait until all threads have finished
        for(auto& thread : threads) {
            thread.join(); 
        }
    
        return totalWordCount; 
    }
    

    写作makeWordCounter

    我们的函数makeWordCounter 非常简单:它只是创建一个在回调中模板化的WordCounter 函数。

    template<class Callback>
    WordCounter<Callback> makeWordCounter(Callback const& func) {
        return WordCounter<Callback>{func}; 
    }
    

    编写WordCounter

    成员变量:

    • 回调函数(我们不需要其他任何东西)

    功能

    • operator() 使用文件名调用countWordsFromFilename
    • countWordsFromFilename 打开文件,确保一切正常,然后使用文件流调用 countWords
    • countWords 读取文件流中的所有单词并计算计数,然后使用最终计数调用回调。

    因为WordCounter真的很简单,所以我只是把它做成了一个结构体。它只需要存储Callback函数,并且通过将callback函数公开,我们不必编写构造函数(编译器使用聚合初始化自动处理它)。

    template<class Callback>
    struct WordCounter {
        Callback callback;
    
        void operator()(std::string filename) {
            countWordsFromFilename(filename); 
        }
        void countWordsFromFilename(std::string const& filename) {
            std::ifstream myFile(filename);
            if (myFile) {
                countWords(myFile); 
            }
            else {
                std::cerr << "Unable to open " + filename << '\n'; 
            }
        }
        void countWords(std::ifstream& filestream) {
            std::unordered_map<std::string, int> wordCount; 
            std::string word; 
            while (!filestream.eof() && !filestream.fail()) {
                filestream >> word; 
                wordCount[word] += 1;
            }
            callback(wordCount); 
        }
    };
    

    完整代码

    您可以在此处查看countWords 的完整代码:https://pastebin.com/WjFTkNYF

    我添加的唯一内容是#includes。

    回调和模板 101(应原始海报的要求)

    模板是编写代码时使用的一种简单而有用的工具。它们可以用来消除相互依赖;使算法通用(因此它们可以与您喜欢的任何类型一起使用);它们甚至可以让您避免调用虚拟成员函数或函数指针,从而使代码更快、更高效。

    模板类

    让我们看一个非常简单的表示一对的类模板:

    template<class First, class Second>
    struct pair {
        First first;
        Second second; 
    };
    

    在这里,我们将pair 声明为struct,因为我们希望所有成员都是公开的。

    请注意,没有First 类型,也没有Second 类型。 当我们使用名称FirstSecond 时,我们真正想说的是“在上下文中在pair 类中,名称First 将代表pair 类的First 参数,名称Second 将代表pair 类的第二个元素。

    我们可以很容易地把它写成:

    // This is completely valid too
    template<class A, class B>
    struct pair {
        A first;
        B second; 
    };
    

    使用pair 非常简单:

    int main() {
        // Create pair with an int and a string
        pair<int, std::string> myPair{14, "Hello, world!"}; 
    
        // Print out the first value, which is 14
        std::cout << "int value:    " << myPair.first << '\n';
        // Print out the second value, which is "Hello, world!"
        std::cout << "string value: " << myPair.second << '\n';
    }
    

    就像一个普通的类一样,pair 可以有成员函数、构造函数、析构函数……任何东西。因为pair就是这么简单的一个类,编译器会自动为我们生成构造函数和析构函数,我们不用操心。

    模板化函数

    模板函数看起来类似于常规函数。唯一的区别是它们在函数声明的其余部分之前有 template 声明。

    让我们编写一个简单的函数来打印一对:

    template<class A, class B>
    std::ostream& operator<<(std::ostream& stream, pair<A, B> pair) 
    {
        stream << '(' << pair.first << ", " << pair.second << ')'; 
        return stream; 
    }
    

    我们可以给它任何我们想要的pair,只要它知道如何打印这对的两个元素:

    int main() {
        // Create pair with an int and a string
        pair<int, std::string> myPair{14, "Hello, world!"}; 
    
        std::cout << myPair << '\n'; 
    }
    

    这会输出(14, Hello, world)

    回调

    C++ 中没有 Callback 类型。我们不需要一个。回调只是你用来表明某事发生的东西。

    让我们看一个简单的例子。这个函数寻找越来越大的数字,每次找到一个,它都会调用output,这是我们提供的一个参数。在这种情况下,output 是一个回调,我们用它来表示找到了一个新的最大数字。

    template<class Func>
    void getIncreasingNumbers(std::vector<double> const& nums, Func output) 
    {
        // Exit if there are no numbers
        if(nums.size() == 0) 
            return; 
    
        double biggest = nums[0]; 
        // We always output the first one
        output(biggest); 
        for(double num : nums) 
        {
            if(num > biggest) 
            {
                biggest = num; 
                output(num); 
            }
        }
    }
    

    我们可以通过很多不同的方式使用getIncreasingNumbers。例如,我们可以过滤不大于前一个的数字:

    std::vector<double> filterNonIncreasing(std::vector<double> const& nums) 
    {
        std::vector<double> newNums; 
        // Here, we use an & inside the square brackets
        // This is so we can use newNums by reference
        auto my_callback = [&](double val) { 
            newNums.push_back(val); 
        };
        getIncreasingNumbers(nums, my_callback); 
        return newNums; 
    }
    

    或者我们可以打印出来:

    void printNonIncreasing(std::vector<double> const& nums) 
    {
        // Here, we don't put anything in the square brackts
        // Since we don't access any local variables
        auto my_callback = [](double val) {
            std::cout << "New biggest number: " << val << '\n'; 
        };
        getIncreasingNums(nums, my_callback); 
    }
    

    或者我们可以找到它们之间最大的差距:

    double findBiggestJumpBetweenIncreasing(std::vector<double> const& nums)
    {
        double previous; 
        double biggest_gap = 0.0; 
        bool assigned_previous = false;
        auto my_callback = [&](double val) {
            if(not assigned_previous) {
                previous = val; 
                assigned_previous = true;
            }
            else 
            {
                double new_gap = val - previous; 
                if(biggest_gap < new_gap) {
                    biggest_gap = new_gap; 
                }
            }
        };
        getIncreasingNums(nums, my_callback); 
        return biggest_gap;
    }
    

    【讨论】:

    • 我没有提供不必要的额外代码,因此 int main 需要返回 0; currentFile 是 DIR(directory) + 我们正在使用的当前文件路径。所以我们会将 currentPath 传递给我认为的 WordCounter 构造函数。它在我的类中的重载构造函数实现中使用
    • WordCounter 中的 operator() 重载接受参数,并且在使用 WordCounter 启动 std::thread 时必须提供这些参数。此外,您不能复制线程;您要么必须使用 std::move 将其移动到向量中,要么在调用站点构造它(这是我在上面的代码中所做的)
    • 你想让我用一个简单的例子来更新我的答案吗?
    • 没有线程安全的方法可以将元素添加到地图中,因此您必须使用互斥锁。如果您已经知道要检查哪些单词,那么您可以事先将这些单词放入映射中,然后您可以使用原子操作(避免任何锁定),但是如果没有,您将无法将新单词添加到映射中互斥体。同步步骤(必须使用互斥锁的部分)通常是并行代码的瓶颈,但只要大部分工作可以在同步之前完成(而不是在同步期间),您仍然可以获得加速)。
    • 也就是说,上面代码的瓶颈是读取文件。您的硬盘驱动器每秒只能读取这么多数据,而且很可能即使是单线程代码也可以比从硬盘驱动器读取数据更快地处理数据。最好的办法是在一个线程上读取文件并在另一个线程上处理它。
    猜你喜欢
    • 2012-04-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-12
    • 1970-01-01
    • 2013-11-23
    • 1970-01-01
    • 2021-11-10
    相关资源
    最近更新 更多