No. Using an io_service
object per connection is definitely a smell. Especially since you're also running each connection on a dedicated thread.
At this point you have to ask yourself what did asynchrony buy you? You can have all the code synchronous and have exactly the same number of threads etc.
Clearly you want to multiplex the connections onto a far smaller number of services. In practice there are a few sensible models like
a single io_service
with a single service thread (this is usually good). No tasks queued on the service may ever block for significant time or the latency will suffer
a single io_service
with a number of threads executing handlers. The number of threads in the pool should be enough to service the max. number of simultaneous CPU intensive tasks supported (or again, the latency will start to go up)
an io_service per thread, usually one thread per logical core and with thread affinity so that it "sticks" to that core. This can be ideal for cache locality
UPDATE: Demo
Here's a demo that shows the idiomatic style using option 1. from above:
Live On Coliru
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>
namespace ba = boost::asio;
namespace bs = boost::system;
namespace b = boost;
typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;
const short PORT = 11235;
// A connection has its own io_service and socket
class Connection : public b::enable_shared_from_this<Connection>
{
public:
typedef boost::shared_ptr<Connection> Ptr;
protected:
socket_type sock;
ba::streambuf stream_buffer; // for reading etc
std::string message;
void AsyncReadString() {
std::cout << __PRETTY_FUNCTION__ << "
";
ba::async_read_until(
sock,
stream_buffer,
'', // null-char is a delimiter
b::bind(&Connection::ReadHandler, shared_from_this(),
ba::placeholders::error,
ba::placeholders::bytes_transferred));
}
void AsyncWriteString(const std::string &s) {
std::cout << __PRETTY_FUNCTION__ << "
";
message = s;
ba::async_write(
sock,
ba::buffer(message.c_str(), message.size()+1),
b::bind(&Connection::WriteHandler, shared_from_this(),
ba::placeholders::error,
ba::placeholders::bytes_transferred));
}
std::string ExtractString() {
std::cout << __PRETTY_FUNCTION__ << "
";
std::istream is(&stream_buffer);
std::string s;
std::getline(is, s, '');
return s;
}
void ReadHandler(
const bs::error_code &ec,
std::size_t bytes_transferred)
{
std::cout << __PRETTY_FUNCTION__ << "
";
if (!ec) {
std::cout << (ExtractString() + "
");
std::cout.flush();
AsyncReadString(); // read again
}
else {
// do nothing, "this" will be deleted later
}
}
void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
std::cout << __PRETTY_FUNCTION__ << "
";
}
public:
Connection(ba::io_service& svc) : sock(svc) { }
virtual ~Connection() {
std::cout << __PRETTY_FUNCTION__ << "
";
}
socket_type& Socket() { return sock; }
void Session() { AsyncReadString(); }
void Stop() { sock.cancel(); }
};
// a server also has its own io_service but it's only used for accepting
class Server {
public:
std::list<boost::weak_ptr<Connection> > m_connections;
protected:
ba::io_service _service;
boost::optional<ba::io_service::work> _work;
acceptor_type _acc;
b::thread thread;
void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
if (!ec) {
accepted->Session();
DoAccept();
}
else {
// do nothing the new session will be deleted automatically by the
// destructor
}
}
void DoAccept() {
auto newaccept = boost::make_shared<Connection>(_service);
_acc.async_accept(
newaccept->Socket(),
b::bind(&Server::AcceptHandler,
this,
ba::placeholders::error,
newaccept
));
}
public:
Server():
_service(),
_work(ba::io_service::work(_service)),
_acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
thread(b::bind(&ba::io_service::run, &_service))
{ }
~Server() {
std::cout << __PRETTY_FUNCTION__ << "
";
Stop();
_work.reset();
if (thread.joinable()) thread.join();
}
void Start() {
std::cout << __PRETTY_FUNCTION__ << "
";
DoAccept();
}
void Stop() {
std::cout << __PRETTY_FUNCTION__ << "
";
_acc.cancel();
}
void StopAllConnections() {
std::cout << __PRETTY_FUNCTION__ << "
";
for (auto c : m_connections) {
if (auto p = c.lock())
p->Stop();
}
}
};
int main() {
try {
Server s;
s.Start();
std::cerr << "Shutdown in 2 seconds...
";
b::this_thread::sleep_for(b::chrono::seconds(2));
std::cerr << "Stop accepting...
";
s.Stop();
std::cerr << "Shutdown...
";
s.StopAllConnections(); // interrupt ongoing connections
} // destructor of Server will join the service thread
catch (std::exception &e) {
std::cerr << __FUNCTION__ << ":" << __LINE__ << "
";
std::cerr << "Exception: " << e.what() << std::endl;
return 1;
}
std::cerr << "Byebye
";
}
I modified the main()
to run for 2 seconds without user intervention. This is so I can demo it Live On Coliru (of course, it's limited w.r.t the number of client processes).
If you run it with a lot (a lot) of clients, using e.g.
$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\0" | netcat localhost 11235)& done; wait)
You will find that the two second window handles them all:
$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
2 hello world 28214
2 hello world 4554
2 hello world 6216
2 hello world 7864
2 hello world 9966
2 void Server::Stop()
1000 std::string Connection::ExtractString()
1001 virtual Connection::~Connection()
2000 void Connection::AsyncReadString()
2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)
If you really go berserk and raise 1000
to e.g. 100000
there, you'll get things similar to:
sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
2 hello world 5483
2 hello world 579
2 hello world 5865
2 hello world 938
2 void Server::Stop()
3 hello world 9613
1741 std::string Connection::ExtractString()
1742 virtual Connection::~Connection()
3482 void Connection::AsyncReadString()
3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)
On repeated 2-second runs of the server.