【问题标题】:Boost.Asio: Is it a good thing to use a `io_service` per connection/socket?Boost.Asio:每个连接/套接字使用“io_service”是一件好事吗?
【发布时间】:2015-10-07 18:37:39
【问题描述】:

我想创建一个实现每个连接一个线程模型的应用程序。但是每个连接都必须是可停止的。我试过this boost.asio example,它实现了我想要的阻塞版本。但经过一番询问后,我发现没有可靠的方法来停止该示例的会话。所以我试图实现我自己的。我不得不使用异步函数。由于我只想创建一个线程来管理一个连接并且无法控制哪个异步作业用于哪个线程,因此我决定为每个连接/套接字/线程使用io_service

那么这是一个好方法吗,你知道更好的方法吗?

我的代码在这里,您可以检查和查看它:

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>
#include <list>
#include <iostream>
#include <string>
#include <istream>

namespace ba = boost::asio;
namespace bs = boost::system;
namespace b  = boost;

typedef ba::ip::tcp::acceptor acceptor_type;
typedef ba::ip::tcp::socket socket_type;

const short PORT = 11235;
class Server;

// A connection has its own io_service and socket
class Connection {
protected:
    ba::io_service service;
    socket_type sock;
    b::thread *thread;
    ba::streambuf stream_buffer;    // for reading etc
    Server *server;
    void AsyncReadString() {
        ba::async_read_until(
            sock,
            stream_buffer,
            '\0',   // null-char is a delimiter
            b::bind(&Connection::ReadHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    void AsyncWriteString(const std::string &s) {
        std::string newstr = s + '\0';  // add a null char
        ba::async_write(
            sock,
            ba::buffer(newstr.c_str(), newstr.size()),
            b::bind(&Connection::WriteHandler, this,
                ba::placeholders::error,
                ba::placeholders::bytes_transferred));
    }
    virtual void Session() {
        AsyncReadString();
        service.run();  // run at last
    }
    std::string ExtractString() {
        std::istream is(&stream_buffer);
        std::string s;
        std::getline(is, s, '\0');
        return s;
    }
    virtual void ReadHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
        if (!ec) {
            std::cout << (ExtractString() + "\n");
            std::cout.flush();
            AsyncReadString();  // read again
        }
        else {
            // do nothing, "this" will be deleted later
        }
    }
    virtual void WriteHandler(
        const bs::error_code &ec,
        std::size_t bytes_transferred) {
    }
public:
    Connection(Server *s) :
        service(),
        sock(service),
        server(s),
        thread(NULL)
    {  }
    socket_type& Socket() {
        return sock;
    }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Connection::Session, this));
    }
    void Join() {
        if (thread) thread->join();
    }
    void Stop() {
        service.stop();
    }
    void KillMe();
    virtual ~Connection() {
    }
};

// a server also has its own io_service but it's only used for accepting
class Server {
public:
    std::list<Connection*> Connections;
protected:
    ba::io_service service;
    acceptor_type acc;
    b::thread *thread;
    virtual void AcceptHandler(const bs::error_code &ec) {
        if (!ec) {
            Connections.back()->Start();
            Connections.push_back(new Connection(this));
            acc.async_accept(
                Connections.back()->Socket(),
                b::bind(&Server::AcceptHandler,
                    this,
                    ba::placeholders::error));
        }
        else {
            // do nothing
            // since the new session will be deleted
            // automatically by the destructor
        }
    }
    virtual void ThreadFunc() {
        Connections.push_back(new Connection(this));
        acc.async_accept(
            Connections.back()->Socket(),
            b::bind(&Server::AcceptHandler,
                this,
                ba::placeholders::error));
        service.run();
    }
public:
    Server():
        service(),
        acc(service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
        thread(NULL)
    {  }
    void Start() {
        if (thread) delete thread;
        thread = new b::thread(
            b::bind(&Server::ThreadFunc, this));
    }
    void Stop() {
        service.stop();
    }
    void Join() {
        if (thread) thread->join();
    }
    void StopAllConnections() {
        for (auto c : Connections) {
            c->Stop();
        }
    }
    void JoinAllConnections() {
        for (auto c : Connections) {
            c->Join();
        }
    }
    void KillAllConnections() {
        for (auto c : Connections) {
            delete c;
        }
        Connections.clear();
    }
    void KillConnection(Connection *c) {
        Connections.remove(c);
        delete c;
    }
    virtual ~Server() {
        delete thread;
        // connection should be deleted by the user (?)
    }
};

void Connection::KillMe() {
    server->KillConnection(this);
}

int main() {
    try {
        Server s;
        s.Start();
        std::cin.get(); // wait for enter
        s.Stop();   // stop listening first
        s.StopAllConnections(); // interrupt ongoing connections
        s.Join();   // wait for server, should return immediately
        s.JoinAllConnections(); // wait for ongoing connections
        s.KillAllConnections(); // destroy connection objects
        // at the end of scope, Server will be destroyed
    }
    catch (std::exception &e) {
        std::cerr << "Exception: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}

【问题讨论】:

    标签: c++ multithreading sockets boost boost-asio


    【解决方案1】:

    没有。每个连接使用io_service 对象绝对是一种气味。特别是因为您还在专用线程上运行每个连接。

    此时您必须问自己异步为您带来了什么?您可以让所有代码同步并拥有完全相同数量的线程等。

    显然,您希望将连接多路复用到数量少得多的服务上。在实践中,有一些明智的模型,例如

    1. 带有单个服务线程的单个io_service(这通常很好)。服务上排队的任务可能永远不会阻塞很长时间,否则延迟会受到影响

    2. 单个io_service 有多个线程执行处理程序。池中的线程数应该足以为最大值提供服务。支持的并发 CPU 密集型任务的数量(或者同样,延迟将开始上升)

    3. 每个线程一个 io_service,通常每个逻辑核心一个线程,并且具有线程关联性,以便它“粘”到那个核心。这可能是缓存局部性的理想选择

    更新:演示

    这是一个演示,使用上面的选项 1. 显示惯用风格:

    Live On Coliru

    #include <boost/array.hpp>
    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/make_shared.hpp>
    #include <boost/thread.hpp>
    #include <iostream>
    #include <istream>
    #include <list>
    #include <string>
    
    namespace ba = boost::asio;
    namespace bs = boost::system;
    namespace b  = boost;
    
    typedef ba::ip::tcp::acceptor acceptor_type;
    typedef ba::ip::tcp::socket   socket_type;
    
    const short PORT = 11235;
    
    // A connection has its own io_service and socket
    class Connection : public b::enable_shared_from_this<Connection>
    {
    public:
        typedef boost::shared_ptr<Connection> Ptr;
    protected:
        socket_type    sock;
        ba::streambuf  stream_buffer; // for reading etc
        std::string    message;
    
        void AsyncReadString() {
            std::cout << __PRETTY_FUNCTION__ << "\n";
    
            ba::async_read_until(
                sock,
                stream_buffer,
                '\0',   // null-char is a delimiter
                b::bind(&Connection::ReadHandler, shared_from_this(),
                    ba::placeholders::error,
                    ba::placeholders::bytes_transferred));
        }
        void AsyncWriteString(const std::string &s) {
            std::cout << __PRETTY_FUNCTION__ << "\n";
    
            message = s;
    
            ba::async_write(
                sock,
                ba::buffer(message.c_str(), message.size()+1),
                b::bind(&Connection::WriteHandler, shared_from_this(),
                    ba::placeholders::error,
                    ba::placeholders::bytes_transferred));
        }
        std::string ExtractString() {
            std::cout << __PRETTY_FUNCTION__ << "\n";
    
            std::istream is(&stream_buffer);
            std::string s;
            std::getline(is, s, '\0');
            return s;
        }
        void ReadHandler(
            const bs::error_code &ec,
            std::size_t bytes_transferred) 
        {
            std::cout << __PRETTY_FUNCTION__ << "\n";
    
            if (!ec) {
                std::cout << (ExtractString() + "\n");
                std::cout.flush();
                AsyncReadString();  // read again
            }
            else {
                // do nothing, "this" will be deleted later
            }
        }
        void WriteHandler(const bs::error_code &ec, std::size_t bytes_transferred) {
            std::cout << __PRETTY_FUNCTION__ << "\n";
        }
    public:
        Connection(ba::io_service& svc) : sock(svc) { }
    
        virtual ~Connection() {
            std::cout << __PRETTY_FUNCTION__ << "\n";
        }
    
        socket_type& Socket() { return sock;          } 
        void Session()        { AsyncReadString();    } 
        void Stop()           { sock.cancel();        }
    };
    
    // a server also has its own io_service but it's only used for accepting
    class Server {
    public:
        std::list<boost::weak_ptr<Connection> > m_connections;
    protected:
        ba::io_service _service;
        boost::optional<ba::io_service::work> _work;
        acceptor_type _acc;
        b::thread thread;
    
        void AcceptHandler(const bs::error_code &ec, Connection::Ptr accepted) {
            if (!ec) {
                accepted->Session();
                DoAccept();
            }
            else {
                // do nothing the new session will be deleted automatically by the
                // destructor
            }
        }
    
        void DoAccept() {
            auto newaccept = boost::make_shared<Connection>(_service);
    
            _acc.async_accept(
                newaccept->Socket(),
                b::bind(&Server::AcceptHandler,
                    this,
                    ba::placeholders::error,
                    newaccept
                ));
        }
    
    public:
        Server():
            _service(),
            _work(ba::io_service::work(_service)),
            _acc(_service, ba::ip::tcp::endpoint(ba::ip::tcp::v4(), PORT)),
            thread(b::bind(&ba::io_service::run, &_service))
        {  }
    
        ~Server() {
            std::cout << __PRETTY_FUNCTION__ << "\n";
            Stop();
            _work.reset();
            if (thread.joinable()) thread.join();
        }
    
        void Start() {
            std::cout << __PRETTY_FUNCTION__ << "\n";
            DoAccept();
        }
    
        void Stop() {
            std::cout << __PRETTY_FUNCTION__ << "\n";
            _acc.cancel();
        }
    
        void StopAllConnections() {
            std::cout << __PRETTY_FUNCTION__ << "\n";
            for (auto c : m_connections) {
                if (auto p = c.lock())
                    p->Stop();
            }
        }
    };
    
    int main() {
        try {
            Server s;
            s.Start();
    
            std::cerr << "Shutdown in 2 seconds...\n";
            b::this_thread::sleep_for(b::chrono::seconds(2));
    
            std::cerr << "Stop accepting...\n";
            s.Stop();
    
            std::cerr << "Shutdown...\n";
            s.StopAllConnections(); // interrupt ongoing connections
        } // destructor of Server will join the service thread
        catch (std::exception &e) {
            std::cerr << __FUNCTION__ << ":" << __LINE__ << "\n";
            std::cerr << "Exception: " << e.what() << std::endl;
            return 1;
        }
    
        std::cerr << "Byebye\n";
    }
    

    我将main() 修改为运行 2 秒,无需用户干预。这样我就可以演示它了Live On Coliru(当然,它受限于客户端进程的数量)。

    如果您与很多(很多)客户一起运行它,例如使用

    $ time (for a in {1..1000}; do (sleep 1.$RANDOM; echo -e "hello world $RANDOM\\0" | netcat localhost 11235)& done; wait)
    

    你会发现两个第二个窗口都处理了它们:

    $ ./test | sort | uniq -c | sort -n | tail
    Shutdown in 2 seconds...
    Shutdown...
    Byebye
          2 hello world 28214
          2 hello world 4554
          2 hello world 6216
          2 hello world 7864
          2 hello world 9966
          2 void Server::Stop()
       1000 std::string Connection::ExtractString()
       1001 virtual Connection::~Connection()
       2000 void Connection::AsyncReadString()
       2000 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)
    

    如果你真的发疯,将1000 提高到例如100000 那里,你会得到类似的东西:

    sehe@desktop:/tmp$ ./test | sort | uniq -c | sort -n | tail
    Shutdown in 2 seconds...
    Shutdown...
    Byebye
          2 hello world 5483
          2 hello world 579
          2 hello world 5865
          2 hello world 938
          2 void Server::Stop()
          3 hello world 9613
       1741 std::string Connection::ExtractString()
       1742 virtual Connection::~Connection()
       3482 void Connection::AsyncReadString()
       3482 void Connection::ReadHandler(const boost::system::error_code&, std::size_t)
    

    服务器重复运行 2 秒。

    【讨论】:

    • @jnbrq 我添加了一个使用 option 1. 的演示(单个线程上的单个 io 服务),它将异步处理数千个并发请求,直到停止从主线程。我希望这个示例有助于演示一些 Asio 习语
    • 直播录制在这里:starting ~18:15 (experiment)(以及剩余部分here
    • 您的AsyncWriteString() 函数创建一个局部变量,然后将其传递给async_write()。该字符串在函数退出时被销毁,但缓冲区需要在调用写入处理程序之前有效。这是一个错误。此外,无需添加空字符 -- 只需使用 s.c_str()s.size() + 1,因为 std::string::c_str() 为您提供了额外的空字符。
    • @jnbrq 每个连接 1 个线程是个坏主意,我不会演示它。 (您认为它将如何处理这 1000 个连接?)您可以要求任何选项,但实际上您不应该使事情复杂化,除非您必须这样做。你认为你为什么需要这个?
    • @jnbrq 弱点主要是为了向您展示您可以自行清理连接。老实说,我不知道你为什么甚至需要连接列表。您会注意到,如果不这样做,您也可以轻松地停止工作。 (IOW 这只是 Asio 中惯用会话处理的另一面)
    猜你喜欢
    • 2011-05-03
    • 2011-07-05
    • 1970-01-01
    • 1970-01-01
    • 2017-02-10
    • 1970-01-01
    • 2011-05-04
    • 1970-01-01
    • 2011-06-10
    相关资源
    最近更新 更多