【问题标题】:What is a good way to handle multithreading with Poco SocketReactor?使用 Poco SocketReactor 处理多线程的好方法是什么?
【发布时间】:2012-01-12 01:20:53
【问题描述】:

所以我开始对实现大容量客户端/服务器系统的替代方案进行一些研究,我目前正在研究 Poco 的 Reactor 框架,因为我现在将 Poco 用于我的很多应用程序框架。

传入的数据包大小会非常小,所以我认为从客户端读取数据的角度来看它可以正常工作。但是将基于客户端输入执行的操作将相对昂贵,并且可能需要卸载到另一个进程甚至另一个服务器。发送回客户端的响应有时会相当大。所以很明显我不能在发生这种情况时阻塞反应器线程。

所以我在想,如果我只是读取反应器事件处理程序中的数据,然后将其传递给另一个处理数据的线程(池),效果会更好。

我不太确定的是在操作完成后将响应发送回客户端的过程。

我找不到太多关于使用该框架的最佳方法的信息。但是我做了一些测试,看起来反应堆会在套接字可写时重复触发 WritableNotification 事件。那么最优的流程是不是把需要发送的数据放到接收WritableNotification事件的对象中排队,每次收到事件时发送小块呢?

更新:所以当我开始测试这个时,我惊恐地发现服务器应用程序在单个连接上运行的 CPU 上的服务器 CPU 使用率高达 100%。但经过一番挖掘后,我发现我做错了什么。我发现在创建服务处理程序时我不需要注册 WritableNotification 事件,我只需要在有数据要发送时注册。然后,一旦发送了所有数据,我应该取消注册事件处理程序。这样,当没有要发送的内容时,反应器就不必一遍又一遍地调用事件处理程序。现在,即使有 100 个连接,我的 CPU 使用率也接近于 0。呼!

【问题讨论】:

  • 我知道这是题外话,但 boost::asio 确实是多线程服务器的良好基础。它有一个非常直接的工作方式,并允许您大大简化您的代码(我采用的方法,一个运行大量线程的单个 io_service,对我在其上运行的一些测试做出了很好的响应)。此外,文档非常好,而且有很多人知道他们的 boost 方法。
  • 谢谢,朱利安。实际上,我对 Boost.Asio 很熟悉,并且在一些项目中使用过它,但不是用于大容量服务器。我只是想避免在这个项目中引入令人头疼的 Boost 构建过程,而且我已经在使用 Poco,所以我正在尝试一下。如果这以某种方式失败,Asio 是下一个选择。我已经遇到了一些问题,因此可能很快就会发生。

标签: c++ reactor poco-libraries


【解决方案1】:

我写了一个从 SocketConnector 复制的 ServerConnector 类,但不要为 socket 调用 connect,因为 socket 已经连接,如果在 TcpServerConnection 的 run() 函数中使用 ServiceHandler 启动了一个用于通知的反应器,则类 TcpServer 将启动一个新线程。所以,我得到了 reactor-partten 的多线程,但我不知道这是不是最好的方法。

类 ServerConnector

template <class ServiceHandler>
class ServerConnector
{
public:     
    explicit ServerConnector(StreamSocket& ss):
        _pReactor(0),
        _socket(ss)
        /// Creates a ServerConnector, using the given Socket.
    {
    }

    ServerConnector(StreamSocket& ss, SocketReactor& reactor):
        _pReactor(0),
        _socket(ss)
        /// Creates an acceptor, using the given ServerSocket.
        /// The ServerConnector registers itself with the given SocketReactor.
    {
        registerConnector(reactor);
        onConnect();
    }

    virtual ~ServerConnector()
        /// Destroys the ServerConnector.
    {
        unregisterConnector();
    }

//
// this part is same with SocketConnector
//

private:
    ServerConnector();
    ServerConnector(const ServerConnector&);
    ServerConnector& operator = (const ServerConnector&);

    StreamSocket&   _socket;
    SocketReactor* _pReactor;
};

Echo-Service 是一个普通的 ServiceHander

class EchoServiceHandler
{
public:
    EchoServiceHandler(StreamSocket& socket, SocketReactor& reactor):
        _socket(socket),
        _reactor(reactor)
    {
        _reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
        _reactor.addEventHandler(_socket, Observer<EchoServiceHandler, ErrorNotification>(*this, &EchoServiceHandler::onError));
    }

    ~EchoServiceHandler()
    {
        _reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ErrorNotification>(*this, &EchoServiceHandler::onError));
        _reactor.removeEventHandler(_socket, Observer<EchoServiceHandler, ReadableNotification>(*this, &EchoServiceHandler::onReadable));
    }

    void onReadable(ReadableNotification* pNf)
    {
        pNf->release();
        char buffer[4096];
        try {
            int n = _socket.receiveBytes(buffer, sizeof(buffer));
            if (n > 0)
            {
                _socket.sendBytes(buffer, n);
            } else
                onError();
        } catch( ... ) {
            onError();
        }
    }

    void onError(ErrorNotification* pNf)
    {
        pNf->release();
        onError();
    }

    void onError()
    {
        _socket.shutdown();
        _socket.close();
        _reactor.stop();
        delete this;
    }

private:
    StreamSocket   _socket;
    SocketReactor& _reactor;
};

EchoReactorConnection 与类 TcpServer 一起将反应器作为线程运行

class EchoReactorConnection: public TCPServerConnection
{
public:
    EchoReactorConnection(const StreamSocket& s): TCPServerConnection(s)
    {
    }

    void run()
    {
        StreamSocket& ss = socket();
        SocketReactor reactor;

        ServerConnector<EchoServiceHandler> sc(ss, reactor);
        reactor.run();
        std::cout << "exit EchoReactorConnection thread" << std::endl;
    }
};

cppunit 测试用例与 TCPServerTest::testMultiConnections 相同,但使用 EchoReactorConnection 进行多线程。

void TCPServerTest::testMultithreadReactor()
{
    ServerSocket svs(0);
    TCPServerParams* pParams = new TCPServerParams;
    pParams->setMaxThreads(4);
    pParams->setMaxQueued(4);
    pParams->setThreadIdleTime(100);

    TCPServer srv(new TCPServerConnectionFactoryImpl<EchoReactorConnection>(), svs, pParams);
    srv.start();

    assert (srv.currentConnections() == 0);
    assert (srv.currentThreads() == 0);
    assert (srv.queuedConnections() == 0);
    assert (srv.totalConnections() == 0);

    //
    // same with TCPServerTest::testMultiConnections()
    //
    // ....
    ///
}

【讨论】:

    猜你喜欢
    • 2013-01-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-04-21
    • 1970-01-01
    • 2011-02-21
    • 1970-01-01
    相关资源
    最近更新 更多