【问题标题】:boost beast Websocket Multi Request Server/Client is not really multi(boost beast Websocket 多请求服务器/客户端并不能真正同时处理多个请求)
【发布时间】:2020-09-02 13:13:16
【问题描述】:

我正在编写一个 C++ 项目,把客户端的 websocket 请求转发到服务器,并在同时处理多个客户端的情况下,把双方的消息立即转发给对方。但是在把每个客户端连接到服务器之后,async_read 一直停留在第一个端点上;下一个端点虽然已经连接,却被阻塞而无法读取。顺便说一句,所有操作都是异步的。

// my server
class websocket_session : public std::enable_shared_from_this<websocket_session>
{
    std::string response = "";

    std::shared_ptr<websocket::stream<beast::tcp_stream>> ws_;

    boost::beast::multi_buffer buffer_;
    boost::beast::multi_buffer buffer_client;

    std::string meth = "", endpoint = "", message = "", host = "", port = "";
    shared_ptr<SocketClient> javaClient;
    net::io_context ioc_client;

public:
    // Take ownership of the socket
    explicit websocket_session(tcp::socket &&socket)
    {
        ws_ = make_shared<websocket::stream<beast::tcp_stream>>(std::move(socket));
    }

    // Start the asynchronous accept operation
    template <class Body, class Allocator>
    void do_accept(http::request<Body, http::basic_fields<Allocator>> req)
    {
        stringstream temprequest;
        temprequest << req.target();
        endpoint = temprequest.str();
        temprequest.str(std::string());
        temprequest << req.method();

        meth = temprequest.str();

        std::cout << "\n\t websocket_session::" << __FUNCTION__ << "\n";

        //Get IP and port in case endpoint is eligible
        if (EndPointSelection::getWebsocketHost(endpoint, meth, req.version(), host, port) == false)
        {
            std::cerr << "\n\t Websocket is NOT Authorized!!\n";
            return;
        }

        // Set suggested timeout settings for the websocket
        ws_->set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));

        // Set a decorator to change the Server of the handshake
        ws_->set_option(websocket::stream_base::decorator(
            [](websocket::response_type &res) {
                res.set(http::field::server,
                        std::string(BOOST_BEAST_VERSION_STRING) +
                            " advanced-server");
            }));

        // Accept the websocket handshake
        ws_->async_accept(req, beast::bind_front_handler(&websocket_session::on_accept, shared_from_this()));
    }

private:
    void on_accept(beast::error_code ec)
    {
        if (ec)
            fail(ec, "accept");

        std::cout << "\n\t websocket_session::" << __FUNCTION__ << "  Endpoint: " << endpoint << "\n";

        javaClient = std::make_shared<SocketClient>(ioc_client);
        javaClient->run(host.c_str(), port.c_str(), endpoint.c_str());
        javaClient->setWebsocket(ws_);

        ioc_client.run();
        do_read();
    }

    void do_read()
    {
        std::cout << "\n\t websocket_session::" << __FUNCTION__ << "\n";
        // Read message from client and keep into our buffer
        ws_->async_read(buffer_, beast::bind_front_handler(&websocket_session::on_read, shared_from_this()));
    }

    void on_read(beast::error_code ec, std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);
        std::cout << "\n\t websocket_session::" << __FUNCTION__ << "\n";

        // This indicates that the websocket_session was closed
        if (ec == websocket::error::closed)
            return;

        if (ec)
            fail(ec, "read");

        ws_->text(ws_->got_text());
        if (ws_->got_text())
        {
            std::cout << "\n\t Received Message from Client: " << boost::beast::buffers_to_string(buffer_.data()) << "\n";
            sleep(1); 
        }
        ioc_client.stop();
        //ws_->async_read(buffer_, beast::bind_front_handler(&websocket_session::on_read, shared_from_this()));
    }

    void on_write(beast::error_code ec, std::size_t bytes_transferred)
    {
        boost::ignore_unused(bytes_transferred);
        std::cout << "\n\t websocket_session::" << __FUNCTION__ << "\n";
        if (ec)
            fail(ec, "write");

        // Clear the buffer
        buffer_client.consume(buffer_client.size());

        // Do another read
        do_read();
    }
};


