【问题标题】:How can I propagate exceptions between threads?如何在线程之间传播异常?
【发布时间】:2010-09-18 23:42:36
【问题描述】:

我们有一个单线程调用的函数(我们将其命名为主线程)。在函数体中,我们生成多个工作线程来执行 CPU 密集型工作,等待所有线程完成,然后在主线程上返回结果。

结果是调用者可以天真地使用该函数,并且在内部它将使用多个内核。

到目前为止一切顺利..

我们遇到的问题是处理异常。我们不希望工作线程上的异常导致应用程序崩溃。我们希望函数的调用者能够在主线程上捕获它们。我们必须在工作线程上捕获异常并将它们传播到主线程,让它们继续从那里展开。

我们该怎么做?

我能想到的最好的是:

  1. 在我们的工作线程上捕获各种各样的异常(std::exception 和我们自己的一些)。
  2. 记录异常的类型和消息。
  3. 在主线程上有一个相应的 switch 语句,它会重新抛出工作线程上记录的任何类型的异常。

这有一个明显的缺点,即只支持一组有限的异常类型,并且每当添加新的异常类型时都需要修改。

【问题讨论】:

    标签: c++ multithreading exception


    【解决方案1】:

    C++11 引入了exception_ptr 类型,允许在线程之间传输异常:

    #include<iostream>
    #include<thread>
    #include<exception>
    #include<stdexcept>
    
    static std::exception_ptr teptr = nullptr;
    
    void f()
    {
        try
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            throw std::runtime_error("To be passed between threads");
        }
        catch(...)
        {
            teptr = std::current_exception();
        }
    }
    
    int main(int argc, char **argv)
    {
        std::thread mythread(f);
        mythread.join();
    
        if (teptr) {
            try{
                std::rethrow_exception(teptr);
            }
            catch(const std::exception &ex)
            {
                std::cerr << "Thread exited with exception: " << ex.what() << "\n";
            }
        }
    
        return 0;
    }
    

    因为在您的情况下,您有多个工作线程,您需要为每个工作线程保留一个 exception_ptr

    请注意,exception_ptr 是一个类似 ptr 的共享指针,因此您需要至少保留一个 exception_ptr 指向每个异常,否则它们将被释放。

    Microsoft 特定:如果您使用 SEH 异常 (/EHa),示例代码还将传输 SEH 异常,例如访问冲突,这可能不是您想要的。

    【讨论】:

    • 主线程产生的多个线程怎么样?如果第一个线程遇到异常并退出,main() 将在可能永远运行的第二个线程 join() 处等待。 main() 永远不会在两个 join() 之后测试 teptr。似乎所有线程都需要定期检查全局 teptr 并在适当时退出。有没有一种干净的方法来处理这种情况?
    【解决方案2】:

    目前,唯一的可移植方法是为您可能希望在线程之间传输的所有类型的异常编写 catch 子句,将信息存储在该 catch 子句的某个位置,然后稍后使用它重新抛出异常。这是Boost.Exception采取的方法。

    在 C++0x 中,您将能够使用 catch(...) 捕获异常,然后使用 std::current_exception() 将其存储在 std::exception_ptr 的实例中。然后,您可以稍后使用 std::rethrow_exception() 从相同或不同的线程重新抛出它。

    如果您使用的是 Microsoft Visual Studio 2005 或更高版本,则just::thread C++0x thread library 支持std::exception_ptr。 (免责声明:这是我的产品)。

    【讨论】:

    【解决方案3】:

    如果您使用的是 C++11,那么 std::future 可能完全符合您的要求:它可以自动捕获使其到达工作线程顶部的异常,并将它们传递给父级在调用std::future::get 时线程。 (在幕后,这与@AnthonyWilliams 的回答完全相同;它已经为您实施了。)

    缺点是没有标准的方法来“停止关心”std::future;甚至它的析构函数也会简单地阻塞,直到任务完成。 [编辑,2017:阻塞析构函数行为是从std::async 返回的伪期货的错误特征,无论如何你都不应该使用它。正常的期货不会阻塞它们的析构函数。但是,如果您使用 std::future,您仍然不能“取消”任务:即使没有人再听答案,承诺履行任务仍将继续在幕后运行。] 这是一个可能会阐明我的意思的玩具示例:

    #include <atomic>
    #include <chrono>
    #include <exception>
    #include <future>
    #include <thread>
    #include <vector>
    #include <stdio.h>
    
    bool is_prime(int n)
    {
        if (n == 1010) {
            puts("is_prime(1010) throws an exception");
            throw std::logic_error("1010");
        }
        /* We actually want this loop to run slowly, for demonstration purposes. */
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        for (int i=2; i < n; ++i) { if (n % i == 0) return false; }
        return (n >= 2);
    }
    
    int worker()
    {
        static std::atomic<int> hundreds(0);
        const int start = 100 * hundreds++;
        const int end = start + 100;
        int sum = 0;
        for (int i=start; i < end; ++i) {
            if (is_prime(i)) { printf("%d is prime\n", i); sum += i; }
        }
        return sum;
    }
    
    int spawn_workers(int N)
    {
        std::vector<std::future<int>> waitables;
        for (int i=0; i < N; ++i) {
            std::future<int> f = std::async(std::launch::async, worker);
            waitables.emplace_back(std::move(f));
        }
    
        int sum = 0;
        for (std::future<int> &f : waitables) {
            sum += f.get();  /* may throw an exception */
        }
        return sum;
        /* But watch out! When f.get() throws an exception, we still need
         * to unwind the stack, which means destructing "waitables" and each
         * of its elements. The destructor of each std::future will block
         * as if calling this->wait(). So in fact this may not do what you
         * really want. */
    }
    
    int main()
    {
        try {
            int sum = spawn_workers(100);
            printf("sum is %d\n", sum);
        } catch (std::exception &e) {
            /* This line will be printed after all the prime-number output. */
            printf("Caught %s\n", e.what());
        }
    }
    

    我只是尝试使用std::threadstd::exception_ptr 编写一个类似工作的示例,但是std::exception_ptr 出了点问题(使用libc++)所以我还没有让它真正工作。 :(

    [编辑,2017 年:

    int main() {
        std::exception_ptr e;
        std::thread t1([&e](){
            try {
                ::operator new(-1);
            } catch (...) {
                e = std::current_exception();
            }
        });
        t1.join();
        try {
            std::rethrow_exception(e);
        } catch (const std::bad_alloc&) {
            puts("Success!");
        }
    }
    

    我不知道我在 2013 年做错了什么,但我确信这是我的错。]

    【讨论】:

    • 你为什么将创建的未来分配给一个名为f 然后emplace_back 它?你不能只做waitables.push_back(std::async(…)); 还是我忽略了一些东西(它编译,问题是它是否会泄漏,但我不知道如何)?
    • 另外,有没有办法通过中止期货而不是waiting 来展开堆栈?类似于“一旦其中一项工作失败,其他工作就不再重要”。
    • 4 年后,我的答案还没有老化。 :) 关于“为什么”:我认为这只是为了清楚起见(表明async 返回的是未来而不是其他东西)。关于“另外,是否存在”:不在std::future 中,但如果您不介意为初学者重写整个 STL,请参阅 Sean Parent 的演讲 "Better Code: Concurrency" 或我的 "Futures from Scratch" 以了解不同的实现方式。 :) 关键搜索词是“取消”。
    • 感谢您的回复。有时间我一定会看一下谈话内容。
    • 2017 年好编辑。与接受的相同,但具有范围异常指针。我会把它放在顶部,甚至可能去掉其余的。
    【解决方案4】:

    您的问题是您可能会收到来自多个线程的多个异常,因为每个线程都可能失败,可能是由于不同的原因。

    我假设主线程以某种方式等待线程结束以检索结果,或者定期检查其他线程的进度,并且对共享数据的访问是同步的。

    简单的解决方案

    简单的解决方案是捕获每个线程中的所有异常,将它们记录在一个共享变量中(在主线程中)。

    所有线程完成后,决定如何处理异常。这意味着所有其他线程继续它们的处理,这可能不是你想要的。

    复杂的解决方案

    更复杂的解决方案是让每个线程在其执行的战略点检查是否从另一个线程抛出异常。

    如果线程抛出异常,在退出线程之前被捕获,异常对象被复制到主线程中的某个容器中(如简单的解决方案),并将某个共享布尔变量设置为true。

    当另一个线程测试这个布尔值时,它看到执行将被中止,并以优雅的方式中止。

    当所有线程都中止时,主线程可以根据需要处理异常。

    【讨论】:

      【解决方案5】:

      从线程抛出的异常将无法在父线程中捕获。线程具有不同的上下文和堆栈,通常不需要父线程留在那里等待子线程完成,以便它可以捕获它们的异常。代码中根本没有该捕获的位置:

      try
      {
        start thread();
        wait_finish( thread );
      }
      catch(...)
      {
        // will catch exceptions generated within start and wait, 
        // but not from the thread itself
      }
      

      您需要在每个线程内捕获异常并解释主线程中线程的退出状态,以重新抛出您可能需要的任何异常。

      顺便说一句,如果线程中没有捕获,则是否完全展开堆栈是特定于实现的,即在调用终止之前甚至可能不会调用您的自动变量的析构函数。一些编译器会这样做,但这不是必需的。

      【讨论】:

        【解决方案6】:

        您能否在工作线程中序列化异常,将其传输回主线程,反序列化并再次抛出?我希望要使其正常工作,异常都必须源自同一个类(或者至少是一小组带有 switch 语句的类)。另外,我不确定它们是否可以序列化,我只是在大声思考。

        【讨论】:

        • 如果两个线程在同一个进程中,为什么还要序列化呢?
        • @Nawaz 因为异常可能引用了其他线程无法自动使用的线程局部变量。
        【解决方案7】:

        确实,没有很好的通用方法将异常从一个线程传输到下一个线程。

        如果你的所有异常都从 std::exception 派生,那么你可以有一个顶级的通用异常捕获,它会以某种方式将异常发送到将再次抛出的主线程。问题是你失去了异常的抛出点。您可能可以编写依赖于编译器的代码来获取此信息并进行传输。

        如果不是所有的异常都继承了 std::exception,那么你就有麻烦了,必须在你的线程中写很多顶级的 catch ......但解决方案仍然成立。

        【讨论】:

          【解决方案8】:

          您需要对工作线程中的所有异常(包括非标准异常,如访问冲突)进行通用捕获,并从工作线程发送消息(我想您有某种消息传递?)到控制线程,包含指向异常的活动指针,并通过创建异常的副本重新抛出那里。 然后worker可以释放原始对象并退出。

          【讨论】:

            【解决方案9】:

            http://www.boost.org/doc/libs/release/libs/exception/doc/tutorial_exception_ptr.html。也可以为您调用的任何函数编写一个包装函数来加入子线程,该函数会自动重新抛出(使用 boost::rethrow_exception)子线程发出的任何异常。

            【讨论】:

              猜你喜欢
              • 2012-07-17
              • 1970-01-01
              • 2010-10-30
              • 1970-01-01
              • 2011-11-08
              • 2012-06-28
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              相关资源
              最近更新 更多