I am experimenting with asio stackless coroutines and the newly introduced C++20 constructs co_await and co_return.
I went through the examples and have a few questions based on my understanding, and because I still see race-condition-related crashes :(
Example (I am going to omit all exception handling for the sake of simplicity):
awaitable<size_t> echo_request(tcp::socket& socket)
{
    using namespace boost::asio;
    using namespace std::literals;
    char data[3] = {};
    co_await async_write(socket, buffer("abc"sv), use_awaitable);
    size_t n = co_await async_read(socket, buffer(data, sizeof(data)), transfer_all(), use_awaitable);
    co_return n;
}
int main()
{
    // calling the coroutines
    using namespace boost::asio;
    auto ctx = io_context{};
    auto sockets = std::vector<tcp::socket>{};
    for(auto i{0}; i<20; ++i)
        sockets.push_back(make_connection(...));
    auto strand = make_strand(ctx);
    for(auto&& socket : sockets)
        co_spawn(strand, echo_request(socket), detached);
    auto th = std::thread([&]{ ctx.run(); });
    ctx.run();
    th.join();
}
I understand that ctx.run() will execute as many handlers as there are pending tasks. Because in the above example co_spawn is called 20 times, with 2 co_await calls each, the sum of all ctx.run() results will be 40. However, here the awaitable<size_t> returns a result. How can this result be retrieved from the main function?
I saw one example where a std::promise was used inside a lambda completion handler instead of the detached token:
std::promise<size_t> p;
co_spawn(strand, echo_request(socket), [&p](std::exception_ptr err, size_t value)
{
    if(err) p.set_exception(err);
    else p.set_value(value);
});
std::promise might internally use a mutex (and in fact it does). Is there any more lightweight alternative?
Finally, what would happen if any of the echo_request functions modified a non-atomic value? (It might be a more complex structure like a hash table, but here it is just an increment of a global int.)
Example:
int total_requests = 0;
awaitable<size_t> echo_request(tcp::socket socket)
{
    using namespace boost::asio;
    using namespace std::literals;
    char data[3] = {};
    co_await async_write(socket, buffer("abc"sv), use_awaitable);
    ++total_requests; // THIS IS A RACE CONDITION
    size_t n = co_await async_read(socket, buffer(data, sizeof(data)), transfer_all(), use_awaitable);
    ++total_requests; // ANOTHER RACE CONDITION
    co_return n;
}
My understanding is that if asynchronous operations are executed within a strand, they are serialized (without any ordering guarantee). Basically, even running io_context::run() from multiple threads ensures that there will be no concurrency between the awaitables executing within one strand? I was expecting that the following modification would fix the issue, but it did not.
Furthermore, only the places marked with co_await are suspension points. So if multiple function instances run within the same strand, there is no interruption outside of a co_await. Basically, the code fragment marked with // THIS IS A RACE CONDITION cannot end up being executed by 2 threads concurrently, and it is safe to increment the global variable. Is this correct?
If that is not so, would the below code fragment prevent the race condition?
int total_requests = 0;
awaitable<size_t> echo_request(boost::asio::strand<boost::asio::io_context::executor_type>& strand, boost::asio::ip::tcp::socket& socket)
{
    using namespace boost::asio;
    using namespace std::literals;
    char data[3] = {};
    co_await async_write(socket, buffer("abc"sv), use_awaitable);
    strand.execute([&] { ++total_requests; }); // WOULD THAT PREVENT THE RACE CONDITION???
    size_t n = co_await async_read(socket, buffer(data, sizeof(data)), transfer_all(), use_awaitable);
    strand.execute([&] { ++total_requests; });
    co_return n;
}
question from:
https://stackoverflow.com/questions/65641080/synchronization-and-result-retrieval-with-asio-coroutines-and-networking-ts-with