【问题标题】:possible std::async implementation bug Windows可能的 std::async 实现错误 Windows
【发布时间】:2018-11-26 16:17:53
【问题描述】:

std::async 的 windows 实现似乎存在错误。在重负载下(大约每秒启动 1000 个异步线程),异步任务永远不会被调度,等待返回的期货会导致死锁。请参阅这段代码(使用延迟而不是异步的启动策略进行修改):

BundlingChunk(size_t numberOfInputs, Bundler* parent, ChunkIdType chunkId)
        : m_numberOfInputs(numberOfInputs), m_parent(parent), m_chunkId(chunkId)
    {
        const BundlerChunkDescription& chunk = m_parent->m_chunks[m_chunkId];
        const ChunkInfo& original = chunk.m_original;
        auto& deserializers = m_parent->m_deserializers;

        // Fetch all chunks in parallel.
        std::vector<std::map<ChunkIdType, std::shared_future<ChunkPtr>>> chunks;
        chunks.resize(chunk.m_secondaryChunks.size());
        static std::atomic<unsigned long long int> chunksInProgress = 0;

        for (size_t i = 0; i < chunk.m_secondaryChunks.size(); ++i)
        {
            for (const auto& c : chunk.m_secondaryChunks[i])
            {
                const auto chunkCreationLambda = ([this, c, i] {
                    chunksInProgress++;
                    ChunkPtr chunk = m_parent->m_weakChunkTable[i][c].lock();
                    if (chunk) {
                        chunksInProgress--;
                        return chunk;
                    }
                    chunksInProgress--;
                    return m_parent->m_deserializers[i]->GetChunk(c);
                });
                std::future<ChunkPtr> chunkCreateFuture = std::async(std::launch::deferred, chunkCreationLambda);
                chunks[i].emplace(c, chunkCreateFuture.share());
            }
        }

        std::vector<SequenceInfo> sequences;
        sequences.reserve(original.m_numberOfSequences);

        // Creating chunk mapping.
        m_parent->m_primaryDeserializer->SequenceInfosForChunk(original.m_id, sequences);
        ChunkPtr drivingChunk = chunks.front().find(original.m_id)->second.get();
        m_sequenceToSequence.resize(deserializers.size() * sequences.size());
        m_innerChunks.resize(deserializers.size() * sequences.size());
        for (size_t sequenceIndex = 0; sequenceIndex < sequences.size(); ++sequenceIndex)
        {
            if (chunk.m_invalid.find(sequenceIndex) != chunk.m_invalid.end())
            {
                continue;
            }

            size_t currentIndex = sequenceIndex * deserializers.size();
            m_sequenceToSequence[currentIndex] = sequences[sequenceIndex].m_indexInChunk;
            m_innerChunks[currentIndex] = drivingChunk;
        }

        // Creating sequence mapping and requiring underlying chunks.
        SequenceInfo s;
        for (size_t deserializerIndex = 1; deserializerIndex < deserializers.size(); ++deserializerIndex)
        {
            auto& chunkTable = m_parent->m_weakChunkTable[deserializerIndex];
            for (size_t sequenceIndex = 0; sequenceIndex < sequences.size(); ++sequenceIndex)
            {
                if (chunk.m_invalid.find(sequenceIndex) != chunk.m_invalid.end())
                {
                    continue;
                }

                size_t currentIndex = sequenceIndex * deserializers.size() + deserializerIndex;
                bool exists = deserializers[deserializerIndex]->GetSequenceInfo(sequences[sequenceIndex], s);
                if (!exists)
                {
                    if(m_parent->m_verbosity >= (int)TraceLevel::Warning)
                        fprintf(stderr, "Warning: sequence '%s' could not be found in the deserializer responsible for stream '%ls'\n",
                            m_parent->m_corpus->IdToKey(sequences[sequenceIndex].m_key.m_sequence).c_str(),
                            deserializers[deserializerIndex]->StreamInfos().front().m_name.c_str());
                    m_sequenceToSequence[currentIndex] = SIZE_MAX;
                    continue;
                }

                m_sequenceToSequence[currentIndex] = s.m_indexInChunk;
                ChunkPtr secondaryChunk = chunkTable[s.m_chunkId].lock();
                if (!secondaryChunk)
                {
                    secondaryChunk = chunks[deserializerIndex].find(s.m_chunkId)->second.get();
                    chunkTable[s.m_chunkId] = secondaryChunk;
                }

                m_innerChunks[currentIndex] = secondaryChunk;
            }
        }
    }

我上面的版本已修改,以便异步任务以延迟而不是异步方式启动,从而解决了问题。从 VS2017 可再发行版 14.12.25810 起,有没有其他人看到过类似的东西?重现此问题就像在具有 GPU 和 SSD 的机器上训练使用文本和图像阅读器的 CNTK 模型一样容易,因此 CPU 反序列化成为瓶颈。在大约 30 分钟的训练后,通常会出现死锁。有人在 Linux 上看到过类似的问题吗?如果是这样,它可能是代码中的错误,尽管我对此表示怀疑,因为调试计数器chunksInProgress 在死锁后始终为 0。作为参考,整个源文件位于https://github.com/Microsoft/CNTK/blob/455aef80eeff675c0f85c6e34a03cb73a4693bff/Source/Readers/ReaderLib/Bundler.cpp

