【问题标题】:Boost::Beast : server with websocket pipeliningBoost::Beast : 带有 websocket 管道的服务器
【发布时间】:2019-10-06 21:33:53
【问题描述】:

我正在编写一个带有 boost beast 1.70 和 mysql 8 C 连接器的 c++ websocket 服务器。服务器将同时连接多个客户端。特殊性是每个客户端将连续向服务器执行 100 个 websocket 请求。对于我的服务器,每个请求都是“cpu light”,但服务器对每个请求执行“时间繁重”的 sql 请求。

我已经使用 websocket_server_coro.cpp 示例启动了我的服务器。服务器步骤是:

1) 一个 websocket 读取

2) 一个sql请求

3) 一个 websocket 写

问题是对于给定的用户,服务器在第 2 步被“锁定”,直到这一步和第 3 步完成后才能读取。因此,这 100 个请求被依次解决。这对我的用例来说太慢了。

我已经读过 boost beast 无法进行非阻塞读/写。但是,我现在要做的是在协程中执行 async_read 和 async_write。

void ServerCoro::accept(websocket::stream<beast::tcp_stream> &ws) {
    beast::error_code ec;

    ws.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));

    ws.set_option(websocket::stream_base::decorator([](websocket::response_type &res) {
                res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-Server-coro");
            }));

    ws.async_accept(yield[ec]);
    if (ec) return fail(ec, "accept");

    while (!_bStop) {
        beast::flat_buffer buffer;
        ws.async_read(buffer, yield[ec]);

        if (ec == websocket::error::closed) {
            std::cout << "=> get closed" << std::endl;
            return;
        }

        if (ec) return fail(ec, "read");

        auto buffer_str = new std::string(boost::beast::buffers_to_string(buffer.cdata()));
        net::post([&, buffer_str] {

            // sql async request such as :
            // while (status == (mysql_real_query_nonblocking(this->con, sqlRequest.c_str(), sqlRequest.size()))) {
            //    ioc.poll_one(ec);
            // }
            // more sql ...

            ws.async_write(net::buffer(worker->getResponse()), yield[ec]); // this line is throwing void boost::coroutines::detail::pull_coroutine_impl<void>::pull(): Assertion `! is_running()' failed.
            if (ec) return fail(ec, "write");

        });
    }
}

问题是带有 async_write 的行抛出错误:

void boost::coroutines::detail::pull_coroutine_impl::pull(): 断言`! is_running()' 失败。

如果将此行替换为sync_write,它可以工作,但对于给定用户,服务器保持顺序。 我试图在单线程服务器上执行此代码。我还尝试对 async_read 和 async_write 使用相同的链。仍然有断言错误。

这样的服务器对于 websocket 的 boost beast 是不可能的吗? 谢谢。

【问题讨论】:

  • 您是否尝试过将 SQL 工作分派到单独的线程池,然后将结果发送回 Web 套接字连接的链?这应该允许您同时执行 SQL 操作。您将需要实现一个传出消息队列,代码可以在“websocket chat”Beast 示例程序中找到。
  • 它有效,而且效果很好。我使用了“websocket chat”和“async server”的例子。我已将 sql 代码放在没有链的 net::post 中,并且 websocket 在链上写入/读取(带有 async_write 的队列)。感谢 Vinnie Falco 的回复以及您在 boost beast 上所做的工作。

标签: c++ boost-asio boost-beast


【解决方案1】:

根据 Vinnie Falco 的建议,我以“websocket chat”和“async server”为例重写了代码。这是代码的最终工作结果:

void Session::on_read(beast::error_code ec, std::size_t bytes_transferred)
{
    boost::ignore_unused(bytes_transferred);

    if(ec == websocket::error::closed) return;  // This indicates that the Session was closed
    if(ec) return fail(ec, "read");

    net::post([&, that = shared_from_this(), ss = std::make_shared<std::string const>(std::move(boost::beast::buffers_to_string(_buffer.cdata())))] {
        /* Sql things that call ioc.poll_one(ec) HERE, for me the sql response go inside worker.getResponse() used below */

        net::dispatch(_wsStrand, [&, that = shared_from_this(), sss = std::make_shared < std::string const>(worker.getResponse())] {
            async_write(sss);
        });
    });
    _buffer.consume(_buffer.size()); // we remove from the buffer what we just read
    do_read(); // go for another read
}

void Session::async_write(const std::shared_ptr<std::string const> &message) {
    _writeMessages.push_back(message);

    if (_writeMessages.size() > 1) {
        BOOST_LOG_TRIVIAL(warning) << "WRITE IS LOCKED";
    } else {
        _ws.text(_ws.got_text());
            _ws.async_write(net::buffer(*_writeMessages.front()), boost::asio::bind_executor(_wsStrand, beast::bind_front_handler(
                    &Session::on_write, this)));
    }
}

void Session::on_write(beast::error_code ec, std::size_t)
{
    // Handle the error, if any
    if(ec) return fail(ec, "write");

    // Remove the string from the queue
    _writeMessages.erase(_writeMessages.begin());

    // Send the next message if any
    if(!_writeMessages.empty())
        _ws.async_write(net::buffer(*_writeMessages.front()), boost::asio::bind_executor(_wsStrand, beast::bind_front_handler(
                        &Session::on_write, this)));
}

谢谢。

【讨论】:

  • 这看起来很棒,尽管它仍然可以改进。由于您使用的是 Boost 1.70,因此您可以将链放在 NextLayer 对象上,并在构建 websocket 时对其进行一次初始化。然后,所有异步 websocket 操作将自动使用 strand,而无需在调用站点使用 bind_executor。文档中对此进行了介绍。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-10-06
  • 2018-11-28
  • 1970-01-01
  • 2020-09-02
  • 2018-08-09
  • 1970-01-01
相关资源
最近更新 更多