【问题标题】:How to eliminate crashes when destroying boost::asio entities on fly?如何在销毁 boost::asio 实体时消除崩溃?
【发布时间】:2020-12-21 23:06:01
【问题描述】:

注意!!!问题是针对boost::asio library 的专家。不幸的是,我不能让代码更紧凑,它包含描述问题的最少数量。该代码是示例,人工创建的。在 cmets 中已知和描述的崩溃的地方,它旨在说明崩溃! NO need 对代码调试有任何帮助...

问题是关于如何设计 asio 服务器,而不是关于 - 它在哪里崩溃!!!

此示例接近官方 boost::asio 文档中的“聊天服务器”设计。但是,与官方示例不同,只有连接类的对象是动态创建/销毁的,在我的示例中,服务器及其连接类实体都是动态创建/销毁的......我确信这种模式的实现应该在asio爱好者中广为人知,下面描述的问题应该已经有人解决了......

请查看代码。 在这里,CAsioServer 和 CAsioConnection 的实体是动态创建和销毁的。

#include <map>
#include <array>
#include <set>
#include <vector>
#include <deque>
#include <thread>
#include <iostream>
#include <asio.hpp>
#include <iomanip>


class CAsioConnection
    : public std::enable_shared_from_this<CAsioConnection>
{
public:
    using PtrType = std::shared_ptr<CAsioConnection>;

    CAsioConnection(asio::ip::tcp::socket socket, std::set<CAsioConnection::PtrType>& connections)
        : socket_(std::move(socket)), connections_(connections)
    {
        std::cout << "-- CAsioConnection is creating, socket: " << socket_.native_handle() << "\n";
    }

    virtual ~CAsioConnection()
    {
        std::cout << "-- CAsioConnection is destroying , socket: " << socket_.native_handle() << "\n";
    }

    void read() { do_read(); }

private:
    void do_read(void)
    {
        uint8_t buff[3];

        asio::async_read(socket_, asio::buffer(buff,3),
            [this](std::error_code ec, std::size_t  /*length*/) {
            if (!ec)
            {
                do_read();
            }
            else
            {
                std::cout << "-- CAsioConnection::do_read() error : " << ec.message() << "\n";
                // Here is the crash N2
                connections_.erase(shared_from_this());
                // Crash may be fixed by the code below
                //if (ec.value() != 1236) // (winerror.h) #define ERROR_CONNECTION_ABORTED 1236L
                //  connections_.erase(shared_from_this());
            }
        });
    }

    asio::ip::tcp::socket socket_;
    std::set<CAsioConnection::PtrType>& connections_;
};

class CAsioServer
    : public std::enable_shared_from_this<CAsioServer>
{
public:
    using PtrType = std::shared_ptr<CAsioServer>;

    CAsioServer(int port, asio::io_context& io, const asio::ip::tcp::endpoint& endpoint)
        : port_(port), acceptor_(io, endpoint)
    {
        std::cout << "-- CAsioServer is creating, port: " << port_ << "\n";
    }

    virtual ~CAsioServer()
    {
        std::cout << "-- CAsioServer is destroying , port: " << port_ << "\n";
    }

    int port(void) { return port_; }

    void accept(void) { do_accept(); }
private:
    void do_accept()
    {
        acceptor_.async_accept([this](std::error_code ec, asio::ip::tcp::socket socket) {
            if (!ec)
            {
                std::cout << "-- CAsioServer::do_accept() connection to socket: " << socket.native_handle() << "\n";
                auto c = std::make_shared<CAsioConnection>(std::move(socket), connections_);
                connections_.insert(c);
                c->read();
            }
            else
            {
                // Here is the crash N1
                std::cout << "-- CAsioServer::do_accept() error : " << ec.message() << "\n";
                // Crash may be fixed by the code below
                //if (ec.value() == 995) // (winerror.h) #define ERROR_OPERATION_ABORTED 995L
                //  return;
            }
            // Actually here is the crash N1 )), but the fix is above...
            do_accept();
        });
    }

    int port_;
    asio::ip::tcp::acceptor acceptor_;
    std::set<CAsioConnection::PtrType> connections_;
};

//*****************************************************************************

class CTcpBase
{
public:
    CTcpBase()
    {
        // heart beat timer to keep it alive
        do_heart_beat();
        t_ = std::thread([this] {
            std::cout << "-- io context is RUNNING!!!\n";
            io_.run();
            std::cout << "-- io context has been STOPED!!!\n";
        });
    }

