【问题标题】:asio::io_service priority queue handling with multiple threadsasio::io_service 多线程处理优先级队列
【发布时间】:2019-01-30 03:54:47
【问题描述】:

我在多线程 C++ 代码中经常使用 asio::io_service。最近,由于在处理各种任务时缺乏优先级,我发现了我的代码中的一个瓶颈。我自然而然地遇到了这种提升example 以确保某些任务的优先级高于休息。但此示例仅适用于单线程应用程序。

通常,我的代码使用这种模式。

boost::asio::io_service ioService;
boost::thread_group threadPool;
boost::asio::io_service::work work(ioService);

int noOfCores = boost::thread::hardware_concurrency();
for (int i = 0 ; i < noOfCores ; i ++)
{
    threadPool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
}
threadPool.join_all();

我从其他各种线程执行大量 ioService.post(),所有这些处理程序都具有相同的优先级。

现在,如果我要使用 boost 示例中的 handler_priority_queue,首先我必须为 add() 和 execute_all() 函数添加一些互斥保护。

boost::mutex _mtx;
void add(int priority, boost::function<void()> function)
{
    boost::lock_guard<boost::mutex> lock(_mtx);
    handlers_.push(queued_handler(priority, function));
}

void execute_all()
{
    while (!handlers_.empty())
    {
        boost::unique_lock<boost::mutex> lock(_mtx);
        queued_handler handler = handlers_.top();
        handlers_.pop();
        lock.unlock();
        handler.execute();
    }
}

但是,我不确定是什么替换了我当前代码中的以下行。

    threadPool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));

我显然需要以某种方式将 io_service::run 替换为 handler_priority_queue::execute_all() 。但是怎么做?最好的方法是什么?

我可以这样做...

    threadPool.create_thread(boost::bind(&handler_priority_queue::execute_all,
&pri_queue));

但是 execute_all() 马上就出来了。我认为 execute_all() 需要以某种方式重新设计。这个怎么样?它有效,但我不确定其中的陷阱。

void execute_all()
{
    while (ioService.run_one())
    {
        while (ioService.poll_one());
        while (true)
        {
            queued_handler handler;
            {
                boost::lock_guard<boost::mutex> lock(_mtx);
                if (handlers_.empty())
                {
                    break;
                }
                else
                {
                    handler = handlers_.top();
                    handlers_.pop();
                }
            }
            handler.execute();
        }
    }
}

【问题讨论】:

  • 我建议您仔细检查更改任务的执行顺序是否可以解决您的性能问题。通常情况下,很难解决简单的解决方案。
  • 这不是我在查询中链接的示例吗?我正在尝试实现它的多线程版本。
  • 我没有看到那个链接的例子,因为它是一个不透明的链接 :) 很高兴你看到了它

标签: c++ multithreading boost-asio


【解决方案1】:

Asio 不提供这种可能性。该示例仅限于单个线程,因为它不会修改 Asio 的调度程序。调度程序以先进先出的方式在线程之间分配任务,我不知道有什么方法可以修改它。只要在您启动异步操作时无法指定优先级(例如io_service::post),调度程序就不知道任务优先级,因此无法使用它。

当然,您可以为每个线程使用priority_queue,但在这种情况下,您的优先级将产生有限的“线程本地”效果:只有调度到同一线程的任务才会根据它们的优先级执行。考虑示例(伪代码):

io_service.post(task(priority_1));
io_service.post(task(priority_2));
io_service.post(task(priority_3));

thread_1(io_service.run());
thread_2(io_service.run());

假设任务 1 和 3 由 thread_1 处理,任务 2 由 thread_2 处理。所以thread_1 将执行任务 3,然后执行任务 1,如果在链接示例中使用优先级队列。但thread_2 不知道这些任务,会立即执行任务 2,可能在任务 3 之前执行。

您的选择是实现自己的调度程序(复杂性取决于您的要求,但一般来说可能很棘手)或寻找第 3 方解决方案。例如。我会检查英特尔 TBB Priority

编辑:试图详细说明“自己的调度程序”案例:

对于更简单的版本、线程池和从队列中提取的线程,您需要一个非常好的多生产者/多消费者并发队列。从优先级的角度来看,这将是一个相当公平的解决方案:所有较高优先级的任务将(几乎总是)在较低优先级的任务之前开始执行。如果性能比公平更重要,则可以改进此解决方案。但这值得提出另一个问题以及有关您的特定案例的大量详细信息。

【讨论】:

  • 我在最后加了一个例子。你怎么看?
  • 这是一种“自己的调度程序”的情况,这里你根本不需要 Asio
  • 这个简化版本可能存在严重的争用(性能)问题,具体取决于应用程序
  • 我明白你关于优先与公平的观点。我需要绝对优先考虑一些任务而不是其他任务。我有一个巨大的基于 asio 的代码库,所以我希望进行最小的更改,比如只更改 post() 调用。如果上面的 execute_all() 解决了这个问题,我以后可以做进一步的微调。
  • 很难说你的解决方案是否解决了任何关于你的应用程序的细节问题
猜你喜欢
  • 2012-05-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-09-17
  • 2011-12-18
  • 1970-01-01
相关资源
最近更新 更多