【问题标题】:Asio's `io_context` and concurrency hintsAsio 的 `io_context` 和并发提示
【发布时间】:2021-12-23 09:22:34
【问题描述】:

asio::io_context 构造函数采用可选的concurrency hint,当只有单个线程将与io_context 或关联的 IO 对象交互(或线程之间的同步已在调用代码中完成)时,它会跳过一些内部锁。

我的理解是1 将允许我在一个线程中调用io_context::run() 并与io_context 正常交互(即,除reset()run()run_one() 等之外的所有方法)和所有关联的 IO 对象。

另外,对于ASIO_CONCURRENCY_HINT_UNSAFE_IO,调用IO对象(下例中的数字3)上的任何IO方法都是非法的,而使用ASIO_CONCURRENCY_HINT_UNSAFE调用io_context本身的任何方法都是非法的。这是正确的吗?

#include <asio/io_context.hpp>
#include <asio/ip/tcp.hpp>
#include <chrono>
#include <iostream>
#include <thread>

static const char msg[] = "Hello World\n";

int main() {
    const auto concurrency_hint = ASIO_CONCURRENCY_HINT_1;
    asio::io_context ctx{concurrency_hint};
    asio::ip::tcp::acceptor acc(ctx, asio::ip::tcp::endpoint(asio::ip::address_v4::any(), 7999));
    acc.listen(2);

    asio::ip::tcp::socket peer(ctx);
    acc.async_accept(peer, [&peer](const asio::error_code &error) {
        // call async methods from the thread running the io context (1)
        peer.async_write_some(
            asio::const_buffer(msg, 12), [&](const asio::error_code &error, std::size_t len) {
                peer.close();
            });
    });

    std::thread io_thread([&ctx]() { ctx.run(); });

    // call `post()` from another thread (2)
    asio::post([]() { std::cout << msg << std::flush; });

    // call `async_accept` for an IO object running on another thread (3)
    acc.async_accept([&](const asio::error_code &error, asio::ip::tcp::socket peer) {
        peer.close();
    });

    // call `run()` while another thread is already doing so (4)
    ctx.run_for(std::chrono::seconds(2));

    std::this_thread::sleep_for(std::chrono::seconds(5));
    // Call `io_context::stop()` from another thread (5)
    ctx.stop();

    io_thread.join();
    return 0;
}
1 UNSAFE UNSAFE_IO SAFE
IO methods, same thread (1)
post() from another thread (2)
IO methods, another thread (3) ✔?
run() from two threads (4)
io_context::stop() from another thread (5) ?

【问题讨论】:

  • 学究式地UNSAFE 不允许来自其他线程的操作,它不允许来自其他线程的非同步操作。
  • @Frank 绝对正确,但如果我想这样做,我不妨提供另一个并发提示,以便 Asio 负责必要的同步。
  • 不一定。例如,您可能有一个多线程设置/拆卸,但来自单个线程的大量post()。理想情况下,您希望在 post() 组上进行同步,而不是让 asio 对它们中的每一个执行同步。
  • 或者你有另一个共享资源已经同步的情况,使得内置的io_context同步变得多余。因此,您正在同步,但不希望 Asio 为您完成。
  • 所有优点,我已经添加了关于手动同步的说明。

标签: c++ multithreading boost-asio


【解决方案1】:

添加了禁用所有同步的功能,以使该库与所有其他“std”库保持一致,其中约定总是让调用者处理同步。这是针对“std::network”提案的。 (如果我没记错的话)。

比较有趣的情况是“1”和BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO。

https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/overview/core/concurrency_hint.html 我可以看到值“1”对某些工作队列使用线程本地存储。这意味着它有效地破坏了工作平衡。我认为这意味着当与更多线程一起使用时它将正确运行,但效率较低。 抱歉,我知道这是虚无缥缈的,但手册中并不清楚。代码明确将其称为“提示”,这通常意味着它会正确运行,但如果设置错误,则会更慢。

另一个值BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO,除了定义ASIO_CONCURRENCY_HINT_LOCKING_REACTOR_IO外,与全锁相同。同样,从手册中不清楚这实际上意味着什么,我找不到任何证据表明它曾经被使用过。 (在我再次检查更高版本的 ASIO 后,我会稍微更新这个答案)。 但是,从描述来看,它看起来像是一个承诺(不是暗示),没有两个完成处理程序/任何异步操作同时发生。反应器引擎是说“当它完成时执行此操作”的位,因此它会影响新异步回调的注册以及回调被调用后的删除。

抱歉,我知道这不是一个好的答案,但我做了一些研究,并认为我不妨发布它。


编辑:更多信息

我浏览了 boost/asio/1.77.0

对我来说,第一个学习点是并非所有实现都使用无锁(并发)队列。 windows版本使用临界区和windows事件... TIL

但是,我确实在代码中找到了相关部分。例如,epoll 版本(见下文)。注意:epoll 不是 linux io_uring 的默认设置。

epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
        REACTOR_IO, scheduler_.concurrency_hint()));
}

这样做是用假互斥体或真实互斥体分配“状态”。这是通过 conditional_mutex 类完成的。

另一个有趣的事情是标志与提示交互。在线程本地队列优化上切换锁定开关,如下...


scheduler::scheduler(boost::asio::execution_context& ctx,
    int concurrency_hint, bool own_thread, get_task_func_type get_task)
  : boost::asio::detail::execution_context_service_base<scheduler>(ctx),
    one_thread_(concurrency_hint == 1
        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, concurrency_hint)
        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          REACTOR_IO, concurrency_hint)),
    mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, concurrency_hint)),
    task_(0),
    get_task_(get_task),
    task_interrupted_(true),
    outstanding_work_(0),
    stopped_(false),
    shutdown_(false),
    concurrency_hint_(concurrency_hint),
    thread_(0)
...

不过,我发现没有什么与我原来的答案相反,禁用 IO 锁定似乎只会从反应器中移除锁,当调用异步函数或运行完成处理程序时会使用该锁。


反馈给cmets

@tstenner 质疑 io_uring 是否是 linux 的默认设置。他的问题是对的,原来我没有检查,这取决于linux版本。 #if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,45)

他也问

打电话,例如socket.async_write(…) 而另一个线程正在 io_context::run() 中运行完成处理程序有问题吗?

答案是(通过阅读代码)是的。相同的互斥锁用于保护“start_op”,它在很多地方都使用过,但特别是在 basic_socket/acceptor 异步函数中。 所以假设“start_op”需要互斥锁,它被禁用的事实会让你得出结论,这里有潜在的 U/B。

impl 目录(用于 start_op)中的快速“grep”将为您提供我将避免使用的函数列表。您可以将计时器异步函数添加到该列表中,这似乎是在反应器本身中实现的。

【讨论】:

  • 为了完整起见,您能谈谈与BOOST_ASIO_DISABLE_THREADS的关系吗?
  • "注意:epoll 不是 linux io_uring 的默认设置。" - 你确定吗?在最新的发行说明中指出,必须显式启用 io_uring 后端
  • “从反应器中移除锁,当调用异步函数或运行完成句柄 r 时使用。”:我是否正确地读取了调用例如socket.async_write(…) 当另一个线程从 io_context::run() 内部运行完成处理程序时有问题吗?
  • 你对 epoll 的看法是对的。我已经用有关异步函数的更多信息更新了答案。就我个人而言,我会假设所有“异步”函数都会被争用,但正如答案所说,您可以在代码中查找特定问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-05-02
  • 1970-01-01
  • 2021-11-22
  • 2018-09-22
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多