【问题标题】:boost::asio::async_read_some run in parent threadboost::asio::async_read_some 在父线程中运行
【发布时间】:2013-01-26 23:53:09
【问题描述】:

我正在编写一个高效的套接字服务器。目的是良好的整体吞吐量。我使用主线程作为监听器。它async_accept 一个客户端并将套接字添加到队列中。有一个调度程序线程从队列中拾取一个准备好从中读取的套接字,并添加到其中一个工作线程的队列中。我保留了一个工作线程池。工作线程将执行实际的读/写操作。

我在听众中使用async_accept。为了找出可以读取的套接字,我在调度程序中使用了 async_read_some。这个想法可行,但有一个问题。我的io_service.run()是在listener中调用的,所以dispatcher中async_read_some的handler,其实是在listener线程中运行的。

这是我的代码:

using boost::asio::ip::tcp;
using namespace std;

std::queue<std::shared_ptr<tcp::socket>> q_sock;
boost::mutex m_log1;
boost::condition_variable m_cond1;
boost::mutex::scoped_lock m_lock1 = boost::mutex::scoped_lock(m_log1);
sem_t _sem_sock;

enum { max_length1 = 1024 };
char data_1[max_length1];

void handle_read1(std::shared_ptr<tcp::socket> sock, const boost::system::error_code& error,
  size_t bytes_transferred)
{
    printf("handle_read1 : error : %s : %d, thread id is: %ld, pid : %d \n", error.category().name(), error.value(), (long int)syscall(SYS_gettid), getpid());

    boost::asio::write(*(sock.get()), boost::asio::buffer(data_1, bytes_transferred));
}