    virtual ~CTcpBase()
    {
        io_.stop();

        if (t_.joinable())
            t_.join();
    }

    void add_server(int port)
    {
        io_.post([this, port] 
        {
            for (auto s : servers_)
                if (port == s->port())
                    return;

            auto endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port);
            auto s = std::make_shared<CAsioServer>(port, io_, endpoint);
            s->accept();
            servers_.insert(s);
        });
    }

    void remove_server(int port)
    {
        io_.post([this, port] 
        {
            for (auto s : servers_)
                if (port == s->port())
                    { servers_.erase(s); return; }
        });
    }

private:

    void do_heart_beat(void)
    {
        std::cout << "-- beat\n";
        auto timer = std::make_shared<asio::steady_timer>(io_, asio::chrono::milliseconds(3000));
        timer->async_wait([timer, this](const asio::error_code& ec) {
            do_heart_beat();
        });
    }

    asio::io_context io_;
    std::thread t_;
    std::set<CAsioServer::PtrType> servers_;
};

//*****************************************************************************

int main(void)
{
    CTcpBase tcp_base;

    std::cout << "CONNECT the server to port 502\n";
    tcp_base.add_server(502);

    std::this_thread::sleep_for(std::chrono::seconds(20));
    
    std::cout << "REMOVE the server from port 502\n";
    tcp_base.remove_server(502);

    std::this_thread::sleep_for(std::chrono::seconds(10));

    return 0;
}

假设CTcpBase::add_server()CTcpBase::remove_server() 将被不同线程的外部客户端调用。 asio 上下文在它自己的线程中处理它。 让我们考虑两种情况:

  1. 启动应用程序并等待半分钟。 崩溃发生在CAsioServer::do_accept() 中,请参见下面的输出。 Debug Console Output
  2. 启动应用程序。任何外部客户端连接到端口 502 并等待不到 20 秒。 崩溃发生在CAsioConnection::do_read(),请参见下面的输出。 Debug Console Output

似乎 asio 框架调用推迟了 asio::async_read()acceptor_.async_accept() 处理程序,而其类的实体已经销毁。

我已通过错误检查修复了处理程序,但该解决方案似乎并不可靠。谁知道可能还有什么其他错误和场景...有时,当客户端断开连接时,我需要清理asio::async_read() 设置的connection_,我如何确定服务器或连接对象仍然存在?...

有什么方法可以让 boost::asio 框架阻止为已经被销毁的对象调用延迟的处理程序?或者如何通过对象已经被销毁的错误码识别(be 100% sure)?或者我在 asio 范围内还有其他解决方案或设计模式 - 如何在一个运行的线程中处理动态创建/销毁的服务器及其连接,而无需互斥锁和其他东西......

【问题讨论】:

  • 不相关的战术说明:遍历set 寻找项目没有多大意义。 set 专为高速查找独特元素而设计,循环通过 set 将比使用 set::find 要求 set 完成工作要慢几个数量级。也就是说,如果您的编译器至少支持 C++11,set::erase 将一次性为您完成整个工作。
  • 我建议使用minimal reproducible example 更新代码。如果您添加包含并将pseudo_main5 更改为main,看起来您离编译的示例不远了,而且为了制作他们可以试验的示例而更改的人越少,他们就越不可能'会添加他们自己的错误或意外修复你正在寻找的错误。另外,通过在制作 MRE 时进行几轮分而治之,您可能会在其他人之前发现错误。这是每个人的胜利。
  • 注意:不要只在控制台中调试,使用 IDE 附带的调试器来收集信息 还要留意程序的错误消息。当您看到像 DDDDDDDD 这样的数字太常规而不能算运气时,look it up。 DDDDDDDD 通常是释放堆内存。 this 已发布。弄清楚 this 应该指的是什么(在调试器中使用回溯),您首先确定了是谁发布了它。
  • 感谢 cmets。我已经更新了代码以使其更具可编译性。正如您通过输出“调试控制台输出”图片所看到的那样,我对其进行了测试。我确切地知道崩溃发生的地点和原因。这不是我的真实代码 - 它是问题的示例/描述。问题是 - 如何强制 asio 不对已经销毁的对象调用 posponed 处理程序,或者如何通过错误代码识别对象已经被销毁,或者我在 asio 范围内还有其他解决方案......跨度>
  • 我投票结束这个问题,因为作者不明白 minimal reproducible example 是什么。基本上他在这里放弃调试工作。

标签: c++ boost boost-asio asio


【解决方案1】:

首先检查您的io_service 是否严格 运行单线程。这在代码中是不可见的。如果不是,则共享状态(如connections_)需要同步访问。

事实上,你可以有一个接受循环形式的逻辑链,但要利用这一点,你应该让对connections_的所有访问都发生在那里,参见例如

更新

  • buff 是一个局部变量,这会导致未定义行为,因为它在 async_read 操作的整个时间内都无效。

  • 一般来说,让shared_from_this 成语 还保留一个共享指针容器已经 决定了生命周期是没有意义的。

    您的问题似乎是有时 CAsioServer 被简单地破坏了,这意味着 connections_ 的所有元素都被释放,那时它们的 CAsioConnection 对象可能被破坏。它还会破坏CAsioServer

    当一个 Asio 对象被破坏时,任何挂起的异步操作都会以asio::error:operation_aborted 失败,这确实意味着你已经响应了。但是,当调用完成处理程序时,对象已经变为无效。

    my comment 中,我刚刚注意到缺少一个关键要素:您永远不会在任何完成处理程序中捕获/绑定指向CAsioConnection 的共享指针

    这是非常不习惯的。

    相反,您使用共享指针来管理生命周期。如果您还需要一个连接列表,则将其设为弱指针列表,以便它仅观察生命周期。

变化点:

  • 无需让服务器 enable_shared_from_this

  • connections_ 应该保存弱指针甚至是非拥有指针。弱指针显然在这里更安全。事实上,您可以选择丢弃该容器,因为似乎没有任何东西在使用它。在下面的示例中,我选择保留它,以便您可以看到它的实际效果。

  • 在完成处理程序中捕获shared_from_this,以确保对象在触发时仍然有效:

     asio::async_read(socket_, asio::buffer(buff,3),
         [this, self=shared_from_this()](error_code ec, std::size_t  /*length*/) {
    

简化

注意我选择了std::list,因为它消除了对等式/排序的需要(请参阅std::owner_less&lt;&gt;),因为在CAsioConnection 中存储对容器的引用的方式变得丑陋类 - 使其循环依赖(在实例化 owner_less&lt;&gt; 类之前,CAsioConnection 类型尚未完成)。我只是选择退出(不需要的?)复杂性。

Live On Coliru

#include <boost/asio.hpp>
#include <iostream>
#include <list>
#include <memory>

namespace asio = boost::asio;
using error_code = boost::system::error_code; // compat

class CAsioConnection : public std::enable_shared_from_this<CAsioConnection> {
  public:
    using PtrType = std::shared_ptr<CAsioConnection>;

    CAsioConnection(asio::ip::tcp::socket socket) : socket_(std::move(socket)) {
        log(__FUNCTION__);
    }

    ~CAsioConnection() { log(__FUNCTION__); }

    void read() { do_read(); }

  private:
    void log(std::string_view msg) const {
        error_code ec;
        std::clog << msg << ", socket: " << socket_.remote_endpoint(ec) << "\n";
    }

    uint8_t buff[256];
    void do_read() {
        asio::async_read(socket_, asio::buffer(buff),
             [this, self = shared_from_this()](error_code ec, std::size_t length) {
                 if (!ec) {
                     log(__FUNCTION__ + (" length: " + std::to_string(length)));
                     do_read();
                 } else {
                     log(__FUNCTION__ + (" error: " + ec.message()));
                 }
             });
    }

    asio::ip::tcp::socket socket_;
};

class CAsioServer {
  public:
    CAsioServer(asio::io_context& io, const asio::ip::tcp::endpoint& endpoint)
            : acceptor_(io, endpoint) { log(__FUNCTION__); }

    ~CAsioServer() { log(__FUNCTION__); }
    int port() const { return acceptor_.local_endpoint().port(); }
    void accept() { do_accept(); }

  private:
    void do_accept() {
        acceptor_.async_accept([this](error_code ec,
                                      asio::ip::tcp::socket socket) {
            if (!ec) {
                auto c = std::make_shared<CAsioConnection>(std::move(socket));
                connections_.push_back(c);
                c->read();
            } else {
                log(__FUNCTION__ + (" error: " + ec.message()));
            }

            connections_.remove_if(std::mem_fn(&WeakPtr::expired));

            if (acceptor_.is_open())
                do_accept();
        });
    }

    void log(std::string_view msg) const {
        std::clog << msg << ", port: " << port() << "\n";
    }

    asio::ip::tcp::acceptor acceptor_;
    using WeakPtr = std::weak_ptr<CAsioConnection>;
    std::list<WeakPtr> connections_;
};

int main() {
    boost::asio::io_context io;

    CAsioServer server(io, { {}, 7878 });
    server.accept();

    io.run_for(std::chrono::seconds(10));
}

输出:

./a.out& sleep 1; nc -w 1 127.0.0.1 7878 < main.cpp
CAsioServer, port: 7878
CAsioConnection, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() length: 256, socket: 127.0.0.1:50628
operator() error: End of file, socket: 127.0.0.1:50628
~CAsioConnection, socket: 127.0.0.1:50628
~CAsioServer, port: 7878

【讨论】:

  • boost::asio 的主要优势是保证如果 asio 上下文在一个线程中运行,那么您不需要任何互斥锁和其他东西来保护您的数据!!!为了保持这种方法,我使用 ising io.post() 方法设计了 CTcpBase::add_server() 和 CTcpBase::remove_server() ...
  • 我完全意识到这一点,但正如我在您的代码或描述中所说的那样,它没有表明它是单线程的。我什至没有看到您提到的io.post() ¹方法-这可能是一个线索。就像其他人说的那样,请让您的样本自成一体,这样我们就不必猜测了。
  • ¹ 更好的方法是post(io, task),其中post 将通过ADL 调用[boost::]asio::post
  • 再次阅读您的代码时,我想我看到了罪魁祸首。更新我的答案
  • 有些延迟(生活发生了)。还注意到UB 的另一个来源,因为buff 是本地的。
【解决方案2】:

小学,我亲爱的华生

问题的关键——我是一个非常信任的人

我应该提一下,我使用的是非加速 Asio 版本。 1.18.0,带VS2017和Win10。因此,以下所有解释都与 Windows 的 Asio 部分有关。有可能,posix 实现的工作方式有点不同。

最初实现的主要思想是: - 能够通过在适当的set&lt;&gt; 集合中添加/删除它们来控制服务器/连接对象的数量。

下面的文字描述了为什么如果不付出额外的努力就无法工作。

根据 Asio 文档:

~basic_stream_socket();这个函数销毁socket,取消 与套接字关联的任何未完成的异步操作为 如果通过调用取消。

我的错误是认为异步操作的取消将在析构函数的范围内执行同时调用异步处理程序

这很有趣,我想他们为什么在异步处理程序中使用 self 指针,如果在对象的销毁阶段应该拒绝异步处理程序。 正确答案——异步处理程序不会被拒绝))。

