【发布时间】:2015-10-15 21:25:35
【问题描述】:
我正在使用std::futures 来并行处理我的算法。我将信息分成互斥的池,然后在每个池的自己的线程中执行相同的操作。代码如下所示:
class Processor
{
public:
Processor(const std::string &strVal) : m_strVal(strVal)
{
}
std::string GetVal() const {return m_strVal;}
std::vector<std::string> Do()
{
// do some processing - this can throw an exception
}
private:
std::string m_strVal;
};
class ParallelAlgo
{
private:
std::vector<std::string> m_vecMasterResults;
public:
ProcessingFunction(const std::vector<std::string> &vecInfo)
{
// vecInfo holds mutually exclusive pools
std::vector<std::future<std::vector<std::string> > > vecFutures(vecInfo.size());
try
{
for (auto n = 0 ; n < vecInfo.size() ; n++)
{
vecFuture[n] = std::async(std::launch::async, &ParallelAlgo::WorkFunc, vecInfo[n].GetVal());
}
for (auto it = vecFutures.begin() ; it != vecFutures.end() ; ++it)
{
std::vector<std::string> RetVal = it->get();
m_MasterResults.insert(m_MasterResults.begin(), RetVal.begin(), RetVal.end());
vecFutures.erase(it);
}
}
catch (exception &e)
{
for (auto it = vecFutures.begin() ; it != vecFuture.end() ; ++it)
{
// race condition?
if (it->valid())
{
it->wait_for(std::chrono::second(0));
}
}
}
}
std::vector<std::string> ParallelAlgo::WorkFunc(const std::string &strVal)
{
Processor _Proccessor(strVal);
return _Processor.Do();
}
};
我的问题是Processor:Do()中抛出异常的情况如何处理?目前我使用future 捕获异常,然后为每个尚未完成的future 等待零秒;这很好 - 这些线程将简单地终止并且不会完成处理。但是,我是否没有在 catch 块中引入竞争条件。 future 可以在调用valid() 和wait_for() 之间完成,或者这不是问题,因为我没有在这些不完整的期货上调用get()?
【问题讨论】:
-
在完成的
thread上调用wait_for完全符合预期,它会立即返回。我没有看到问题。 -
既然你什么都不做,难道你就不能不担心未完成的线程吗?如有必要,
std::future的析构函数将阻塞直到异步处理完成,但据我所知,这就是你想要的。 -
示例代码中有一个错误。调用
vecFutures.erase(it);将使it无效,并在for 循环中进一步使用它是UB。 -
我刚刚问了一个类似的问题:stackoverflow.com/questions/32246665/…。就我而言,我希望在发生错误时停止所有仍在执行的异步任务。从您的问题中不清楚您是否也希望这样做,或者您是否愿意等待剩余的任务完成。
标签: c++ multithreading