【问题标题】:Triggering writes with Boost::asio使用 Boost::asio 触发写入
【发布时间】:2011-02-24 21:48:01
【问题描述】:

我有一些软件想制作一个 TCP 客户端。我不知道这是否是最好的架构,但在我的软件中,我生成了一个用于网络 I/O 的线程。如果有更好的架构,我会很感激一些指针和建议。

两个线程都引用了 boost::asio::io_service 对象和封装了套接字对象的 Session 对象。 sesson对象大致如下:

  class Session
  {
  public:

  Session(
    boost::asio::io_service & io_service,
    std::string const & ip_address,
    std::string const & port)
  : io_service_(io_service),
    resolver_(io_service),
    socket_(io_service),
    ip_address_(ip_address),
    port_(port),
  {}

  virtual void start();
  virtual ~Session();
  virtual void stop();
  void write(std::string const & msg);

  void handle_resolve(
    const boost::system::error_code & error,
    boost::asio::ip::tcp::resolver::iterator endpoint_itr);

  void handle_connect(
    const boost::system::error_code & error,
    boost::asio::ip::tcp::resolver::iterator endpoint_itr);

  void handle_close();
  void handle_write(const boost::system::error_code & error);

private:
  boost::asio::io_service & io_service_;
  boost::asio::ip::tcp::resolver resolver_;
  boost::asio::ip::tcp::socket socket_;
  std::string ip_address_;
  std::string port_;
};

在 I/O 线程运行循环中,会调用连接到服务器的会话对象的 start() 方法。 (这行得通,顺便说一句)。然后,线程处于循环中,调用 I/O 服务对象 [io_service_.run()] 上的 run() 方法来触发事件。

主线程要发送数据时调用会话的write()方法,会话对象调用boost::async_write将要写入的数据,然后是会话对象成员的回调方法(句柄写)。

虽然我有连接到服务器的 I/O 线程,但我无法触发 handle_write 方法。我已经验证了主线程正在调用会话对象并在套接字上执行 async_write() 。只是永远不会触发回调。我也没有在服务器端或使用 tcpdump 的网络上看到任何数据。

知道我的问题可能出在哪里吗?有没有更好的方法来组织架构?最重要的是,我不想阻塞主线程执行 I/O。

以下是从主线程生成 io 线程的代码(为间距道歉):

    boost::asio::io_service io_service;
    boost::shared_ptr<Session> session_ptr;
    boost::thread io_thread;
    ....
    session_ptr.reset(
      new Session::Session(
                io_service,
                std::string("127.0.0.1"),
                std::string("17001")));

    // spawn new thread for the network I/O endpoint
    io_thread = boost::thread(
      boost::bind(
        &Session::start,
        session_ptr_.get()));

start()方法的代码如下:

    void Session::start()
    {
      typedef boost::asio::ip::tcp tcp;

      tcp::resolver::query query(
          tcp::v4(),
          ip_address_,
          port_);  

      resolver_.async_resolve(
          query,
          boost::bind(
              &Session::handle_resolve,
              this,
              boost::asio::placeholders::error,
              boost::asio::placeholders::iterator));

      while(1){ // improve this later
        io_service_.run();
      }
    }

解析器的回调:

    void Session::handle_resolve(
        const boost::system::error_code & error,
        boost::asio::ip::tcp::resolver::iterator endpoint_itr)
    {
      if (!error)
      {
        boost::asio::ip::tcp::endpoint endpoint = *endpoint_itr;
        socket_.async_connect(
            endpoint,
            boost::bind(
              &Session::handle_connect,
              this,
              boost::asio::placeholders::error,
              ++endpoint_itr));
      }
      else
      {
        std::cerr << "Failed to resolve\n";
        std::cerr << "Error: " << error.message() << std::endl;
      }
    }

连接回调:

    void Session::handle_connect(
        const boost::system::error_code & error,
        boost::asio::ip::tcp::resolver::iterator endpoint_itr)
    {
      typedef boost::asio::ip::tcp tcp;

      if (!error)
      {
        std::cerr << "Connected to the server!\n";
      }
      else if (endpoint_itr != tcp::resolver::iterator())
      {
        socket_.close();
        socket_.async_connect(
            *endpoint_itr,
            boost::bind(
              &Session::handle_connect,
              this,
              boost::asio::placeholders::error,
              ++endpoint_itr));
      }
      else
      {
        std::cerr << "Failed to connect\n";
      }

    }

