Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
310 views
in Technique[技术] by (71.8m points)

c++ - Synchronization and result retrieval with ASIO coroutines and networking TS with C++20

I am experimenting with asio stackless coroutines and newly introduced C++20 constructs for co_return, co_await.

I went through the examples and have a few questions based on my understanding and because I still see race condition related crashes :(

Example: (I am going to omit all exception handing for the sake of simplicity)


awaitable<size_t> echo_request(tcp::socket& socket)
{
  using namespace boost::asio;
  using namespace std::literals;
  
  char data[3] = {};

  co_await async_write(socket, buffer("abc"sv), use_awaitable);
  size_t n = co_await async_read(socket, buffer(data, sizeof(data)), transfer_all(), use_awaitable);

  co_return n;
}


int main()
{

  // calling the coroutines

  using namespace boost::asio;

  auto ctx = io_context{};

  auto sockets = vector<tcp::socket>{};
  for(auto i{0}; i<20; ++i)
    sockets.push_back(make_connection(...));

  auto strand = make_strand(ctx);

  for(auto&& socket : sockets)
    co_spawn(strand, echo_request(), detached);

  auto th = std::thread([&]{ ctx.run(); });
  ctx.run();
  th.join();
}

I understand that ctx.run() will be executed as many times as there are current tasks. Because in the above example 20 times co_spawn is being called with 2 co_wait calls each, the sum of all ctx.run() results will be 40. However, here the awaitable<size_t> returns a result. How can this result be retrieved from the main function?

I saw one example where an std::promise has been used in a lambda function instead of detached awaitable:


std::promise<size_t> p;

co_spawn(strand, echo_request(), [&p](auto&& err, auto&& value)
{
  if(err) p.set_exception(err);
  else p.set_value(value);
});

std::promise might internally use a mutex (and in fact it does use it). Is there any more lightweight alternative?

Finally, what would happen if any of the echo_request functions would modify non-atomic value (it might be a more complex structure like hash table, but here it's just an int increment to a global variable).

Example:

int total_requests = 0;

awaitable<size_t> echo_request(tcp::socket socket)
{
  using namespace boost::asio;
  using namespace std::literals;
  
  char data[3] = {};

  co_await async_write(socket, buffer("abc"sv), use_awaitable);

  ++total_requests;  // THIS IS A RACE CONDITION

  size_t n = co_await async_read(socket, buffer(data, sizeof(data)), transfer_all(), use_awaitable);

  ++total_requests;  // ANOTHER RACE CONDITION

  co_return n;
}

My understanding that if asynchronous operations are executed within a strand they are linearized (without order guarantee). Basically, even running io_context::run() from multiple threads assures that there will be no concurrency between the awaitables executed within one strand? I was expecting that the following modification would fix the issue, but it did not.

Furthermore, only places marked with co_await are interruption points. So if multiple function instances running within the same strand outside the co_await there will be no interruption. Basically, the code fragment marked with // THIS IS A RACE CONDITION cannot end up executing by 2 threads concurrently and it's safe to increment the global variable. Is this correct?

If that is not so, would the below code fragment prevent the race condition?

int total_requests = 0;

awaitable<size_t> echo_request(boost::asio::strand& strand, boost::asio::tcp::socket& socket)
{
  using namespace boost::asio;
  using namespace std::literals;
  
  char data[3] = {};

  co_await async_write(socket, buffer("abc"sv), use_awaitable);

  strand.execute([&] { ++total_reqeusts; });  // WOULD THAT PREVENT THE RACE CONDITION???

  size_t n = co_await async_read(socket, buffer(data, sizeof(data)), transfer_all(), use_awaitable);

  strand.execute([&] { ++total_requests; });

  co_return n;
}

question from:https://stackoverflow.com/questions/65641080/synchronization-and-result-retrieval-with-asio-coroutines-and-networking-ts-with

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

ctx.run will run until all work on the io_context is complete. So if you remove this line

auto th = std::thread([&]{ ctx.run(); });

You'll convert your application into a single threaded async application. Then you don't need to worry about strands etc.

As a single threaded app it's OK to sum requests or bytes to your global variable.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...