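[Posted]: 2018-11-26 16:17:53
[Question]:
There seems to be a bug in the Windows implementation of std::async. Under heavy load (on the order of 1000 threads launching async tasks per second), the async task is never scheduled, and waiting on the returned futures leads to a deadlock. See this piece of code (modified to use the deferred launch policy instead of async):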
BundlingChunk(size_t numberOfInputs, Bundler* parent, ChunkIdType chunkId)
    : m_numberOfInputs(numberOfInputs), m_parent(parent), m_chunkId(chunkId)
{
    const BundlerChunkDescription& chunk = m_parent->m_chunks[m_chunkId];
    const ChunkInfo& original = chunk.m_original;
    auto& deserializers = m_parent->m_deserializers;

    // Fetch all chunks in parallel.
    std::vector<std::map<ChunkIdType, std::shared_future<ChunkPtr>>> chunks;
    chunks.resize(chunk.m_secondaryChunks.size());
    static std::atomic<unsigned long long int> chunksInProgress = 0;
    for (size_t i = 0; i < chunk.m_secondaryChunks.size(); ++i)
    {
        for (const auto& c : chunk.m_secondaryChunks[i])
        {
            const auto chunkCreationLambda = ([this, c, i] {
                chunksInProgress++;
                ChunkPtr chunk = m_parent->m_weakChunkTable[i][c].lock();
                if (chunk) {
                    chunksInProgress--;
                    return chunk;
                }
                chunksInProgress--;
                return m_parent->m_deserializers[i]->GetChunk(c);
            });
            std::future<ChunkPtr> chunkCreateFuture = std::async(std::launch::deferred, chunkCreationLambda);
            chunks[i].emplace(c, chunkCreateFuture.share());
        }
    }

    std::vector<SequenceInfo> sequences;
    sequences.reserve(original.m_numberOfSequences);

    // Creating chunk mapping.
    m_parent->m_primaryDeserializer->SequenceInfosForChunk(original.m_id, sequences);
    ChunkPtr drivingChunk = chunks.front().find(original.m_id)->second.get();
    m_sequenceToSequence.resize(deserializers.size() * sequences.size());
    m_innerChunks.resize(deserializers.size() * sequences.size());
    for (size_t sequenceIndex = 0; sequenceIndex < sequences.size(); ++sequenceIndex)
    {
        if (chunk.m_invalid.find(sequenceIndex) != chunk.m_invalid.end())
        {
            continue;
        }
        size_t currentIndex = sequenceIndex * deserializers.size();
        m_sequenceToSequence[currentIndex] = sequences[sequenceIndex].m_indexInChunk;
        m_innerChunks[currentIndex] = drivingChunk;
    }

    // Creating sequence mapping and requiring underlying chunks.
    SequenceInfo s;
    for (size_t deserializerIndex = 1; deserializerIndex < deserializers.size(); ++deserializerIndex)
    {
        auto& chunkTable = m_parent->m_weakChunkTable[deserializerIndex];
        for (size_t sequenceIndex = 0; sequenceIndex < sequences.size(); ++sequenceIndex)
        {
            if (chunk.m_invalid.find(sequenceIndex) != chunk.m_invalid.end())
            {
                continue;
            }
            size_t currentIndex = sequenceIndex * deserializers.size() + deserializerIndex;
            bool exists = deserializers[deserializerIndex]->GetSequenceInfo(sequences[sequenceIndex], s);
            if (!exists)
            {
                if(m_parent->m_verbosity >= (int)TraceLevel::Warning)
                    fprintf(stderr, "Warning: sequence '%s' could not be found in the deserializer responsible for stream '%ls'\n",
                        m_parent->m_corpus->IdToKey(sequences[sequenceIndex].m_key.m_sequence).c_str(),
                        deserializers[deserializerIndex]->StreamInfos().front().m_name.c_str());
                m_sequenceToSequence[currentIndex] = SIZE_MAX;
                continue;
            }
            m_sequenceToSequence[currentIndex] = s.m_indexInChunk;
            ChunkPtr secondaryChunk = chunkTable[s.m_chunkId].lock();
            if (!secondaryChunk)
            {
                secondaryChunk = chunks[deserializerIndex].find(s.m_chunkId)->second.get();
                chunkTable[s.m_chunkId] = secondaryChunk;
            }
            m_innerChunks[currentIndex] = secondaryChunk;
        }
    }
}
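My version above is modified so that the async tasks are launched with the deferred policy instead of async, which fixes the issue. Has anyone else seen something like this as of the VS2017 redistributable 14.12.25810? Reproducing the issue is as easy as training a CNTK model that uses a text reader and an image reader on a machine with a GPU and an SSD, so that CPU deserialization becomes the bottleneck. After about 30 minutes of training, the deadlock usually appears. Has anyone seen a similar issue on Linux? If so, it could be a bug in the code, although I doubt it, because the debug counter chunksInProgress is always 0 after the deadlock. For reference, the entire source file is located at https://github.com/Microsoft/CNTK/blob/455aef80eeff675c0f85c6e34a03cb73a4693bff/Source/Readers/ReaderLib/Bundler.cpp.
[Comments]: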
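To make the difference between the two launch policies concrete, here is a minimal sketch that is independent of CNTK. The helper names ranOnCallingThread and deferredRunsOnlyOnGet are made up for illustration and do not appear in Bundler.cpp. It relies only on standard behavior: a std::launch::deferred task runs lazily on whichever thread first calls get() or wait(), while a std::launch::async task runs on another thread.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// Hypothetical helper, not part of Bundler.cpp: returns true when a task
// started with the given launch policy runs on the thread that calls get().
bool ranOnCallingThread(std::launch policy)
{
    std::future<std::thread::id> f =
        std::async(policy, [] { return std::this_thread::get_id(); });
    // share() mirrors the std::shared_future usage in the question.
    std::shared_future<std::thread::id> shared = f.share();
    return shared.get() == std::this_thread::get_id();
}

// Hypothetical helper: checks that a deferred task does nothing until
// get() or wait() is called on its future.
bool deferredRunsOnlyOnGet()
{
    std::atomic<bool> ran{false};
    std::future<void> f =
        std::async(std::launch::deferred, [&ran] { ran = true; });
    // wait_for never blocks on a deferred task; it only reports the state.
    assert(f.wait_for(std::chrono::seconds(0)) == std::future_status::deferred);
    const bool ranBeforeGet = ran.load();
    f.get(); // the lambda executes here, on this thread
    return !ranBeforeGet && ran.load();
}
```

With the deferred policy no task is handed to whatever scheduler sits behind std::async, which would be consistent with the deadlock disappearing. The trade-off is that the chunk fetches are then executed one at a time on the thread that calls get(), so they are no longer fetched in parallel.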
-
Use my library instead. std::async under MSVC uses the PPL, which is badly implemented: github.com/David-Haim/concurrencpp
-
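@DavidHaim Are you implying that some random header with no documentation, no tests, and not even a readme of any kind would be a better alternative to a top vendor's library? extraordinary claims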
-
@DavidHaim No, it doesn't. Not now anyway, and I doubt it ever did. A pity I can't downvote comments.
-
@PaulSanders Yes, it does. A future/promise duo is just a wrapper around concurrency::task. A pity you never checked. It has always been implemented this way.
-
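@DavidHaim Apologies, it seems you are right. I just waded through about 4 million templates and you do indeed end up in ppltasks.h. However, I see that it is built on top of the native Windows ThreadPool API, see here, so what makes you think it is such a lemon?
Tags: c++ concurrency stl cntk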