Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
436 views
in Technique[技术] by (71.8m points)

c++ - Boost.Asio: Is it a good thing to use a `io_service` per connection/socket?

I want to create an application that implements one-thread-per-connection model. But each connection must be stoppable. I have tried this boost.asio example which implements the blocking version of what I want. But after a little bit questioning I've found out that there is no reliable way to stop the session of that example. So I've tried to implement my own. I had to use asynchronous functions. Since I want to make a thread to manage only one connection and there is no way to control which asynchronous job is employed to which thread, I decided to use io_service for each connection/socket/thread.

So is it a good approach, do you know a better approach?

My code is here so you can examine and review it:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <list>
#include <iostream>
#include <string>
#include <istream>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;

const short PORT = 11235;
class Server;

// A connection has its own io_service and socket
class Connection {
protected:
    ba::io_service service;
    socket_type sock;
    b::thread *thread;
    ba::streambuf stream_buffer;    // for reading etc
    Server *server;
    void AsyncReadString() {
        ba::async_read_until(
            sock,
            stream_buffer,
            '',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::string newstr = s + '';  // add a null char
        ba::async_write(
            sock,
            ba::buffer(newstr.c_str(), newstr.size()),
            b::bind(&Connection::WriteHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    virtual void Session() {
        AsyncReadString();
        service.run();  // run at last
    }
    std::string ExtractString() {
        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '');
        return s;
    }
    virtual void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
        if (!ec) {
            std::cout << (ExtractString() + "
");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    virtual void WriteHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
    }
public:
    Connection(Server *s) :
        service(),
        sock(service),
        server(s),
        thread(NULL)
    {  }
    socket_type& Socket() {
        return sock;
    }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Connection::Session, this));
    }
    void Join() {
        if (thread) thread->join();
    }
    void Stop() {
        service.stop();
    }
    void KillMe();
    virtual ~Connection() {
    }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<Connection*> Connections;
protected:
    ba::io_service service;
    acceptor_type acc;
    b::thread *thread;
    virtual void AcceptHandler(const bs::error_code &ec) {
        if (!ec) {
            Connections.back()->Start();
            Connections.push_back(new Connection(this));
            acc.async_accept(
                Connections.back()->Socket(),
                b::bind(&Server::AcceptHandler,
                    this,
                    ba::placeholders::error));
        }
        else {
            // do nothing
            // since the new session will be deleted
            // automatically by the destructor
        }
    }
    virtual void ThreadFunc() {
        Connections.push_back(new Connection(this));
        acc.async_accept(
            Connections.back()->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error));
        service.run();
    }
public:
    Server():
        service(),
        acc(service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(NULL)
    {  }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Server::ThreadFunc, this));
    }
    void Stop() {
        service.stop();
    }
    void Join() {
        if (thread) thread->join();
    }
    void StopAllConnections() {
        for (auto c : Connections) {
            c->Stop();
        }
    }
    void JoinAllConnections() {
        for (auto c : Connections) {
            c->Join();
        }
    }
    void KillAllConnections() {
        for (auto c : Connections) {
            delete c;
        }
        Connections.clear();
    }
    void KillConnection(Connection *c) {
        Connections.remove(c);
        delete c;
    }
    virtual ~Server() {
        delete thread;
        // connection should be deleted by the user (?)
    }
};

void Connection::KillMe() {
    server->KillConnection(this);
}