void sock_dispatch() {
    int v_size = 0;
    std::shared_ptr<tcp::socket> curr_sock;

    printf("sock_dispatch started. The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());

    while(1) {

        while(1) {
            sem_wait(&_sem_sock);
            v_size = q_sock.size();
            sem_post(&_sem_sock);

            if(v_size <= 0)
                m_cond1.timed_wait(m_lock1,boost::posix_time::milliseconds(5000));
            else
                break;
        }

        sem_wait(&_sem_sock);
        curr_sock = q_sock.front();
        q_sock.pop();
        sem_post(&_sem_sock);

        curr_sock->async_read_some(boost::asio::buffer(data_1, max_length1),
        boost::bind(handle_read1, curr_sock,
          boost::asio::placeholders::error,
          boost::asio::placeholders::bytes_transferred));
    }

}

class session
{
    public:
      session(boost::asio::io_service& io_service)
        : sockptr(new tcp::socket(io_service)) {}

      void start()
      {
            printf("START NEW SESSION   The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());

            sem_wait(&_sem_sock);

            q_sock.push(sockptr);

            sem_post(&_sem_sock);

            m_cond1.notify_all();
      }

      std::shared_ptr<tcp::socket> sockptr;
};

class server
{
    public:
      server(boost::asio::io_service& io_service, short port)
        : io_service_(io_service),
          acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
      {
        session* new_session = new session(io_service_);
        acceptor_.async_accept(*(new_session->sockptr.get()),
            boost::bind(&server::handle_accept, this, new_session,
              boost::asio::placeholders::error));

        printf("WAITING TO ACCEPT: The ID of this of this thread is: %ld, pid : %d \n", (long int)syscall(SYS_gettid), getpid());

      }

      void handle_accept(session* new_session,
          const boost::system::error_code& error)
      {
          new_session->start();
          new_session = new session(io_service_);
          acceptor_.async_accept(*(new_session->sockptr.get()),
              boost::bind(&server::handle_accept, this, new_session,
                boost::asio::placeholders::error));
      }

    private:
      boost::asio::io_service& io_service_;
      tcp::acceptor acceptor_;
};

int main(int argc, char* argv[])
{
    sem_init(&_sem_sock, 0, 1);

    boost::asio::io_service io_service;

    using namespace std;
    server s(io_service, atoi(argv[1]));

    boost::thread t(boost::bind(sock_dispatch));

    io_service.run();

    return 0;
}

此代码是从 boost::asio 示例 http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/async_tcp_echo_server.cpp 修改而来的。并且客户端代码是http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/example/echo/blocking_tcp_echo_client.cpp

当客户端连接时,服务器的输出:

WAITING TO ACCEPT: The ID of this of this thread is: 3843, pid : 3843 
sock_dispatch started. The ID of this of this thread is: 3844, pid : 3843 
START NEW SESSION   The ID of this of this thread is: 3843, pid : 3843 
handle_read1 : error : system : 0, thread id is: 3843, pid : 3843

在这种情况下,调度程序线程 id 是 3944,但 handle_read1 在线程 3843 中运行。 理想情况下,handle_read1 应该在调度程序中运行,因此它不会阻塞在侦听器中的接受。

知道我应该怎么做才能实现这一目标吗?或者整个事情有更好的设计:)?

【问题讨论】:

    标签: c++ boost-asio asyncsocket


    【解决方案1】:

    如果您需要在特定线程中调用特定处理程序,请使用不同的io_service 对象。例如,acceptor 可以用io_service1 构造,而套接字可以用io_service2 构造。然后主线程可以执行io_service1.run(),而线程池中的线程执行io_service2.run()

    话虽如此,混合异步和同步功能可能相当棘手。在我从事的大多数异步程序中,很少需要将线程专用于特定的异步链。


    总的来说,我认为概念设计还不错,但我对实现有几点建议:

    • q_sock 消费者和生产者代码是高级和低级结构的混合体。条件变量的使用有点不习惯,它引出了一个问题,即为什么使用sem_t 代替boost::mutex 和锁定。例如以下消费者和生产者代码:

      // Consumer
      while(1)
      {
        sem_wait(&_sem_sock);
        v_size = q_sock.size();
        sem_post(&_sem_sock);
      
        if (v_size <= 0)
          m_cond1.timed_wait(m_lock1, boost::posix_time::milliseconds(5000));
        else
          break;
      }
      sem_wait(&_sem_sock);
      curr_sock = q_sock.front();
      q_sock.pop();
      sem_post(&_sem_sock);
      
      // Producer    
      sem_wait(&_sem_sock);
      q_sock.push(sockptr);
      sem_post(&_sem_sock);
      m_cond1.notify_all();
      

      可以在不使用 sem_t 的情况下重写,并且根据 Boost.Thread 的 condition_variable 文档更加惯用。考虑替代方案:

      // Consumer
      boost::unique_lock<boost::mutex> lock(m_log1);
      while (q_sock.empty())
      {
        m_cond1.wait(lock);
      }
      curr_sock = q_sock.front();
      q_sock.pop();
      lock.unlock();
      
      // Producer
      {
        boost::lock_guard<boost::mutex> lock(m_log1);
        q_sock.push(sockptr);
      }
      m_cond1.notify_all();
      
    • 尚不清楚session 提供了哪些功能。

      • 它似乎只是一种分配套接字并将其放入队列的方法。为什么不直接分配套接字并让调用者将其放入队列中?
      • session::sockptr 是通过智能指针管理的,但 session 不是。由于 session 没有通过智能指针进行管理,server::handle_accept 中会发生内存泄漏,因为 session 的句柄在重新分配中丢失。

      确定session 将提供什么功能,并围绕它设计界面。

      • 如果打算提供封装,那么非成员函数,例如handle_read1,可能需要成为成员函数。
      • 如果session 有自己的异步链,并且将自己提供给处理程序,则考虑使用enable_shared_from_this。 Boost.Asio tutorial 提供了一个示例用法,examples 中的一些也提供了示例。
    • 目前,async_read_some 并未指示哪个套接字已准备好被读取。在调用ReadHandler 时,数据已被读取。

      这是 Proactor 和 Reactor 之间的根本区别。如果您需要 Reactor 样式的操作,请使用 boost::asio::null_buffers。有关更多详细信息,请参阅this 文档。然而,每种方法都有其后果。因此,了解这些后果以便做出最佳决策至关重要。

    • 使用 Boost.Asio 通过高级构造提供事件多路分解,sock_dispatch 线程可能看起来不切实际。 session::start 成员函数可以在套接字上启动异步读取。这个微小的更改将消除对q_sock 的需要,以及示例代码中的所有同步构造。

    • 检查为什么必须使用同步写入。在回显客户端的情况下,如示例所示,通常可以通过控制异步链本身的流量来消除资源争用,从而使用异步写入。这允许每个连接都有自己的缓冲区,可用于读取和写入。

    • 不要预先优化。由于控制流的反向,异步编程本质上更难调试。尝试对吞吐量进行预优化只会加剧复杂性问题。程序运行后,执行吞吐量测试。如果结果不符合要求,则剖析确定瓶颈。根据我的经验,大多数具有高吞吐量的服务器在受 CPU 限制之前就会受到 I/O 限制。

    【讨论】:

    • 感谢 twsansbury,当我在线程中使用另一个 io_service obj 时它可以工作。而且我意识到我的侦听器线程不需要异步,我可以为此使用循环。我可以为我的调度程序使用 io_service,以便找出哪个套接字已准备好读取。\n 正如你所提到的,我不应该混合使用异步和同步,我也觉得这种设计有点可怕。首先,你认为我应该把监听器和调度器分开吗?如果我把它们放在一起,dispatch handler会停止acceptor,这会影响吞吐量,不是吗?
    • 除了 async_read_some 之外,还有其他方法可以找出准备就绪的套接字吗?这对我来说有点被操纵了。
    • @eltonsky:我已经更新了答案以提供一些总体建议,并希望回答您的其他问题。
    猜你喜欢
    • 2016-01-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多