【问题标题】:Synchronization and result retrieval with ASIO coroutines and networking TS with C++20使用 ASIO 协程和网络 TS 使用 C++20 进行同步和结果检索
【发布时间】:2021-04-14 20:58:36
【问题描述】:

我正在为co_returnco_await 试验 asio 无堆栈协程和新引入的 C++20 结构。

我浏览了这些示例并根据我的理解提出了一些问题,因为我仍然看到与竞争条件相关的崩溃:(

示例:(为简单起见,我将省略所有异常处理)


awaitable<size_t> echo_request(tcp::socket& socket)
{
  using namespace boost::asio;
  using namespace std::literals;
  
  char data[3] = {};

  co_await async_write(socket, buffer("abc"sv), use_awaitable);
  size_t n = co_await async_read(socket, buffer(data, sizeof(data)), transfer_all(), use_awaitable);

  co_return n;
}


int main()
{

  // calling the coroutines

  using namespace boost::asio;

  auto ctx = io_context{};

  auto sockets = vector<tcp::socket>{};
  for(auto i{0}; i<20; ++i)
    sockets.push_back(make_connection(...));

  auto strand = make_strand(ctx);

  for(auto&& socket : sockets)
    co_spawn(strand, echo_request(), detached);

  auto th = std::thread([&]{ ctx.run(); });
  ctx.run();
  th.join();
}

我了解ctx.run() 将执行与当前任务一样多的次数。因为在上面的示例中,co_spawn 被调用了 20 次,每次调用 2 次 co_wait,所有ctx.run() 结果的总和将为 40。但是,这里 awaitable&lt;size_t&gt; 返回一个结果。如何从main 函数中检索到这个结果?

我看到了一个示例,其中在 lambda 函数中使用了 std::promise 而不是 detached 可等待:


std::promise<size_t> p;

co_spawn(strand, echo_request(), [&p](auto&& err, auto&& value)
{
  if(err) p.set_exception(err);
  else p.set_value(value);
});

std::promise 可能在内部使用互斥锁(实际上它确实使用了它)。有没有更轻量级的替代品?

最后,如果任何echo_request 函数会修改非原子值会发生什么(它可能是一个更复杂的结构,如哈希表,但这里只是一个全局变量的 int 增量)。

例子:

int total_requests = 0;

awaitable<size_t> echo_request(tcp::socket socket)
{
  using namespace boost::asio;
  using namespace std::literals;
  
  char data[3] = {};

  co_await async_write(socket, buffer("abc"sv), use_awaitable);

  ++total_requests;  // THIS IS A RACE CONDITION

  size_t n = co_await async_read(socket, buffer(data, sizeof(data)), transfer_all(), use_awaitable);

  ++total_requests;  // ANOTHER RACE CONDITION

  co_return n;
}

我的理解是,如果异步操作在 strand 内执行,它们会被线性化(没有顺序保证)。基本上,即使从多个线程运行io_context::run() 也可以确保在一个线程中执行的等待对象之间不会存在并发?我原以为下面的修改会解决这个问题,但它没有。

此外,只有标有co_await 的地方才是中断点。因此,如果多个函数实例在co_await 之外的同一条链中运行,则不会出现中断。基本上,标有// THIS IS A RACE CONDITION 的代码片段最终不能由2 个线程同时执行,并且递增全局变量是安全的。这是正确的吗?

如果不是这样,下面的代码片段会阻止竞争条件吗?

int total_requests = 0;

awaitable<size_t> echo_request(boost::asio::strand& strand, boost::asio::tcp::socket& socket)
{
  using namespace boost::asio;
  using namespace std::literals;
  
  char data[3] = {};

  co_await async_write(socket, buffer("abc"sv), use_awaitable);

  strand.execute([&] { ++total_reqeusts; });  // WOULD THAT PREVENT THE RACE CONDITION???

  size_t n = co_await async_read(socket, buffer(data, sizeof(data)), transfer_all(), use_awaitable);

  strand.execute([&] { ++total_requests; });

  co_return n;
}

【问题讨论】:

    标签: c++ coroutine c++20 asio


    【解决方案1】:

    ctx.run 将一直运行,直到 io_context 上的所有工作完成。所以如果你删除这一行

    auto th = std::thread([&]{ ctx.run(); });
    

    您会将应用程序转换为单线程异步应用程序。那你就不用担心股线等了。

    作为单线程应用程序,可以将请求或字节汇总到全局变量中。

    【讨论】:

    • 感谢您的回复。但是,我想使用多线程方法,并想了解为什么 strands 没有在应该的地方线性化并发。
    • 一般来说,我之前考虑过这样的提议,但是我要重构的应用程序现在处理至少 2 个访问共享状态的线程。因此,我真的很想了解,为什么 strand 没有线性化并行性。
    猜你喜欢
    • 2021-12-27
    • 2020-06-26
    • 2010-12-29
    • 2013-06-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多