【问题标题】:Get OOM exception when sending message with a high speed with netty使用netty高速发送消息时出现OOM异常
【发布时间】:2014-02-01 15:17:22
【问题描述】:

我用netty编写了一个客户端,以便以高速率发送消息。 通过 jConsole,我看到“老一代”正在增加,最后它抛出 java.lang.OutOfMemoryError:超出 GC 开销限制。 是否有一些方法或配置可以避免此异常 以下是我的测试代码:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;

    import java.io.IOException;
    import java.net.UnknownHostException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeoutException;

    public class TestNettyTcp {
        private EventLoopGroup group;

        private Channel ch;

        /**
         * @param args
         * @throws SyslogSenderException
         * @throws TimeoutException
         * @throws ExecutionException
         * @throws IOException
         * @throws InterruptedException
         * @throws UnknownHostException
         */
        public static void main( String[] args )
            throws UnknownHostException, InterruptedException, IOException,  ExecutionException,  TimeoutException {
            new TestNettyTcp().testSendMessage();
        }

        public TestNettyTcp()
            throws InterruptedException {
            group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group( group ).channel( NioSocketChannel.class )
                // .option( ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024 )
                .option( ChannelOption.SO_RCVBUF, 1048576 ).option( ChannelOption.SO_SNDBUF, 1048576 )
                .option( ChannelOption.TCP_NODELAY, true ).handler( new NettyTcpSyslogSenderInitializer() );
            // Connect to a server.
            ChannelFuture future = b.connect( "192.168.22.70", 514 );
            future.awaitUninterruptibly();
            // Now we are sure the future is completed.
            assert future.isDone();

            if ( !future.isSuccess() ) {
                future.cause().printStackTrace();
            }
            else {
                ch = future.sync().channel();
            }
        }

        public void testSendMessage()
            throws InterruptedException, UnknownHostException, IOException, ExecutionException, TimeoutException {

            ThreadGroup threadGroup = new ThreadGroup( "SendMessage" );
            for ( int k = 0; k < 10; k++ ) {
                Thread thread = new Thread( threadGroup, new Runnable() {

                    @Override
                    public void run() {

                        String payLoad = "key=\"value\" key2=\"value2\" key3=\"value3\" Count:";

                        try {
                            for ( int j = 0; j < 100; j++ ) {
                                long a = System.currentTimeMillis();
                                for ( int i = 0; i < 20000; i++ ) {

                                    ch.writeAndFlush( payLoad + j + "_" + i + "\n" );
                                }
                                System.out.println( "\r<br>Excuted time : " + ( System.currentTimeMillis() - a ) / 1000f
                                    + "seconde" );
                            }

                        }
                        catch ( InterruptedException e ) {
                            e.printStackTrace();
                        }
                        finally {
                            if ( ch != null ) {
                                ch.close();
                            }
                        }
                    }

                } );
                thread.start();
            }

            while ( threadGroup.activeCount() > 0 ) {
                try {
                    Thread.sleep( 1000 );
                }
                catch ( InterruptedException e ) {
                    e.printStackTrace();
                }
            }
        }
    }

    class NettyTcpSyslogSenderInitializer
        extends ChannelInitializer<SocketChannel> {

        public NettyTcpSyslogSenderInitializer() {
            super();
        }

        @Override
        public void initChannel( SocketChannel ch )
            throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast( new StringEncoder() );
        }
    }

代码可以快速重现问题

【问题讨论】:

    标签: java out-of-memory netty


    【解决方案1】:

    您的编写速度比网络堆栈可以处理的要快。请注意,这都是异步的……一旦 Channel.isWritable() 返回 false 就停止写入,一旦再次返回 true 就恢复。您可以通过覆盖 ChannelInboundHandler 中的 channelWritabilityChanged(...) 方法来通知此更改。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-09-09
      • 2012-10-12
      • 2017-12-06
      • 1970-01-01
      • 1970-01-01
      • 2016-04-14
      • 2014-04-02
      相关资源
      最近更新 更多