主线程可以调用的 write() 方法来发送异步写入。

    void Session::write(
        std::string const & msg)
    {
      std::cout << "Write: " << msg << std::endl;
      boost::asio::async_write(
          socket_,
          boost::asio::buffer(
              msg.c_str(),
              msg.length()),
          boost::bind(
              &Session::handle_write,
              this,
              boost::asio::placeholders::error));     
    }

最后,写完成回调:

    void Session::handle_write(
        const boost::system::error_code & error)
    {
      if (error)
      {
         std::cout << "Write complete with errors !!!\n";
      }
      else
      {
         std::cout << "Write complete with no errors\n";
      }        
    }

【问题讨论】:

  • 我们可以看看您的 start()、write()、handle_connect() 和 handle_write() 函数的代码吗?
  • 您的示例不完整,请发布更多代码。以您描述的方式使用 asio 是很常见的,一个主线程和一个调用 io_service 事件循环的第二个线程。
  • 按要求添加了代码。我基本上从 HTTP 客户端示例中获取了大部分内容。
  • 您更新的问题仍然不完整,我们需要查看Session 类定义。
  • 很抱歉,希望这将提供所有需要的信息。谢谢。

标签: c++ boost asynchronous boost-asio


【解决方案1】:

看起来您的 io 服务在连接后将耗尽工作,之后您只需再次调用 io_service::run 吗?看起来 run 在 while 循环中被调用,但是我在任何地方都看不到重置的调用。您需要调用 io::service::reset 在再次调用相同的 io_service 之前运行。

在结构上,最好将work添加到io_service中,这样你就不需要在循环中调用它,一旦你调用io_service::stop,运行就会退出。

【讨论】:

  • +1 表示io_service 将在连接处理程序后耗尽工作。
  • 啊,我明白了,我没有意识到我需要调用重置。工作进展如何?当我的客户端空闲并等待事情发生时(例如,等待来自主线程的事件或等待来自服务器的命令,我是否应该发布读取以保持 io_service 有效?
  • 通过让 io_service 工作,您不需要进行任何未完成的异步操作,例如读取。 run 方法只会在你调用 stop 方法后返回。
  • 我想我明白了。我会发布一个读取,写入一些数据,当触发读取回调时,我可以发布另一个读取。这会让 io_service 忙于工作,对吧?
  • @Dr 是的,你明白了。这就是异步设计的优雅之处。处理程序将额外的工作排队,保持io_service 循环处于活动状态。
【解决方案2】:

这部分代码

boost::asio::io_service io_service;
boost::shared_ptr<Session> session_ptr;
boost::thread io_thread;
....
session_ptr.reset(
  new Session::Session(
            io_service,
            std::string("127.0.0.1"),
            std::string("17001")));

// spawn new thread for the network I/O endpoint
io_thread = boost::thread(
  boost::bind(
    &Session::start,
    session_ptr_.get()));

对我来说是一个危险信号。您的 io_service 对象可能超出范围并导致奇怪的行为。 io_service 是不可复制的,因此将它作为非常量引用传递给您的 Session 可能不是您希望实现的目标。

samm@macmini ~> grep -C 2 noncopyable  /usr/include/boost/asio/io_service.hpp 
#include <boost/asio/detail/epoll_reactor_fwd.hpp>
#include <boost/asio/detail/kqueue_reactor_fwd.hpp>
#include <boost/asio/detail/noncopyable.hpp>
#include <boost/asio/detail/select_reactor_fwd.hpp>
#include <boost/asio/detail/service_registry_fwd.hpp>
--
 */
class io_service
  : private noncopyable
{
private:
--
/// Class used to uniquely identify a service.
class io_service::id
  : private noncopyable
{
public:
--
/// Base class for all io_service services.
class io_service::service
  : private noncopyable
{
public:

如果您的代码基于HTTP client 示例,您应该注意io_service 始终在main() 的范围内。作为 Ralf pointed out,您的 io_service 也可能在连接处理程序之后没有工作要做,这就是为什么您在循环内调用 run() 的原因

while(1){ // improve this later
  io_service_.run();
}

再次注意,HTTP client 示例没有这样做。您需要在连接处理程序中启动另一个异步操作,读取或写入取决于您的应用程序的需要。

【讨论】:

  • +1,很好发现,没有看到 io_service 超出范围(除非 OP 正在加入线程)
  • 那么 I/O 服务是否应该成为 Session 对象的一部分?
  • @Ralf,是的,主线程正在加入线程。它本身就是一个基于事件的线程。
猜你喜欢
  • 1970-01-01
  • 2013-05-15
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-09-02
相关资源
最近更新 更多