事实上,async handlers 之后会被调用,到时候类实体已经被销毁了。

发生了什么:

  1. 销毁服务器或连接类时:WinSock2 ::closesocket()~basic_stream_socket() 中调用套接字句柄。
  2. iocontext.run() 内部的下一次迭代中:win_iocp_io_context::do_one() 调用::GetQueuedCompletionStatus() 以获取异步操作结果并启动与已销毁套接字关联的异步处理程序。

有两个场景对我们来说很有趣:

  1. Socket 等待数据。
  2. 套接字正在破坏(例如在连接类的析构函数内)。
  3. 调用有错误的异步处理程序。

在这种情况下,即使类已经被销毁,我们也可能会检查错误代码并关闭异步处理程序。我在问题的代码中展示了糟糕但有效的解决方案。

  1. 套接字获取一些数据。异步处理程序尚未启动。
  2. 套接字正在破坏(例如在连接类的析构函数内)。
  3. 异步处理程序已启动没有错误!!!灾难。

在这种情况下,错误代码无法拯救我们。崩溃发生。 因此,检查异步处理程序中的错误代码的方法不起作用。

下面的代码通过为服务器和连接类引入hasta_la_vista() 方法解决了所有问题。不是超级优雅但钢筋混凝土解决方案:

#include <map>
#include <array>
#include <set>
#include <vector>
#include <deque>
#include <thread>
#include <iostream>
#include <asio.hpp>
#include <iomanip>


