【问题标题】:Working with boost::asio::streambuf使用 boost::asio::streambuf
【发布时间】:2015-02-12 13:07:45
【问题描述】:

寻找boost::asio(和自己boost)决定编写异步服务器。为了存储传入的数据,我使用 boost::asio::streambuf。 在这里我有一个问题。当我从客户端收到第二条消息时,我看到缓冲区中包含来自先前消息的数据。 虽然我在输入缓冲区调用 Consume 方法。怎么了?

class tcp_connection
// Using shared_ptr and enable_shared_from_this 
// because we want to keep the tcp_connection object alive 
// as long as there is an operation that refers to it.
: public boost::enable_shared_from_this<tcp_connection>
{
...

boost::asio::streambuf receive_buffer;

boost::asio::io_service::strand strand;
}

...

void tcp_connection::receive()
{
// Read the response status line. The response_ streambuf will
// automatically grow to accommodate the entire line. The growth may be
// limited by passing a maximum size to the streambuf constructor.
boost::asio::async_read_until(m_socket, receive_buffer, "\r\n",
    strand.wrap(boost::bind(&tcp_connection::handle_receive, shared_from_this()/*this*/,
    boost::asio::placeholders::error,
    boost::asio::placeholders::bytes_transferred)));

}


void tcp_connection::handle_receive(const boost::system::error_code& error, 
std::size_t bytes_transferred)
{

if (!error)
{
    // process the data

    /*  boost::asio::async_read_until remarks

    After a successful async_read_until operation, 
    the streambuf may contain additional data beyond the delimiter.
    An application will typically leave that data in the streambuf for a
    subsequent async_read_until operation to examine.
    */

    /* didn't work      
    std::istream is(&receive_buffer);
    std::string line;
    std::getline(is, line); 
    */


    // clean up incomming buffer but it didn't work 
    receive_buffer.consume(bytes_transferred);  

    receive(); 

}
else if (error != boost::asio::error::operation_aborted)
{
    std::cout << "Client Disconnected\n";

    m_connection_manager.remove(shared_from_this());
}
}