【问题讨论】:

  • 改用我的图书馆。 MSVC 下的 std::async 使用了 PPL,它实现得很糟糕.. : github.com/David-Haim/concurrencpp
  • @DavidHaim 你是否暗示一些没有文档、没有测试、甚至没有任何类型的自述文件的随机标题会是顶级供应商库的更好替代品? extraordinary claims
  • @DavidHaim 不,它没有。反正现在不是,我怀疑它曾经做过,可惜我不能对评论投反对票。
  • @PaulSanders 是的,是的。一个future/promise dou 只是concurrency::task 的一个包装器。可惜你从来没有检查过。一直都是这个实现。
  • @DavidHaim 很抱歉,您似乎是对的,我刚刚浏览了大约 400 万个模板,而您确实进入了 ppltasks.h。但是,我看到它是建立在 Windows 原生 ThreadPool API 的 top 之上的,请参阅 here,那么是什么让您认为它如此柠檬?

标签: c++ concurrency stl cntk


【解决方案1】:

新的一天,更好的答案(很多更好)。继续阅读。

我花了一些时间调查std::async 在 Windows 上的行为,你是对的。这是另一种动物,请参阅here

因此,如果您的代码依赖于std::async 总是 启动一个新的执行线程并立即返回,那么您将无法使用它。无论如何,不​​在Windows上。在我的机器上,限制似乎是 768 个后台线程,这或多或少符合您观察到的情况。

无论如何,我想了解更多关于现代 C++ 的知识,所以我尝试自己替换 std::async,它可以在 Windows 上使用,并且语义被 OP 删除。因此,我谦虚地介绍以下内容:

AsyncTask:替换std::async

#include <future>
#include <thread>

template <class Func, class... Args>
    std::future <std::result_of_t <std::decay_t <Func> (std::decay_t <Args>...)>>
        AsyncTask (Func&& f, Args&&... args)
{
    using decay_func = std::decay_t <Func>;
    using return_type = std::result_of_t <decay_func (std::decay_t <Args>...)>;

    std::packaged_task <return_type (decay_func f, std::decay_t <Args>... args)>
        task ([] (decay_func f, std::decay_t <Args>... args)
    {
        return f (args...);
    });

    auto task_future = task.get_future ();
    std::thread t (std::move (task), f, std::forward <Args> (args)...);
    t.detach ();
    return task_future;
};

测试程序

#include <iostream>
#include <string>

int add_two_integers (int a, int b)
{
    return a + b;
}

std::string append_to_string (const std::string& s)
{
    return s + " addendum";
}

int main ()
{
    auto /* i.e. std::future <int> */ f1 = AsyncTask (add_two_integers , 1, 2);
    auto /* i.e. int */  i = f1.get ();
    std::cout << "add_two_integers : " << i << std::endl;

    auto  /* i.e. std::future <std::string> */ f2 = AsyncTask (append_to_string , "Hello world");
    auto /* i.e. std::string */ s = f2.get ();        std::cout << "append_to_string : " << s << std::endl;
    return 0;  
}

输出

add_two_integers : 3
append_to_string : Hello world addendum

现场演示 here (gcc) 和 here (clang)。

我从写这篇文章中学到了很多东西,而且很有趣。我对这些东西还很陌生,所以欢迎所有的cmets。如果我有任何问题,我会很乐意更新这篇文章。

【讨论】:

  • 我知道 std::async 的大多数实现都使用线程池并在这些线程之间安排异步任务,我认为这就是该代码的作者选择使用它的原因(超额订阅每秒提交这么多任务肯定会发生)。但是由于某种原因,当系统处于重负载下时,通过 std::async 提交给运行时的某些任务永远不会执行。可以通过使用延迟执行策略绕过线程池,这很有效,这就是为什么我怀疑该错误与 std::async 实现有关。
  • 嗯。您是否尝试将代码减少到 minimal reproducible example?如果您声称std:;async 已损坏的说法属实,那么用更简单的代码重现该问题应该不难,(a) 可以很容易地看出它本身没有固有问题,并且 (b) 可以由其他人测试看看他们是否可以自己重现该问题。这通常是这里的工作方式。
  • 请注意,这也不是一个符合标准的实现:std::async 不会返回一个普通的future,而是一个特殊的joins 线程在销毁时,即在销毁时阻塞.
【解决方案2】:

Paul Sander's answer的启发,我试着稍微简化他的代码:

#include <functional>
#include <future>
#include <thread>
#include <type_traits>

template <class Func, class... Args>
[[nodiscard]] std::future<std::invoke_result_t<std::decay_t<Func>, std::decay_t<Args>...>>
RunInThread(Func&& func, Args&&... args){
  using return_type = std::invoke_result_t<std::decay_t<Func>, std::decay_t<Args>...>;

  auto bound_func = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
  std::packaged_task<return_type(void)> task(bound_func);
  auto task_future = task.get_future();
  std::thread(std::move(task)).detach();
  return task_future;
}

【讨论】:

    猜你喜欢
    • 2019-03-15
    • 2021-12-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-10-23
    • 2013-03-17
    • 1970-01-01
    • 2017-04-09
    相关资源
    最近更新 更多