【问题标题】:shared_from_this throws bad_weak_ptr with boost::asioshared_from_this 用 boost::asio 抛出 bad_weak_ptr
【发布时间】:2018-06-03 14:50:41
【问题描述】:

首先,我已阅读列出的所有相关问题。

他们说,“你必须有一个现有的 shared_ptr 才能使用 shared_from_this。”据我所知,我不可能违反这个条件。我将 Foo 的实例创建为 shared_ptr,并强制它始终创建为 shared_ptr。然后,我将 shared_ptr 存储在一个集合中。然而,当 shared_from_this 被调用时,我仍然得到 bad_weak_ptr 异常。

#pragma once

#include <memory>
#include <vector>

//--------------------------------------------------------------------
class Foo : std::enable_shared_from_this<Foo>
{
public:

    typedef std::shared_ptr<Foo> SharedPtr;

    // Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this
    static Foo::SharedPtr Create()
    {
        return Foo::SharedPtr(new Foo());
    };

    Foo(const Foo &) = delete;
    Foo(Foo &&) = delete;
    Foo & operator = (const Foo &) = delete;
    Foo & operator = (Foo &&) = delete;
    ~Foo() {};

    // We have to defer the start until we are fully constructed because we share_from_this()
    void Start()
    {
        DoStuff();
    }

private:

    Foo() {}

    void DoStuff()
    {
        auto self(shared_from_this());
    }
};

//--------------------------------------------------------------------
int main()
{
    std::vector<Foo::SharedPtr> foos;
    Foo::SharedPtr foo = Foo::Create();
    foos.emplace_back(foo);
    foo->Start();

    return 0;
}

【问题讨论】:

  • 我不认为这是最小的。您是否建议在不阻止问题重现的情况下,没有一个表达式无法删除?虽然不一定强制删除所有此类表达式,但示例越大,尽可能多地删除就越重要。很少有编程问题不能简化为一个小例子。我并不总是在较长的程序列表中要求 MCVE,但我经常这样做。

标签: c++ shared-ptr


【解决方案1】:

您必须根据

继承 enable_shared_from_thispublic 说明符

从 std::enable_shared_from_this 公开继承为类型 T 提供了一个成员函数 shared_from_this。

来自http://en.cppreference.com/w/cpp/memory/enable_shared_from_this

所以写

class Foo : public std::enable_shared_from_this<Foo>

