【发布时间】:2021-04-14 20:58:36
【问题描述】:
我正在为co_return、co_await 试验 asio 无堆栈协程和新引入的 C++20 结构。
我浏览了这些示例并根据我的理解提出了一些问题,因为我仍然看到与竞争条件相关的崩溃:(
示例:(为简单起见,我将省略所有异常处理)
awaitable<size_t> echo_request(tcp::socket& socket)
{
using namespace boost::asio;
using namespace std::literals;
char data[3] = {};
co_await async_write(socket, buffer("abc"sv), use_awaitable);
size_t n = co_await async_read(socket, buffer(data, sizeof(data)), transfer_all(), use_awaitable);
co_return n;
}
int main()
{
// calling the coroutines
using namespace boost::asio;
auto ctx = io_context{};
auto sockets = vector<tcp::socket>{};
for(auto i{0}; i<20; ++i)
sockets.push_back(make_connection(...));
auto strand = make_strand(ctx);
for(auto&& socket : sockets)
co_spawn(strand, echo_request(), detached);
auto th = std::thread([&]{ ctx.run(); });
ctx.run();
th.join();
}
我了解ctx.run() 将执行与当前任务一样多的次数。因为在上面的示例中,co_spawn 被调用了 20 次,每次调用 2 次 co_wait,所有ctx.run() 结果的总和将为 40。但是,这里 awaitable<size_t> 返回一个结果。如何从main 函数中检索到这个结果?
我看到了一个示例,其中在 lambda 函数中使用了 std::promise 而不是 detached 可等待:
std::promise<size_t> p;
co_spawn(strand, echo_request(), [&p](auto&& err, auto&& value)
{
if(err) p.set_exception(err);
else p.set_value(value);
});
std::promise 可能在内部使用互斥锁(实际上它确实使用了它)。有没有更轻量级的替代品?
最后,如果任何echo_request 函数会修改非原子值会发生什么(它可能是一个更复杂的结构,如哈希表,但这里只是一个全局变量的 int 增量)。
例子:
int total_requests = 0;
awaitable<size_t> echo_request(tcp::socket socket)
{
using namespace boost::asio;
using namespace std::literals;
char data[3] = {};
co_await async_write(socket, buffer("abc"sv), use_awaitable);
++total_requests; // THIS IS A RACE CONDITION
size_t n = co_await async_read(socket, buffer(data, sizeof(data)), transfer_all(), use_awaitable);
++total_requests; // ANOTHER RACE CONDITION
co_return n;
}
我的理解是,如果异步操作在 strand 内执行,它们会被线性化(没有顺序保证)。基本上,即使从多个线程运行io_context::run() 也可以确保在一个线程中执行的等待对象之间不会存在并发?我原以为下面的修改会解决这个问题,但它没有。
此外,只有标有co_await 的地方才是中断点。因此,如果多个函数实例在co_await 之外的同一条链中运行,则不会出现中断。基本上,标有// THIS IS A RACE CONDITION 的代码片段最终不能由2 个线程同时执行,并且递增全局变量是安全的。这是正确的吗?
如果不是这样,下面的代码片段会阻止竞争条件吗?
int total_requests = 0;
awaitable<size_t> echo_request(boost::asio::strand& strand, boost::asio::tcp::socket& socket)
{
using namespace boost::asio;
using namespace std::literals;
char data[3] = {};
co_await async_write(socket, buffer("abc"sv), use_awaitable);
strand.execute([&] { ++total_reqeusts; }); // WOULD THAT PREVENT THE RACE CONDITION???
size_t n = co_await async_read(socket, buffer(data, sizeof(data)), transfer_all(), use_awaitable);
strand.execute([&] { ++total_requests; });
co_return n;
}
【问题讨论】: