【问题标题】:Shutting down a multithreaded application by installing a signal handler通过安装信号处理程序关闭多线程应用程序
【发布时间】:2018-10-09 08:52:22
【问题描述】:

在下面的代码中,我创建了一个玩具类,它有一个线程写入队列,而另一个线程从该队列读取并将其打印到stdout。现在,为了彻底关闭系统,我为SIGINT 设置了一个处理程序。我期待信号处理程序设置std::atomic<bool> 变量stopFlag,这将导致threadB 将毒丸(哨兵)推送到遇到threadA 将停止的队列。

class TestClass
{
public:

    TestClass();
    ~TestClass();
    void shutDown();

    TestClass(const TestClass&) = delete;
    TestClass& operator=(const TestClass&) = delete;


private:
    void init();
    void postResults();
    std::string getResult();
    void processResults();

    std::atomic<bool> stopFlag;

    std::mutex outQueueMutex;
    std::condition_variable outQueueConditionVariable;
    std::queue<std::string> outQueue;

    std::unique_ptr<std::thread> threadA;
    std::unique_ptr<std::thread> threadB;
};

void TestClass::init()
{
    threadA = std::make_unique<std::thread>(&TestClass::processResults, std::ref(*this));
    threadB = std::make_unique<std::thread>(&TestClass::postResults, std::ref(*this));
}

TestClass::TestClass():
    stopFlag(false)
{
    init();
}

TestClass::~TestClass()
{
    threadB->join();
}

void TestClass::postResults()
{
    while(true)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
        std::string name = "ABCDEF";
        {
            std::unique_lock<std::mutex> lock(outQueueMutex);
            outQueue.push(name);
            outQueueConditionVariable.notify_one();
        }
        if(stopFlag)
        {
            /*For shutting down output thread*/
            auto poisonPill = std::string();
            {
                std::unique_lock<std::mutex> lock(outQueueMutex);
                outQueue.push(poisonPill);
                outQueueConditionVariable.notify_one();
            }
            threadA->join();
            break;
        }
    }
}

void TestClass::shutDown()
{
    stopFlag = true;
}

std::string TestClass::getResult()
{
    std::string result;
    {
        std::unique_lock<std::mutex> lock(outQueueMutex);
        while(outQueue.empty())
        {
            outQueueConditionVariable.wait(lock);
        }
        result= outQueue.front();
        outQueue.pop();
    }
    return result;
}

void TestClass::processResults()
{
    while(true)
    {
        const auto result = getResult();

        if(result.empty())
        {
            break;
        }

        std::cout << result << std::endl;

    }
}

static void sigIntHandler(std::shared_ptr<TestClass> t, int)
{
    t->shutDown();
}
static std::function<void(int)> handler;

int main()
{
    auto testClass = std::make_shared<TestClass>();
    handler = std::bind(sigIntHandler, testClass, std::placeholders::_1);
    std::signal(SIGINT, [](int n){ handler(n);});
    return 0;
}

我使用 gcc 5.2 使用 -std=c++14 标志编译了这个。在我的 CentOS 7 机器上按 Ctrl-C 时,我收到以下错误,

terminate called after throwing an instance of 'std::system_error'
  what():  Invalid argument
Aborted (core dumped)

请帮助我了解发生了什么。

【问题讨论】:

    标签: c++ c++11 signals signal-handling


    【解决方案1】:

    发生的情况是您的 main 函数立即退出并销毁全局 handler 对象,然后是 testClass。然后主线程在TestClass::~TestClass 中被阻塞。信号处理程序最终会访问已经销毁的对象,这会导致未定义的行为。

    根本原因是共享指针导致的未定义对象所有权 - 您不知道什么以及何时最终销毁了您的对象。


    更通用的方法是使用另一个线程来处理所有其他线程中的所有信号和阻塞信号。然后,该信号处理线程可以在接收到信号时调用任何函数。

    您也根本不需要智能指针和函数包装器。

    例子:

    class TestClass
    {
    public:
        TestClass();
        ~TestClass();
        void shutDown();
    
        TestClass(const TestClass&) = delete;
        TestClass& operator=(const TestClass&) = delete;
    
    private:
        void postResults();
        std::string getResult();
        void processResults();
    
    
        std::mutex outQueueMutex;
        std::condition_variable outQueueConditionVariable;
        std::queue<std::string> outQueue;
        bool stop = false;
    
        std::thread threadA;
        std::thread threadB;
    };
    
    TestClass::TestClass()
        : threadA(std::thread(&TestClass::processResults, this))
        , threadB(std::thread(&TestClass::postResults, this))
    {}
    
    TestClass::~TestClass() {
        threadA.join();
        threadB.join();
    }
    
    void TestClass::postResults() {
        while(true) {
            std::this_thread::sleep_for(std::chrono::milliseconds(2000));
            std::string name = "ABCDEF";
            {
                std::unique_lock<std::mutex> lock(outQueueMutex);
                if(stop)
                    return;
                outQueue.push(name);
                outQueueConditionVariable.notify_one();
            }
        }
    }
    
    void TestClass::shutDown() {
        std::unique_lock<std::mutex> lock(outQueueMutex);
        stop = true;
        outQueueConditionVariable.notify_one();
    }
    
    std::string TestClass::getResult() {
        std::string result;
        {
            std::unique_lock<std::mutex> lock(outQueueMutex);
            while(!stop && outQueue.empty())
                outQueueConditionVariable.wait(lock);
            if(stop)
                return result;
            result= outQueue.front();
            outQueue.pop();
        }
        return result;
    }
    
    void TestClass::processResults()
    {
        while(true) {
            const auto result = getResult();
            if(result.empty())
                break;
            std::cout << result << std::endl;
        }
    }
    
    int main() {
        // Block signals in all threads.
        sigset_t sigset;
        sigfillset(&sigset);
        ::pthread_sigmask(SIG_BLOCK, &sigset, nullptr);
    
        TestClass testClass;
    
        std::thread signal_thread([&testClass]() {
            // Unblock signals in this thread only.
            sigset_t sigset;
            sigfillset(&sigset);
            int signo = ::sigwaitinfo(&sigset, nullptr);
            if(-1 == signo)
                std::abort();
    
            std::cout << "Received signal " << signo << '\n';
            testClass.shutDown();
        });
    
        signal_thread.join();
    }
    

    【讨论】:

    • 我可以做'std::cout
    • @sank 你是,这是一个普通的线程上下文,而不是信号处理程序上下文。
    • 在我的实际程序中,我在由该类创建的其他线程中运行子进程。我注意到除信号处理程序线程之外的所有线程中阻塞信号的异常行为。我不确定如何进行。顺便说一句,我正在使用 boost 进程库。
    • @sank 一个好的开始是使用演示行为的代码发布另一个问题。
    【解决方案2】:

    在您的平台上,当真正的SIGINT 信号到来时,会调用此信号处理程序。 The list of functions that can be invoked inside of this signal handler 相当有限,调用其他任何内容都会导致未定义的行为。

    【讨论】:

    • 我所做的只是调用将 atomic 设置为 true 的 shutDown 方法。
    • @sank 你实际上是在调用lambda::operator () 然后std::function::operator() 然后std::shared_ptr::shared_ptr(shared_ptr const &amp;) 然后std::shared_ptr::operator-&gt; 然后std::shared_ptr::~shared_ptr (原子应该工作,因为它可能是一个非阻塞变体)。此外,这个类有一个相当奇特的资源所有权模式,实际上可能会导致声明的异常。
    • 我明白你的意思。那么如何设置信号处理程序呢?我应该将 stopFlag 和 shutdown 设为静态吗?
    • 请评论这个类的“花式资源所有权模式”。
    • @sank 我想关于“花式所有权”的部分已经在另一个答案中进行了解释。至于正确的信号处理,我只能建议在主线程中创建一个阻止信号处理的单例类,启动它自己的线程,设置信号处理程序来设置原子标志并阻止wait。此类应提供一个单一的公共方法来获取标志状态。在析构函数中,如果尚未设置标志,则此类应将SIGINT 发布到信号等待线程中。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-08-15
    • 1970-01-01
    • 1970-01-01
    • 2021-12-21
    • 2014-02-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多