【问题讨论】:

    标签: c++ boost-asio streambuf


    【解决方案1】:

    使用std::istream 并从中读取数据(例如通过std::getline())或显式调用boost::asio::streambuf::consume(n),将从输入序列中删除数据。
    如果应用程序正在执行这些和后续read_until() 操作导致receive_buffer 的输入序列中的重复数据,那么重复数据很可能来自远程对等点。如果远程对等方正在写入套接字并直接使用流缓冲区的输入序列,则远程对等方需要在每次成功的写入操作后显式调用consume()


    如文档中所述,成功的read_until() 操作可能包含超出分隔符的其他数据,包括其他分隔符。例如,如果将"a@b@" 写入套接字,则使用'@' 作为分隔符的read_until() 操作可以读取"a@b@" 并将其提交到streambuf 的输入序列。但是,该操作将指示传输的字节数最多为第一个分隔符(包括第一个分隔符)。因此,bytes_transferred 将是 2streambuf.size() 将是 4。在2 字节被消耗后,streambuf 的输入序列将包含"b@",随后对read_until() 的调用将立即返回,因为streambuf 已经包含分隔符。

    这里是一个完整的例子demonstratingstreambuf读写的用法,以及输入序列是如何被消费的:

    #include <iostream>
    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    
    // This example is not interested in the handlers, so provide a noop function
    // that will be passed to bind to meet the handler concept requirements.
    void noop() {}
    
    std::string make_string(boost::asio::streambuf& streambuf)
    {
     return {buffers_begin(streambuf.data()), 
             buffers_end(streambuf.data())};
    }
    
    int main()
    {
      using boost::asio::ip::tcp;
      boost::asio::io_service io_service;
    
      // Create all I/O objects.
      tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
      tcp::socket server_socket(io_service);
      tcp::socket client_socket(io_service);
    
      // Connect client and server sockets.
      acceptor.async_accept(server_socket, boost::bind(&noop));
      client_socket.async_connect(acceptor.local_endpoint(), boost::bind(&noop));
      io_service.run();
    
      // Write to server.
      boost::asio::streambuf write_buffer;
      std::ostream output(&write_buffer);
      output << "a@"
                "b@";
      write(server_socket, write_buffer.data());
      std::cout << "Wrote: " << make_string(write_buffer) << std::endl;
      assert(write_buffer.size() == 4);  // Data not consumed.
    
      // Read from the client.
      boost::asio::streambuf read_buffer;
    
      // Demonstrate consuming via istream.
      {
        std::cout << "Read" << std::endl;
        auto bytes_transferred = read_until(client_socket, read_buffer, '@');
        // Verify that the entire write_buffer (data pass the first delimiter) was
        // read into read_buffer.
        auto initial_size = read_buffer.size();
        assert(initial_size == write_buffer.size());
    
        // Read from the streambuf.
        std::cout << "Read buffer contains: " << make_string(read_buffer)
                  << std::endl;
        std::istream input(&read_buffer);
        std::string line;
        getline(input, line, '@'); // Consumes from the streambuf.
        assert("a" == line); // Note getline discards delimiter.
        std::cout << "Read consumed: " << line << "@" << std::endl;
        assert(read_buffer.size() == initial_size - bytes_transferred);
      }
    
      // Write an additional message to the server, but only consume 'a@'
      // from write buffer.  The buffer will contain 'b@c@'.
      write_buffer.consume(2);
      std::cout << "Consumed write buffer, it now contains: " <<
                    make_string(write_buffer) << std::endl;
      assert(write_buffer.size() == 2);
      output << "c@";
      assert(write_buffer.size() == 4);
      write(server_socket, write_buffer.data());
      std::cout << "Wrote: " << make_string(write_buffer) << std::endl;
    
      // Demonstrate explicitly consuming via the streambuf.
      {
        std::cout << "Read" << std::endl;
        auto initial_size = read_buffer.size();
        auto bytes_transferred = read_until(client_socket, read_buffer, '@');
        // Verify that the read operation did not attempt to read data from
        // the socket, as the streambuf already contained the delimiter.
        assert(initial_size == read_buffer.size());
    
        // Read from the streambuf.
        std::cout << "Read buffer contains: " << make_string(read_buffer)
                  << std::endl;
        std::string line(
            boost::asio::buffers_begin(read_buffer.data()),
            boost::asio::buffers_begin(read_buffer.data()) + bytes_transferred);
        assert("b@" == line);
        assert(read_buffer.size() == initial_size); // Nothing consumed.
        read_buffer.consume(bytes_transferred); // Explicitly consume.
        std::cout << "Read consumed: " << line << std::endl;
        assert(read_buffer.size() == 0);
      }
    
      // Read again.
      {
        std::cout << "Read" << std::endl;
        read_until(client_socket, read_buffer, '@');
    
        // Read from the streambuf.
        std::cout << "Read buffer contains: " << make_string(read_buffer)
                  << std::endl;
        std::istream input(&read_buffer);
        std::string line;
        getline(input, line, '@'); // Consumes from the streambuf.
        assert("b" == line); // Note "b" is expected and not "c".
        std::cout << "Read consumed: " << line << "@" << std::endl;
        std::cout << "Read buffer contains: " << make_string(read_buffer)
                  << std::endl;
      }
    }
    

    输出:

    Wrote: a@b@
    Read
    Read buffer contains: a@b@
    Read consumed: a@
    Consumed write buffer, it now contains: b@
    Wrote: b@c@
    Read
    Read buffer contains: b@
    Read consumed: b@
    Read
    Read buffer contains: b@c@
    Read consumed: b@
    Read buffer contains: c@
    

    【讨论】:

    • 谢谢,这是使用 streambuf 的一个很好的例子。我找到了为什么我在发送后看到以前消息中的数据。在客户端,函数 Send() 使用全局缓冲区 streambuf(在我的示例中为 send_buffer)。我如何做到每次调用 Send() 都使用自己的 streambuf 对象?像 boost::make_shared 之类的东西?这是正确的做法吗?你有使用 boost::asio 的服务器和客户端的好例子吗?这就是 Boost 文档中不适合我的内容,使用了恒定长度的缓冲区,我想看看使用 streambuf 的示例。
    • 很棒,很有帮助的答案。我希望 Boost 的人会让你编写他们所有的文档! ;-)
    • 你是如何在没有提交的情况下访问输入序列的 (commit())? data() documentation 表示data() 返回输入缓冲区列表,根据commit() documentation,它实际上将字符从输出序列复制到输入序列。
    • @Cengiz 当使用接受 streambuf 的 Asio 操作(不是表示其序列的缓冲区)时,或使用使用 streambuf 的流对象时,例如 std::ostream,底层输入和输出序列将得到妥善管理。此answer 可能会提供更多详细信息。此外,commit() 的文档提到 移动,而不是复制数据。当前实现使用单个连续数组,其中commit() 将输出序列开始指针和输入序列结束指针都提前n 个字符。
    • @Tanner,感谢您的快速回答。由于问题有点老了,我希望再等一会儿。
    猜你喜欢
    • 2017-09-02
    • 1970-01-01
    • 2011-08-06
    • 1970-01-01
    • 1970-01-01
    • 2012-10-26
    • 2016-09-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多