【问题标题】:Running many Netty clients and a server on the same development machine在同一台开发机器上运行多个 Netty 客户端和服务器
【发布时间】:2017-12-13 08:39:28
【问题描述】:

我正在编写一个应用程序,其中客户端和服务器都是使用 Netty 编写的,并且服务器应该(显然)一次支持多个客户端。我试图通过创建 1000 个客户端共享一个 EventLoopGroup 并在单台机器上运行所有内容来测试它。

最初我有多个客户端有时由于超时而无法连接。在客户端增加SO_TIMEOUT_MILLIS 并在服务器上将SO_BACKLOG 设置为numberOfClients 修复了this 问题。但是,我仍然得到connection reset by peer

io.netty.channel.AbstractChannel$AnnotatedConnectException: syscall:getsockopt(..) failed: Connection refused: localhost/127.0.0.1:8080
    at io.netty.channel.unix.Socket.finishConnect(..)(Unknown Source)
Caused by: io.netty.channel.unix.Errors$NativeConnectException: syscall:getsockopt(..) failed: Connection refused
    ... 1 more

有时在客户端(尤其是当我增加客户端数量时)。服务器端LoggingHandler 的输出似乎没有显示任何尝试从这些通道绑定到客户端的端口进行连接。尝试使用 Nio* 而不是 Epoll* 类型也没有帮助。

是否需要设置其他选项以允许更多连接(可能在服务器端,如果它真的是拒绝/重置连接的那个)?

为简化情况,我删除了自己的逻辑,因此客户端只需通过 websocket 连接并在握手成功后关闭通道。 据我了解,Netty 在处理 10000 个并发 websocket 连接时通常不会有问题,而这些连接并没有多大作用。

ulimit -n 是 1000000,ulimit -u 是 772794,所以两者都不是问题。

这是代码(在 Kotlin 中,但 Java 翻译应该清楚):

package netty

import io.netty.bootstrap.Bootstrap
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.*
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler
import io.netty.handler.codec.http.websocketx.WebSocketVersion
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import org.junit.Test
import java.net.URI

@Suppress("OverridingDeprecatedMember")
class NettyTest {
    private fun channelInitializer(f: (Channel) -> Unit) = object : ChannelInitializer<Channel>() {
        override fun initChannel(ch: Channel) {
            f(ch)
        }
    }

    private val numberOfClients = 10000
    private val maxHttpContentLength = 65536

    @Test
    fun manyClients() {
        // set up server
        val bossLoopGroup = EpollEventLoopGroup(1)
        val workerLoopGroup = EpollEventLoopGroup()
        val serverChannelFactory = ChannelFactory { EpollServerSocketChannel() }
        val clientLoopGroup = EpollEventLoopGroup()
        val clientChannelFactory = ChannelFactory { EpollSocketChannel() }
        val serverChannel = ServerBootstrap().channelFactory(serverChannelFactory).group(bossLoopGroup, workerLoopGroup).handler(LoggingHandler(LogLevel.DEBUG)).childHandler(channelInitializer {
            it.pipeline().addLast(
                    HttpServerCodec(),
                    HttpObjectAggregator(maxHttpContentLength),
                    WebSocketServerCompressionHandler(),
                    WebSocketServerProtocolHandler("/", null, true, maxHttpContentLength)/*,
                    myServerHandler*/
            )
        }).option(ChannelOption.SO_BACKLOG, numberOfClients).bind("localhost", 8080).sync().channel()
        println("Server started")

        try {
            // set up clients    
            val url = URI("ws://localhost")
            val futures = List(numberOfClients) { clientIndex ->
                val handshaker = WebSocketClientHandshakerFactory.newHandshaker(url, WebSocketVersion.V13, null, true, null)
                val promise = clientLoopGroup.next().newPromise<Channel>()

                val connectFuture = Bootstrap().channelFactory(clientChannelFactory).group(clientLoopGroup).handler(channelInitializer {
                    it.pipeline().addLast(
                            HttpClientCodec(),
                            HttpObjectAggregator(maxHttpContentLength),
                            WebSocketClientCompressionHandler.INSTANCE,
                            WebSocketClientProtocolHandler(handshaker, true),
                            object : ChannelInboundHandlerAdapter() {
                                override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
                                    if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
                                        promise.setSuccess(ctx.channel())
                                        println("Client $clientIndex handshake successful")
                                    }
                                }

                                override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
                                    promise.setFailure(cause)
                                }
                            })
                }).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 120000).connect("localhost", 8080)
                Pair(promise, connectFuture)
            }
            for ((promise, connectFuture) in futures) {
                connectFuture.sync()
                try {
                    promise.sync()
                } finally { connectFuture.channel().close().sync() }
            }
        } finally {
            try { serverChannel.close().sync() } finally {
                workerLoopGroup.shutdownGracefully()
                bossLoopGroup.shutdownGracefully()
                clientLoopGroup.shutdownGracefully()
            }
        }
    }
}

【问题讨论】:

  • 非常有趣。据我所知,所有网络套接字握手目前都是由 IO 线程执行的。您是否尝试过将所有握手从 IO 线程中移出并将它们添加到另一个 EventLoopGroup
  • 实际上执行握手必须在IO线程中完成,不是吗?创建请求和响应可能会被拉出,这意味着无法使用WebSocket*ProtocolHandler(尽管其中的大部分代码都可以重用)。
  • 同意,没有足够重视这个问题。顺便说一句,您找到解决方案了吗?问题到底出在哪里?
  • 还没有,很遗憾。
  • 嗨,Alexey,你有解决这个问题的办法吗?

标签: netty


【解决方案1】:

只有 1 个线程用于接受传入连接:bossLoopGroup = EpollEventLoopGroup(1)。也许这不足以接受客户端连接群。

我建议共享一个EventLoopGroup 作为老板、工人和客户,使用默认线程数(Netty 会考虑内核数)。所以你不会有未充分使用/过度使用的线程池。

如果您想使用不同的线程池运行测试,请使用明确的大小创建它们,并为您的bossLoopGroup 设置超过 1 个线程。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-02-10
    • 2023-03-18
    • 1970-01-01
    • 1970-01-01
    • 2021-07-31
    • 2017-06-01
    • 1970-01-01
    • 2017-05-26
    相关资源
    最近更新 更多