class CAsioConnection
    : public std::enable_shared_from_this<CAsioConnection>
{
public:
    using PtrType = std::shared_ptr<CAsioConnection>;

    CAsioConnection(asio::ip::tcp::socket socket, std::set<CAsioConnection::PtrType>& connections)
        : socket_(std::move(socket)), connections_(connections), destroying_in_progress(false)
    {
        std::cout << "-- CAsioConnection is creating\n";
    }

    virtual ~CAsioConnection()
    {
        std::cout << "-- CAsioConnection is destroying\n";
    }

    void read() { do_read(); }

    void hasta_la_vista(void)
    {
        destroying_in_progress = true;
        std::error_code ec;
        socket_.cancel(ec);
    }

private:
    void do_read(void)
    {
        auto self(shared_from_this());
        asio::async_read(socket_, asio::buffer(buff),
            [this, self](std::error_code ec, std::size_t  /*length*/) {

            if (destroying_in_progress)
                return;

            if (!ec)
            {
                do_read();
            }
            else
            {
                std::cout << "-- CAsioConnection::do_read() error : (" << ec.value() << ") " << ec.message() << "\n";
                hasta_la_vista();
                connections_.erase(shared_from_this());
            }
        });
    }

    uint8_t buff[3];
    asio::ip::tcp::socket socket_;
    bool destroying_in_progress;
    std::set<CAsioConnection::PtrType>& connections_;
};

//*****************************************************************************

class CAsioServer
    : public std::enable_shared_from_this<CAsioServer>
{
public:
    using PtrType = std::shared_ptr<CAsioServer>;

    CAsioServer(int port, asio::io_context& io, const asio::ip::tcp::endpoint& endpoint)
        : port_(port), destroying_in_progress(false), acceptor_(io, endpoint)
    {
        std::cout << "-- CAsioServer is creating, port: " << port_ << "\n";
    }

    virtual ~CAsioServer()
    {
        for (auto c : connections_)
        {
            c->hasta_la_vista();
        }

        std::cout << "-- CAsioServer is destroying , port: " << port_ << "\n";
    }

    int port(void) { return port_; }

    void accept(void) { do_accept(); }
    void hasta_la_vista(void) 
    { 
        destroying_in_progress = true;
        std::error_code ec;
        acceptor_.cancel(ec);
    }
private:
    void do_accept()
    {
        auto self(shared_from_this());
        acceptor_.async_accept([this, self](std::error_code ec, asio::ip::tcp::socket socket) {

            if (destroying_in_progress)
                return;

            if (!ec)
            {
                std::cout << "-- CAsioServer::do_accept() connection to socket: " << socket.native_handle() << "\n";
                auto c = std::make_shared<CAsioConnection>(std::move(socket), connections_);
                connections_.insert(c);
                c->read();
            }
            else
            {
                std::cout << "-- CAsioServer::do_accept() error : (" << ec.value() << ") "<<  ec.message() << "\n";
            }
            do_accept();
        });
    }

    int port_;
    bool destroying_in_progress;
    asio::ip::tcp::acceptor acceptor_;
    std::set<CAsioConnection::PtrType> connections_;
};

//*****************************************************************************

class CTcpBase
{
public:
    CTcpBase()
    {
        // heart beat timer to keep it alive
        do_heart_beat();
        t_ = std::thread([this] {
            std::cout << "-- io context is RUNNING!!!\n";
            io_.run();
            std::cout << "-- io context has been STOPED!!!\n";
        });
    }

    virtual ~CTcpBase()
    {
        io_.stop();

        if (t_.joinable())
            t_.join();
    }

    void add_server(int port)
    {
        io_.post([this, port] {
            for (auto& s : servers_)
                if (port == s->port())
                    return;

            auto endpoint = asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port);
            auto s = std::make_shared<CAsioServer>(port, io_, endpoint);
            s->accept();
            servers_.insert(s);
        });
    }

    void remove_server(int port)
    {
        io_.post([this, port] {
            for (auto s : servers_)
                if (port == s->port())
                { 
                    s->hasta_la_vista();
                    servers_.erase(s); 
                    return; 
                }
        });
    }

private:

    void do_heart_beat(void)
    {
        std::cout << "-- beat\n";
        auto timer = std::make_shared<asio::steady_timer>(io_, asio::chrono::milliseconds(3000));
        timer->async_wait([timer, this](const std::error_code& ec) {
            do_heart_beat();
        });
    }

    asio::io_context io_;
    std::thread t_;
    std::set<CAsioServer::PtrType> servers_;
};

//*****************************************************************************

int main(void)
{
    CTcpBase tcp_base;

    std::cout << "CONNECT the server to port 502\n";
    tcp_base.add_server(502);

    std::this_thread::sleep_for(std::chrono::seconds(20));

    std::cout << "REMOVE the server from port 502\n";
    tcp_base.remove_server(502);

    std::this_thread::sleep_for(std::chrono::seconds(10));
    
    return 0;
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-03-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-03
    • 1970-01-01
    • 2023-03-05
    相关资源
    最近更新 更多