【问题标题】:C++ futures parallel processingC++ 期货并行处理
【发布时间】:2015-10-15 21:25:35
【问题描述】:

我正在使用std::futures 来并行处理我的算法。我将信息分成互斥的池,然后在每个池的自己的线程中执行相同的操作。代码如下所示:

class Processor
{
public:
    Processor(const std::string &strVal) : m_strVal(strVal)
    {
    }

    std::string GetVal() const {return m_strVal;}

    std::vector<std::string> Do()
    {
        // do some processing - this can throw an exception
    }

private:
    std::string m_strVal;
};

class ParallelAlgo
{
private:
    std::vector<std::string> m_vecMasterResults;

public:

    ProcessingFunction(const std::vector<std::string> &vecInfo)
    {
        // vecInfo holds mutually exclusive pools

        std::vector<std::future<std::vector<std::string> > > vecFutures(vecInfo.size());

        try
        {
            for (auto n = 0 ; n < vecInfo.size() ; n++)
            {
                vecFuture[n] = std::async(std::launch::async, &ParallelAlgo::WorkFunc, vecInfo[n].GetVal());
            }

            for (auto it = vecFutures.begin() ; it != vecFutures.end() ; ++it)
            {
                std::vector<std::string> RetVal = it->get();
                m_MasterResults.insert(m_MasterResults.begin(), RetVal.begin(), RetVal.end());
                vecFutures.erase(it);
            }
        }
        catch (exception &e)
        {
            for (auto it = vecFutures.begin() ; it != vecFuture.end() ; ++it)
            {
                // race condition?
                if (it->valid())
                {
                    it->wait_for(std::chrono::second(0));
                }
            }
        }
    }

    std::vector<std::string> ParallelAlgo::WorkFunc(const std::string &strVal)
    {
        Processor _Proccessor(strVal);
        return _Processor.Do();
    }
};

我的问题是Processor:Do()中抛出异常的情况如何处理?目前我使用future 捕获异常,然后为每个尚未完成的future 等待零秒;这很好 - 这些线程将简单地终止并且不会完成处理。但是,我是否没有在 catch 块中引入竞争条件。 future 可以在调用valid()wait_for() 之间完成,或者这不是问题,因为我没有在这些不完整的期货上调用get()

【问题讨论】:

  • 在完成的thread 上调用wait_for 完全符合预期,它会立即返回。我没有看到问题。
  • 既然你什么都不做,难道你就不能不担心未完成的线程吗?如有必要,std::future 的析构函数将阻塞直到异步处理完成,但据我所知,这就是你想要的。
  • 示例代码中有一个错误。调用vecFutures.erase(it); 将使it 无效,并在for 循环中进一步使用它是UB。
  • 我刚刚问了一个类似的问题:stackoverflow.com/questions/32246665/…。就我而言,我希望在发生错误时停止所有仍在执行的异步任务。从您的问题中不清楚您是否也希望这样做,或者您是否愿意等待剩余的任务完成。

标签: c++ multithreading


【解决方案1】:

valid 的调用仅检查是否存在相应的共享状态,对于已完成的线程,这仍然是正确的,直到您对其调用get

那里没有竞争条件。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-07-15
    • 2020-01-30
    • 2017-09-30
    • 2015-08-25
    • 2022-10-23
    相关资源
    最近更新 更多