【发布时间】:2020-09-02 13:13:16
【问题描述】:
我正在编写一个 c++ 项目来将客户端 websocket 请求转发到服务器,并在每个客户端处理多个客户端时立即将它们的消息返回给对方,但是对于将每个客户端连接到服务器 async_read 保持在第一个端点上,因此然而,下一个端点已连接但被阻止读取,顺便说一句,每件事都是异步的。
// my server
class websocket_session : public std::enable_shared_from_this<websocket_session>
{
std::string response = "";
std::shared_ptr<websocket::stream<beast::tcp_stream>> ws_;
boost::beast::multi_buffer buffer_;
boost::beast::multi_buffer buffer_client;
std::string meth = "", endpoint = "", message = "", host = "", port = "";
shared_ptr<SocketClient> javaClient;
net::io_context ioc_client;
public:
// Take ownership of the socket
explicit websocket_session(tcp::socket &&socket)
{
ws_ = make_shared<websocket::stream<beast::tcp_stream>>(std::move(socket));
}
// Start the asynchronous accept operation
template <class Body, class Allocator>
void do_accept(http::request<Body, http::basic_fields<Allocator>> req)
{
stringstream temprequest;
temprequest << req.target();
endpoint = temprequest.str();
temprequest.str(std::string());
temprequest << req.method();
meth = temprequest.str();
std::cout << "\n\t websocket_session::" << __FUNCTION__ << "\n";
//Get IP and port in case endpoint is eligible
if (EndPointSelection::getWebsocketHost(endpoint, meth, req.version(), host, port) == false)
{
std::cerr << "\n\t Websocket is NOT Authorized!!\n";
return;
}
// Set suggested timeout settings for the websocket
ws_->set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));
// Set a decorator to change the Server of the handshake
ws_->set_option(websocket::stream_base::decorator(
[](websocket::response_type &res) {
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) +
" advanced-server");
}));
// Accept the websocket handshake
ws_->async_accept(req, beast::bind_front_handler(&websocket_session::on_accept, shared_from_this()));
}
private:
void on_accept(beast::error_code ec)
{
if (ec)
fail(ec, "accept");
std::cout << "\n\t websocket_session::" << __FUNCTION__ << " Endpoint: " << endpoint << "\n";
javaClient = std::make_shared<SocketClient>(ioc_client);
javaClient->run(host.c_str(), port.c_str(), endpoint.c_str());
javaClient->setWebsocket(ws_);
ioc_client.run();
do_read();
}
void do_read()
{
std::cout << "\n\t websocket_session::" << __FUNCTION__ << "\n";
// Read message from client and keep into our buffer
ws_->async_read(buffer_, beast::bind_front_handler(&websocket_session::on_read, shared_from_this()));
}
void on_read(beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
std::cout << "\n\t websocket_session::" << __FUNCTION__ << "\n";
// This indicates that the websocket_session was closed
if (ec == websocket::error::closed)
return;
if (ec)
fail(ec, "read");
ws_->text(ws_->got_text());
if (ws_->got_text())
{
std::cout << "\n\t Received Message from Client: " << boost::beast::buffers_to_string(buffer_.data()) << "\n";
sleep(1);
}
ioc_client.stop();
//ws_->async_read(buffer_, beast::bind_front_handler(&websocket_session::on_read, shared_from_this()));
}
void on_write(beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
std::cout << "\n\t websocket_session::" << __FUNCTION__ << "\n";
if (ec)
fail(ec, "write");
// Clear the buffer
buffer_client.consume(buffer_client.size());
// Do another read
do_read();
}
};
//my client
void SocketClient::run(char const *host, char const *port, char const *endpoint)
{
// Save these for later
host_ = host;
endpoint_ = endpoint;
std::cout << "\n run Endpoint:" << endpoint_ << "\n";
std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";
// Look up the domain name
resolver_.async_resolve(host, port, beast::bind_front_handler(&SocketClient::on_resolve, shared_from_this()));
}
void SocketClient::setWebsocket(std::shared_ptr<websocket::stream<beast::tcp_stream>> ws_cl)
{
ws_client = ws_cl;
};
void SocketClient::getBuffer(boost::beast::multi_buffer &buffer_client)
{
buffer_client = buffer_;
}
void SocketClient::writeBuffer(boost::beast::multi_buffer buffer_client)
{
std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";
ws_.write(buffer_client.data());
// ws_.async_write(buffer_client.data(), beast::bind_front_handler(&SocketClient::on_write, shared_from_this()));
//ws_.async_read(buffer_, beast::bind_front_handler(&SocketClient::on_read, shared_from_this()));
}
void SocketClient::on_resolve(beast::error_code ec, tcp::resolver::results_type results)
{
if (ec)
fail(ec, "SocketClient::on_resolve Error: ");
std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";
// Set the timeout for the operation
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws_).async_connect(results, beast::bind_front_handler(&SocketClient::on_connect, shared_from_this()));
}
void SocketClient::on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep)
{
if (ec)
fail(ec, "connect");
std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client));
// Set a decorator to change the User-Agent of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::request_type &req) {
req.set(http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-client-async");
}));
// Update the host_ string. This will provide the value of the
// Host HTTP header during the WebSocket handshake.
// See https://tools.ietf.org/html/rfc7230#section-5.4
host_ += ':' + std::to_string(ep.port());
// Perform the websocket handshake
ws_.async_handshake(host_, endpoint_,
beast::bind_front_handler(
&SocketClient::on_handshake,
shared_from_this()));
}
void SocketClient::on_handshake(beast::error_code ec)
{
if (ec)
fail(ec, "SocketClient::on_handshake Error: ");
std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";
ws_.async_read(buffer_, beast::bind_front_handler(&SocketClient::on_read, shared_from_this()));
}
void SocketClient::on_write(beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
buffer_.consume(buffer_.size());
if (ec)
fail(ec, "SocketClient::on_write Error: ");
std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";
// Read a message from server and keep it into our buffer
ws_.async_read(buffer_, beast::bind_front_handler(&SocketClient::on_read, shared_from_this()));
}
void SocketClient::do_read()
{
ws_.async_read(buffer_, beast::bind_front_handler(&SocketClient::on_read, shared_from_this()));
}
void SocketClient::on_read(beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
std::cout << "\nHi from SocketClient::" << __FUNCTION__ << "\n";
std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";
response_.append(boost::beast::buffers_to_string(buffer_.data())) ;
if (ec)
{
fail(ec, "SocketClient::on_read Error: ");
return;
}
if (response_.empty() == false)
{
std::cout << "\nServer: " << response_ << "\t Size: " << buffer_.size() << "\n";
ws_client->write(net::buffer(response_));
response_="";
}
buffer_.consume(buffer_.size());
sleep(1);
if (ws_client->got_text())
{
std::cout << "\n CLIENT Message\n ";
ws_.write(buffer_.data());
}
ws_.async_read(buffer_, beast::bind_front_handler(&SocketClient::on_read, shared_from_this()));
}
void SocketClient::close_connection()
{
std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";
// Close the WebSocket connection
ws_.async_close(websocket::close_code::normal, beast::bind_front_handler(&SocketClient::on_close, shared_from_this()));
};
void SocketClient::on_close(beast::error_code ec)
{
if (ec)
fail(ec, "SocketClient::on_close Error: ");
std::cout << "\nSocketClient::" << __FUNCTION__ << "\n";
// If we get here then the connection is closed gracefully
// The make_printable() function helps print a ConstBufferSequence
std::cout << beast::make_printable(buffer_.data()) << std::endl;
}
【问题讨论】:
标签: c++ multithreading asynchronous websocket boost-beast