//my client
/// Remember the target host and endpoint, then start resolving host:port.
///
/// The resolve result is delivered to on_resolve(). The handler holds a shared
/// pointer to this object, so the object must already be owned by a
/// std::shared_ptr when run() is called.
///
/// @param host      Host name or address to resolve.
/// @param port      Service name or port number to resolve.
/// @param endpoint  Request target stored for the later WebSocket handshake.
void SocketClient::run(char const *host, char const *port, char const *endpoint)
{
    // Kept as members because the later handshake step reads them.
    host_ = host;
    endpoint_ = endpoint;

    std::cout << "\n run Endpoint:" << endpoint_ << "\n";
    std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";

    // Kick off the asynchronous name lookup.
    auto self = shared_from_this();
    resolver_.async_resolve(host, port,
                            beast::bind_front_handler(&SocketClient::on_resolve, self));
}

/// Store the stream that on_read() writes received messages to.
///
/// @param ws_cl  Shared pointer to the other side's WebSocket stream; this
///               object keeps a copy of the pointer in ws_client.
void SocketClient::setWebsocket(std::shared_ptr<websocket::stream<beast::tcp_stream>> ws_cl)
{
    this->ws_client = ws_cl;
}

/// Copy this client's read buffer into the caller-supplied buffer.
///
/// @param buffer_client  Output: overwritten with a copy of buffer_.
void SocketClient::getBuffer(boost::beast::multi_buffer &buffer_client)
{
    buffer_client = this->buffer_;
}

/// Send the contents of the given buffer on this client's own stream (ws_).
///
/// Uses the synchronous write() overload that takes no error_code.
/// An asynchronous variant (async_write followed by async_read) was sketched
/// in earlier revisions but is not active.
///
/// @param buffer_client  Buffer whose readable bytes are sent; taken by value.
void SocketClient::writeBuffer(boost::beast::multi_buffer buffer_client)
{
    std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";

    // The view refers to buffer_client, which stays alive for the whole call.
    auto const payload = buffer_client.data();
    ws_.write(payload);
}

/// Completion handler for the async_resolve() started in run().
///
/// On success, starts a TCP connect to the resolved endpoints with a
/// 30-second deadline; the result is delivered to on_connect().
///
/// @param ec       Result of the resolve operation.
/// @param results  Resolved endpoints to try.
void SocketClient::on_resolve(beast::error_code ec, tcp::resolver::results_type results)
{
    if (ec)
    {
        // A failed lookup leaves nothing to connect to: report and stop
        // instead of falling through to async_connect().
        fail(ec, "SocketClient::on_resolve Error: ");
        return;
    }
    std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";

    // Set the timeout for the operation
    beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));

    // Make the connection on the IP address we get from a lookup
    beast::get_lowest_layer(ws_).async_connect(results, beast::bind_front_handler(&SocketClient::on_connect, shared_from_this()));
}

/// Completion handler for the async_connect() started in on_resolve().
///
/// On success, configures the WebSocket stream (timeouts, User-Agent
/// decorator), appends the connected port to host_, and starts the WebSocket
/// handshake; the result is delivered to on_handshake().
///
/// @param ec  Result of the connect operation.
/// @param ep  Endpoint that was connected to; its port is appended to host_.
void SocketClient::on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep)
{
    if (ec)
    {
        // Without a connection there is nothing to handshake on: report and
        // stop instead of continuing on an unconnected stream.
        fail(ec, "connect");
        return;
    }
    std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";

    // Turn off the timeout on the tcp_stream, because
    // the websocket stream has its own timeout system.
    beast::get_lowest_layer(ws_).expires_never();

    // Set suggested timeout settings for the websocket
    ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));

    // Set a decorator to change the User-Agent of the handshake
    ws_.set_option(websocket::stream_base::decorator(
        [](websocket::request_type &req) {
            req.set(http::field::user_agent,
                    std::string(BOOST_BEAST_VERSION_STRING) +
                        " websocket-client-async");
        }));

    // Update the host_ string. This will provide the value of the
    // Host HTTP header during the WebSocket handshake.
    // See https://tools.ietf.org/html/rfc7230#section-5.4
    host_ += ':' + std::to_string(ep.port());

    // Perform the websocket handshake
    ws_.async_handshake(host_, endpoint_,
                        beast::bind_front_handler(
                            &SocketClient::on_handshake,
                            shared_from_this()));
}

/// Completion handler for the async_handshake() started in on_connect().
///
/// On success, starts the first asynchronous read into buffer_; the result is
/// delivered to on_read().
///
/// @param ec  Result of the WebSocket handshake.
void SocketClient::on_handshake(beast::error_code ec)
{
    if (ec)
    {
        // The stream is not an open WebSocket after a failed handshake, so
        // do not start reading from it.
        fail(ec, "SocketClient::on_handshake Error: ");
        return;
    }
    std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";
    ws_.async_read(buffer_, beast::bind_front_handler(&SocketClient::on_read, shared_from_this()));
}