【讨论】:

    【解决方案2】:

    首先,您在发布工作之前启动线程,因此io_service::run() 很容易在DoAccept 实际完成之前完成。

    接下来,基类必须是 PUBLIC 才能使 enable_shared_from_this 起作用:

    class Connection : public std::enable_shared_from_this<Connection> {
    

    工作自包含代码:

    #include <iostream>
    #include <mutex>
    namespace SomeNamespace{
    struct Logger {
        enum { LOGGER_SEVERITY_INFO };
        void Log(std::string const& msg, std::string const& file, unsigned line, int level) const {
            static std::mutex mx;
            std::lock_guard<std::mutex> lk(mx);
            std::cout << file << ":" << line << " level:" << level << " " << msg << "\n";
        }
        template <typename... Args>
        void LogF(std::string const& msg, Args const&... args) const {
            static std::mutex mx;
            std::lock_guard<std::mutex> lk(mx);
            static char buf[2048];
            snprintf(buf, sizeof(buf)-1, msg.c_str(), args...);
            std::cout << buf << "\n";
        }
        static Logger &GetInstance() {
            static Logger This;
            return This;
        }
    };
    } // namespace Somenamespace
    
    #include <boost/asio.hpp>
    
    #include <atomic>
    #include <condition_variable>
    #include <memory>
    
    //--------------------------------------------------------------------
    class ConnectionManager;
    
    //--------------------------------------------------------------------
    class Connection : public std::enable_shared_from_this<Connection> {
      public:
        typedef std::shared_ptr<Connection> SharedPtr;
    
        // Ensure all instances are created as shared_ptr in order to fulfill requirements for shared_from_this
        static Connection::SharedPtr Create(ConnectionManager *connectionManager, boost::asio::ip::tcp::socket &socket);
    
        Connection(const Connection &) = delete;
        Connection(Connection &&) = delete;
        Connection &operator=(const Connection &) = delete;
        Connection &operator=(Connection &&) = delete;
        ~Connection();
    
        // We have to defer the start until we are fully constructed because we share_from_this()
        void Start();
        void Stop();
    
        void Send(const std::vector<char> &data);
    
      private:
        ConnectionManager *m_owner;
        boost::asio::ip::tcp::socket m_socket;
        std::atomic<bool> m_stopped;
        boost::asio::streambuf m_receiveBuffer;
        mutable std::mutex m_sendMutex;
        std::shared_ptr<std::vector<boost::asio::const_buffer> > m_sendBuffers;
        bool m_sending;
    
        std::vector<char> m_allReadData; // for testing
    
        Connection(ConnectionManager *connectionManager, boost::asio::ip::tcp::socket socket);
    
        void DoReceive();
        void DoSend();
    };
    
    //--------------------------------------------------------------------
    
    //#include "Connection.h"
    //#include "ConnectionManager.h"
    //**ConnectionManager.h **
    
    //#pragma once
    
    //#include "Connection.h"
    
    // Boost Includes
    #include <boost/asio.hpp>
    
    // Standard Includes
    #include <thread>
    #include <vector>
    
    //--------------------------------------------------------------------
    class ConnectionManager {
      public:
        ConnectionManager(unsigned port, size_t numThreads);
        ConnectionManager(const ConnectionManager &) = delete;
        ConnectionManager(ConnectionManager &&) = delete;
        ConnectionManager &operator=(const ConnectionManager &) = delete;
        ConnectionManager &operator=(ConnectionManager &&) = delete;
        ~ConnectionManager();
    
        void Start();
        void Stop();
    
        void OnConnectionClosed(Connection::SharedPtr connection);
    
      protected:
        boost::asio::io_service m_io_service;
        boost::asio::ip::tcp::acceptor m_acceptor;
        boost::asio::ip::tcp::socket m_listenSocket;
        std::vector<std::thread> m_threads;
    
        mutable std::mutex m_connectionsMutex;
        std::vector<Connection::SharedPtr> m_connections;
    
        void IoServiceThreadProc();
    
        void DoAccept();
    };
    
    //--------------------------------------------------------------------
    
    #include <boost/bind.hpp>
    
    #include <algorithm>
    
    //--------------------------------------------------------------------
    Connection::SharedPtr Connection::Create(ConnectionManager *connectionManager, boost::asio::ip::tcp::socket &socket) {
        return Connection::SharedPtr(new Connection(connectionManager, std::move(socket)));
    }
    
    //--------------------------------------------------------------------
    Connection::Connection(ConnectionManager *connectionManager, boost::asio::ip::tcp::socket socket)
            : m_owner(connectionManager), m_socket(std::move(socket)), m_stopped(false), m_receiveBuffer(), m_sendMutex(),
              m_sendBuffers(), m_sending(false), m_allReadData() {}
    
    //--------------------------------------------------------------------
    Connection::~Connection() {
        // Boost uses RAII, so we don't have anything to do. Let thier destructors take care of business
    }
    
    //--------------------------------------------------------------------
    void Connection::Start() { DoReceive(); }
    
    //--------------------------------------------------------------------
    void Connection::Stop() {
        // The entire connection class is only kept alive, because it is a shared pointer and always has a ref count
        // as a consequence of the outstanding async receive call that gets posted every time we receive.
        // Once we stop posting another receive in the receive handler and once our owner release any references to
        // us, we will get destroyed.
        m_stopped = true;
        m_owner->OnConnectionClosed(shared_from_this());
    }
    
    //--------------------------------------------------------------------
    void Connection::Send(const std::vector<char> &data) {
        std::lock_guard<std::mutex> lock(m_sendMutex);
    
        // If the send buffers do not exist, then create them
        if (!m_sendBuffers) {
            m_sendBuffers = std::make_shared<std::vector<boost::asio::const_buffer> >();
        }
    
        // Copy the data to be sent to the send buffers
        m_sendBuffers->emplace_back(boost::asio::buffer(data));
    
        DoSend();
    }
    
    //--------------------------------------------------------------------
    void Connection::DoSend() {
        // According to the boost documentation, we cannot issue an async_write while one is already outstanding
        //
        // If that is the case, it is OK, because we've added the data to be sent to a new set of buffers back in
        // the Send method. Notice how the original buffer is moved, so therefore will be null below and how Send
        // will create new buffers and accumulate data to be sent until we complete in the lamda
        //
        // When we complete in the lamda, if we have any new data to be sent, we call DoSend once again.
        //
        // It is important though, that DoSend is only called from the lambda below and the Send method.
    
        if (!m_sending && m_sendBuffers) {
            m_sending = true;
            auto copy = std::move(m_sendBuffers);
            auto self(shared_from_this());
    
            boost::asio::async_write(m_socket, *copy,
                 [self, copy](const boost::system::error_code &errorCode, size_t bytes_transferred) {
                     std::lock_guard<std::mutex> lock(self->m_sendMutex);
                     self->m_sending = false;
    
                     if (errorCode) {
                         // An error occurred
                         return;
                     }
    
                     self->DoSend();
                 });
        }
    }
    
    //--------------------------------------------------------------------
    void Connection::DoReceive() {
        SomeNamespace::Logger::GetInstance().Log(__PRETTY_FUNCTION__, __FILE__, __LINE__, SomeNamespace::Logger::LOGGER_SEVERITY_INFO);
        auto self(shared_from_this()); // ***EXCEPTION HERE****
    
        boost::asio::async_read_until(m_socket, m_receiveBuffer, '#',
          [self](const boost::system::error_code &errorCode, size_t bytesRead) {
              if (errorCode) {
                  // Notify our masters that we are ready to be destroyed
                  self->m_owner->OnConnectionClosed(self);
    
                  // An error occured
                  return;
              }
    
              // Grab the read data
              std::istream stream(&self->m_receiveBuffer);
              std::string data;
              std::getline(stream, data, '#');
    
              // Issue the next receive
              if (!self->m_stopped) {
                  self->DoReceive();
              }
          });
    }
    
    //--------------------------------------------------------------------
    
    //**ConnectionManager.cpp **
    
    //#include "ConnectionManager.h"
    
    //#include "Logger.h"
    
    #include <boost/bind.hpp>
    
    #include <system_error>
    
    //------------------------------------------------------------------------------
    ConnectionManager::ConnectionManager(unsigned port, size_t numThreads)
            : m_io_service(), m_acceptor(m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
              m_listenSocket(m_io_service), m_threads(numThreads) {}
    
    //------------------------------------------------------------------------------
    ConnectionManager::~ConnectionManager() { Stop(); }
    
    //------------------------------------------------------------------------------
    void ConnectionManager::Start() {
        if (m_io_service.stopped()) {
            m_io_service.reset();
        }
    
        DoAccept();
    
        for (auto &thread : m_threads) {
            if (!thread.joinable()) {
                thread = std::thread(&ConnectionManager::IoServiceThreadProc, this);
            }
        }
    }
    
    //------------------------------------------------------------------------------
    void ConnectionManager::Stop() {
        {
            std::lock_guard<std::mutex> lock(m_connectionsMutex);
            m_connections.clear();
        }
    
        // TODO - Will the stopping of the io_service be enough to kill all the connections and ultimately have them get
        // destroyed?
        //        Because remember they have outstanding ref count to thier shared_ptr in the async handlers
        m_io_service.stop();
    
        for (auto &thread : m_threads) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }
    
    //------------------------------------------------------------------------------
    void ConnectionManager::IoServiceThreadProc() {
        try {
            // Log that we are starting the io_service thread
            {
                const std::string msg("io_service socket thread starting.");
                SomeNamespace::Logger::GetInstance().Log(msg, __FILE__, __LINE__,
                                                          SomeNamespace::Logger::LOGGER_SEVERITY_INFO);
            }
    
            // Run the asynchronous callbacks from the socket on this thread
            // Until the io_service is stopped from another thread
            m_io_service.run();
        } catch (std::system_error &e) {
            SomeNamespace::Logger::GetInstance().LogF("System error caught in io_service socket thread. Error Code: %d", e.code().value());
        } catch (std::exception &e) {
            SomeNamespace::Logger::GetInstance().LogF("Standard exception caught in io_service socket thread. Exception: %s", e.what());
        } catch (...) {
            SomeNamespace::Logger::GetInstance().LogF("Unhandled exception caught in io_service socket thread.");
        }
    
        SomeNamespace::Logger::GetInstance().LogF("io_service socket thread exiting."); 
    }
    
    //------------------------------------------------------------------------------
    void ConnectionManager::DoAccept() {
        SomeNamespace::Logger::GetInstance().Log(__PRETTY_FUNCTION__, __FILE__, __LINE__, SomeNamespace::Logger::LOGGER_SEVERITY_INFO);
    
        m_acceptor.async_accept(m_listenSocket, [this](const boost::system::error_code errorCode) {
            if (errorCode) {
                return;
            }
    
            {
                // Create the connection from the connected socket
                Connection::SharedPtr connection = Connection::Create(this, m_listenSocket);
                {
                    std::lock_guard<std::mutex> lock(m_connectionsMutex);
                    m_connections.push_back(connection);
                    connection->Start();
                }
            }
    
            DoAccept();
        });
    }
    
    //------------------------------------------------------------------------------
    void ConnectionManager::OnConnectionClosed(Connection::SharedPtr connection) {
        std::lock_guard<std::mutex> lock(m_connectionsMutex);
    
        auto itConnection = std::find(m_connections.begin(), m_connections.end(), connection);
        if (itConnection != m_connections.end()) {
            m_connections.erase(itConnection);
        }
    }
    
    //------------------------------------------------------------------------------
    //**main.cpp**
    //#include "ConnectionManager.h"
    
    #include <cstring>
    #include <iostream>
    #include <string>
    
    int main() {
        ConnectionManager connectionManager(4000, 2);
        connectionManager.Start();
    
        std::this_thread::sleep_for(std::chrono::minutes(1));
    
        connectionManager.Stop();
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-02-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多