【问题标题】:Example of dynamic thread pool in boost::asioboost::asio 中的动态线程池示例
【发布时间】:2012-06-20 13:56:40
【问题描述】:

我将使用单个 io_service ( HTTP Server 3 example ) 实现带有线程池的 boost::asio 服务器。 io_service 将绑定到 unix 域套接字并将请求从该套接字上的连接传递到不同的线程。为了减少资源消耗,我想让线程池动态化。

这是一个概念。首先创建一个线程。当请求到达并且服务器发现池中没有空闲线程时,它会创建一个新线程并将请求传递给它。服务器最多可以创建某个最大数量的线程。理想情况下,它应该具有暂停空闲一段时间的线程的功能。

有人做过类似的东西吗?或者也许有人有一个相关的例子?

至于我,我想我应该以某种方式覆盖 io_service.dispatch 来实现这一点。

【问题讨论】:

    标签: multithreading boost boost-asio


    【解决方案1】:

    最初的方法可能存在一些挑战:

    • boost::asio::io_service 并非旨在派生或重新实现。请注意缺少虚函数。
    • 如果您的线程库不提供查询线程状态的功能,则需要单独管理状态信息。

    另一种解决方案是将工作发布到io_service,然后检查它在io_service 中的停留时间。如果它准备好运行和实际运行之间的时间差超过某个阈值,那么这表明队列中的作业比服务队列的线程多。这样做的一个主要好处是动态线程池增长逻辑与其他逻辑分离。

    这是一个使用deadline_timer 完成此操作的示例。

    • deadline_timer 设置为从现在起几秒后3 过期。
    • 异步等待deadline_timer。处理程序将在设置 deadline_timer 后的 3 秒内准备好运行。
    • 在异步处理程序中,检查当前时间相对于定时器设置为过期的时间。如果大于2秒,那么io_service队列正在备份,所以在线程池中添加一个线程。

    例子:

    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/thread.hpp>
    #include <iostream>
    
    class thread_pool_checker
      : private boost::noncopyable
    {
    public:
    
      thread_pool_checker( boost::asio::io_service& io_service,
                           boost::thread_group& threads,
                           unsigned int max_threads,
                           long threshold_seconds,
                           long periodic_seconds )
        : io_service_( io_service ),
          timer_( io_service ),
          threads_( threads ),
          max_threads_( max_threads ),
          threshold_seconds_( threshold_seconds ),
          periodic_seconds_( periodic_seconds )
        {
          schedule_check();
        }
    
    private:
    
      void schedule_check();
      void on_check( const boost::system::error_code& error );
    
    private:
    
      boost::asio::io_service&    io_service_;
      boost::asio::deadline_timer timer_;
      boost::thread_group&        threads_;
      unsigned int                max_threads_;
      long                        threshold_seconds_;
      long                        periodic_seconds_;
    };
    
    void thread_pool_checker::schedule_check()
    {
      // Thread pool is already at max size.
      if ( max_threads_ <= threads_.size() )
      {
        std::cout << "Thread pool has reached its max.  Example will shutdown."
                  << std::endl;
        io_service_.stop();
        return;
      }
    
      // Schedule check to see if pool needs to increase.
      std::cout << "Will check if pool needs to increase in " 
                << periodic_seconds_ << " seconds." << std::endl;
      timer_.expires_from_now( boost::posix_time::seconds( periodic_seconds_ ) );
      timer_.async_wait( 
        boost::bind( &thread_pool_checker::on_check, this,
                     boost::asio::placeholders::error ) );
    }
    
    void thread_pool_checker::on_check( const boost::system::error_code& error )
    {
      // On error, return early.
      if ( error ) return;
    
      // Check how long this job was waiting in the service queue.  This
      // returns the expiration time relative to now.  Thus, if it expired
      // 7 seconds ago, then the delta time is -7 seconds.
      boost::posix_time::time_duration delta = timer_.expires_from_now();
      long wait_in_seconds = -delta.seconds();
    
      // If the time delta is greater than the threshold, then the job
      // remained in the service queue for too long, so increase the
      // thread pool.
      std::cout << "Job job sat in queue for " 
                << wait_in_seconds << " seconds." << std::endl;
      if ( threshold_seconds_ < wait_in_seconds )
      {
        std::cout << "Increasing thread pool." << std::endl;
        threads_.create_thread(
          boost::bind( &boost::asio::io_service::run,
                       &io_service_ ) );
      }
    
      // Otherwise, schedule another pool check.
      schedule_check();
    }
    
    // Busy work functions.
    void busy_work( boost::asio::io_service&,
                    unsigned int );
    
    void add_busy_work( boost::asio::io_service& io_service,
                        unsigned int count )
    {
      io_service.post(
        boost::bind( busy_work,
                     boost::ref( io_service ),
                     count ) );
    }
    
    void busy_work( boost::asio::io_service& io_service,
                    unsigned int count )
    {
      boost::this_thread::sleep( boost::posix_time::seconds( 5 ) );
    
      count += 1;
    
      // When the count is 3, spawn additional busy work.
      if ( 3 == count )
      {
        add_busy_work( io_service, 0 );
      }
      add_busy_work( io_service, count );
    }
    
    int main()
    {
      using boost::asio::ip::tcp;
    
      // Create io service.
      boost::asio::io_service io_service;
    
      // Add some busy work to the service.
      add_busy_work( io_service, 0 );
    
      // Create thread group and thread_pool_checker.
      boost::thread_group threads;
      thread_pool_checker checker( io_service, threads,
                                   3,   // Max pool size.
                                   2,   // Create thread if job waits for 2 sec.
                                   3 ); // Check if pool needs to grow every 3 sec.
    
      // Start running the io service.
      io_service.run();
    
      threads.join_all();
    
      return 0;
    }
    

    输出:

    将检查池是否需要在 3 秒内增加。
    作业作业排队等待 7 秒。
    增加线程池。
    将检查池是否需要在 3 秒内增加。
    作业作业在队列中等待了 0 秒。
    将检查池是否需要在 3 秒内增加。
    作业作业在队列中等待了 4 秒。
    增加线程池。
    将检查池是否需要在 3 秒内增加。
    作业作业在队列中等待了 0 秒。
    将检查池是否需要在 3 秒内增加。
    作业作业在队列中等待了 0 秒。
    将检查池是否需要在 3 秒内增加。
    作业作业在队列中等待了 0 秒。
    将检查池是否需要在 3 秒内增加。
    作业作业在队列中等待了 3 秒。
    增加线程池。
    线程池已达到最大值。示例将关闭。

    【讨论】:

    • 如果我理解正确,busy_work 任务可能会在队列中等待几秒钟以及池检查器,即使尚未达到最大线程数,因为没有提前创建新线程。这使得这个原则几乎不可用,因为动态特性不应该如此降低性能。与静态池所需的时间相比,它应该使任务执行时间更长,只是创建新线程所需的时间。无论如何,谢谢。
    • @user484936:你的理解是正确的。池增长发生在检测到退化之后;它是一种更简单的池化方法,不应“降低性能”。如果您想在知道需要时分配线程,则需要管理线程状态,从而为所有线程引入开销,并且可能需要将状态逻辑分散在整个代码中。如果您希望分配线程按照您的预测它们将被需要,然后有一个专用线程将作业发布到服务中,然后执行定时等待响应。
    • 我想知道在只执行一个长时间运行的任务并且当我们的计时器触发时我们不必要地向池中添加一个线程的情况下会发生什么。如果当时实际上没有更多事件要处理,那么这种方法对我来说似乎效率低下。如果我错了,请纠正我。
    • 我在 8 年后发现了那个帖子。 @russoue,使用 asio,您必须编写代码才能在一个线程中没有“长时间运行的任务”,换句话说,您必须使用 post 来处理它。
    • @TannerSansbury,增加线程池的好主意,但下一步应该是在低工作负载时减少池。恕我直言,这不容易......
    猜你喜欢
    • 2011-12-18
    • 2012-08-26
    • 2011-09-03
    • 2017-09-17
    • 1970-01-01
    • 2014-01-31
    • 1970-01-01
    • 2018-12-03
    • 1970-01-01
    相关资源
    最近更新 更多