/// Completion handler for an asynchronous write on ws_.
///
/// Clears buffer_ and, when the write succeeded, starts the next read; the
/// result is delivered to on_read().
///
/// @param ec                 Result of the write operation.
/// @param bytes_transferred  Unused.
void SocketClient::on_write(beast::error_code ec, std::size_t bytes_transferred)
{
    boost::ignore_unused(bytes_transferred);

    // The buffer is cleared whether or not the write succeeded.
    buffer_.consume(buffer_.size());

    if (ec)
    {
        // Do not queue another read on a stream whose write just failed.
        fail(ec, "SocketClient::on_write Error: ");
        return;
    }
    std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";

    // Read a message from server and keep it into our buffer
    ws_.async_read(buffer_, beast::bind_front_handler(&SocketClient::on_read, shared_from_this()));
}

/// Start one asynchronous read from ws_ into buffer_.
///
/// The result is delivered to on_read(); the handler holds a shared pointer
/// to this object.
void SocketClient::do_read()
{
    auto self = shared_from_this();
    ws_.async_read(buffer_,
                   beast::bind_front_handler(&SocketClient::on_read, self));
}

/// Completion handler for an asynchronous read on ws_.
///
/// On success, forwards the received message to the stream registered with
/// setWebsocket(), clears the read buffer, and starts the next read.
///
/// @param ec                 Result of the read operation.
/// @param bytes_transferred  Unused.
void SocketClient::on_read(beast::error_code ec, std::size_t bytes_transferred)
{
    boost::ignore_unused(bytes_transferred);
    std::cout << "\nHi from SocketClient::" << __FUNCTION__ << "\n";
    std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";

    // Check the result before looking at the buffer, so a failed read does
    // not leave a partial message in response_.
    if (ec)
    {
        fail(ec, "SocketClient::on_read Error: ");
        return;
    }

    response_.append(boost::beast::buffers_to_string(buffer_.data()));
    if (!response_.empty())
    {
        std::cout << "\nServer: " << response_ << "\t Size: " << buffer_.size() << "\n";
        // Synchronous write on the stream supplied through setWebsocket().
        ws_client->write(net::buffer(response_));
        response_.clear();
    }
    buffer_.consume(buffer_.size());

    // The previous revision slept for one second here and then, when the
    // other stream's last message was text, wrote buffer_ back to ws_. The
    // buffer has just been emptied, so that write could only ever send an
    // empty message, and the sleep blocked the thread that runs this
    // handler. Both were dropped.
    ws_.async_read(buffer_, beast::bind_front_handler(&SocketClient::on_read, shared_from_this()));
}

/// Start an asynchronous WebSocket close with the "normal" close code.
///
/// The result is delivered to on_close(); the handler holds a shared pointer
/// to this object.
void SocketClient::close_connection()
{
    std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";

    auto self = shared_from_this();
    ws_.async_close(websocket::close_code::normal,
                    beast::bind_front_handler(&SocketClient::on_close, self));
}

/// Completion handler for the async_close() started in close_connection().
///
/// On success, prints whatever is still held in buffer_.
///
/// @param ec  Result of the close operation.
void SocketClient::on_close(beast::error_code ec)
{
    if (ec)
    {
        // The close did not complete cleanly, so skip the "closed
        // gracefully" path below.
        fail(ec, "SocketClient::on_close Error: ");
        return;
    }

    std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";
    // If we get here then the connection is closed gracefully
    // The make_printable() function helps print a ConstBufferSequence
    std::cout << beast::make_printable(buffer_.data()) << std::endl;
}

【问题讨论】:

    标签: c++ multithreading asynchronous websocket boost-beast


    【解决方案1】:

    我花了一段时间才理解您的代码在做什么 - 因为您遗漏了一些非常相关的代码部分。看起来你有一些样板代码实际上是死代码。底线是这样的:

    我认为您需要从websocket_session 中删除ioc_client。引入一组分层的io_context 实例会阻塞后续连接,只会引入其他问题。

    我怀疑您之所以创建自己的 io_context 实例,是因为您拿不到那个已经处于运行状态的 io_context——它位于调用堆栈中更靠外的几层。不过我认为您可以获得它的引用。

    从传入 websocket_session 的原始套接字上,调用 socket.get_executor()(或者是 socket.get_executor().context()?)以获取执行上下文。这样您就可以在 SocketClient 中用它来创建您的套接字。不要对它调用 run,它已经处于运行状态。如果这不起作用,请想办法把原始的 io_context 传递给您的 websocket_session。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-10-06
      • 1970-01-01
      • 2021-06-19
      • 1970-01-01
      • 2022-01-17
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多