int main() {
    try {
        Server s;
        s.Start();
        std::cin.get(); // wait for enter
        s.Stop();   // stop listening first
        s.StopAllConnections(); // interrupt ongoing connections
        s.Join();   // wait for server, should return immediately
        s.JoinAllConnections(); // wait for ongoing connections
        s.KillAllConnections(); // destroy connection objects
        // at the end of scope, Server will be destroyed
    }
    catch (std::exception &e) {
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

No. Using an io_service object per connection is definitely a smell. Especially since you're also running each connection on a dedicated thread.

At this point you have to ask yourself what did asynchrony buy you? You can have all the code synchronous and have exactly the same number of threads etc.

Clearly you want to multiplex the connections onto a far smaller number of services. In practice there are a few sensible models like

  1. a single io_service with a single service thread (this is usually good). No tasks queued on the service may ever block for significant time or the latency will suffer

  2. a single io_service with a number of threads executing handlers. The number of threads in the pool should be enough to service the max. number of simultaneous CPU intensive tasks supported (or again, the latency will start to go up)

  3. an io_service per thread, usually one thread per logical core and with thread affinity so that it "sticks" to that core. This can be ideal for cache locality

UPDATE: Demo

Here's a demo that shows the idiomatic style using option 1. from above:

Live On Coliru

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <istream>
#include <list>
#include <string>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket   socket_type;

const short PORT = 11235;

// A connection has its own io_service and socket
class Connection : public b::enable_shared_from_this<Connection>
{
public:
    typedef boost::shared_ptr<Connection> Ptr;
protected:
    socket_type    sock;
    ba::streambuf  stream_buffer; // for reading etc
    std::string    message;

    void AsyncReadString() {
        std::cout << __PRETTY_FUNCTION__ << "
";

        ba::async_read_until(
            sock,
            stream_buffer,
            '',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::cout << __PRETTY_FUNCTION__ << "
";

        message = s;

        ba::async_write(
            sock,
            ba::buffer(message.c_str(), message.size()+1),
            b::bind(&Connection::WriteHandler, shared_from_this(),
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    std::string ExtractString() {
        std::cout << __PRETTY_FUNCTION__ << "
";

        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '');
        return s;
    }
    void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) 
    {
        std::cout << __PRETTY_FUNCTION__ << "
";

        if (!ec) {
            std::cout << (ExtractString() + "
");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
        std::cout << __PRETTY_FUNCTION__ << "
";
    }
public:
    Connection(ba::io_service& svc) : sock(svc) { }

    virtual ~Connection() {
        std::cout << __PRETTY_FUNCTION__ << "
";
    }

    socket_type& Socket() { return sock;          } 
    void Session()        { AsyncReadString();    } 
    void Stop()           { sock.cancel();        }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<boost::weak_ptr<Connection> > m_connections;
protected:
    ba::io_service _service;
    boost::optional<ba::io_service::work> _work;
    acceptor_type _acc;
    b::thread thread;

    void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
        if (!ec) {
            accepted->Session();
            DoAccept();
        }
        else {
            // do nothing the new session will be deleted automatically by the
            // destructor
        }
    }

    void DoAccept() {
        auto newaccept = boost::make_shared<Connection>(_service);

        _acc.async_accept(
            newaccept->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error,
                newaccept
            ));
    }

public:
    Server():
        _service(),
        _work(ba::io_service::work(_service)),
        _acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(b::bind(&ba::io_service::run, &_service))
    {  }

    ~Server() {
        std::cout << __PRETTY_FUNCTION__ << "
";
        Stop();
        _work.reset();
        if (thread.joinable()) thread.join();
    }

    void Start() {
        std::cout << __PRETTY_FUNCTION__ << "
";
        DoAccept();
    }

    void Stop() {
        std::cout << __PRETTY_FUNCTION__ << "
";
        _acc.cancel();
    }

    void StopAllConnections() {
        std::cout << __PRETTY_FUNCTION__ << "
";
        for (auto c : m_connections) {
            if (auto p = c.lock())
                p->Stop();
        }
    }
};

int main() {
    try {
        Server s;
        s.Start();

        std::cerr << "Shutdown in 2 seconds...
";
        b::this_thread::sleep_for(b::chrono::seconds(2));

        std::cerr << "Stop accepting...
";
        s.Stop();

        std::cerr << "Shutdown...
";
        s.StopAllConnections(); // interrupt ongoing connections
    } // destructor of Server will join the service thread
    catch (std::exception &e) {
        std::cerr << __FUNCTION__ << ":" << __LINE__ << "
";
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }

    std::cerr << "Byebye
";
}

I modified the main() to run for 2 seconds without user intervention. This is so I can demo it Live On Coliru (of course, it's limited w.r.t the number of client processes).

If you run it with a lot (a lot) of clients, using e.g.

$ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\0" | netcat localhost 11235)& done; wait)

You will find that the two second window handles them all:

$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 28214
      2 hello world 4554
      2 hello world 6216
      2 hello world 7864
      2 hello world 9966
      2 void Server::Stop()
   1000 std::string Connection::ExtractString()
   1001 virtual Connection::~Connection()
   2000 void Connection::AsyncReadString()
   2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

If you really go berserk and raise 1000 to e.g. 100000 there, you'll get things similar to:

sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
Shutdown in 2 seconds...
Shutdown...
Byebye
      2 hello world 5483
      2 hello world 579
      2 hello world 5865
      2 hello world 938
      2 void Server::Stop()
      3 hello world 9613
   1741 std::string Connection::ExtractString()
   1742 virtual Connection::~Connection()
   3482 void Connection::AsyncReadString()
   3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)

On repeated 2-second runs of the server.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...