【问题标题】:Netty TCP Socket InputStreamNetty TCP 套接字输入流
【发布时间】:2017-11-18 15:12:18
【问题描述】:

Netty TCP 服务器运行在端口8000 接收NMEA 格式数据。它使用Marine API 库将乱码转换为需要来自套接字的输入流的有意义的信息。

SentenceReader sentenceReader = new SentenceReader(socket.getInputStream());
sentenceReader.addSentenceListener(new MultiSentenceListener());
sentenceReader.start();

如何获取正在使用的 netty 服务器端口的输入流?

【问题讨论】:

    标签: java sockets spring-boot tcp netty


    【解决方案1】:

    SentenceReader 没有任何方法来接受“流入”数据,但是通过子类化,它可以接受数据。

    SentenceReader 的核心使用DataReader 来存储它的数据,通常这个数据读取器是从一个单独的线程SentenceReader 本身轮询的,我们可以修改这个结构来获得我们需要的东西。

    首先,我们用我们自己的类继承SentenceReader,给它适当的构造函数和我们想要的方法,并去掉启动和停止方法的影响。我们现在提供null作为文件(希望以后的版本提供直接传入datareader的方法)

    public class NettySentenceReader extends SentenceReader {
        public NettySentenceReader () {
            super((InputStream)null);
        }
    
        @Override
        public void start() {
        }
    
        @Override
        public void stop() {
        }
    }
    

    我们现在需要在我们自己的 Netty 处理程序中实现内部类 DataReader 的所有功能,以复制相同的行为

    public class SentenceReaderHandler extends
             SimpleChannelInboundHandler<String> {
        private SentenceFactory factory;
        private SentenceReader parent;
    
        public SentenceReaderHandler (SentenceReader parent) {
            this.parent = parent;
        }
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            if(!ctx.channel().isActive())
                return;
            //ActivityMonitor monitor = new ActivityMonitor(parent);
            this.factory = SentenceFactory.getInstance();
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            //ActivityMonitor monitor = new ActivityMonitor(parent);
            this.factory = SentenceFactory.getInstance();
        }
    
        @Override
        // This method will be renamed to `messageReceived` in Netty 5.0.0
        protected void channelRead0(ChannelHandlerContext ctx, String data)
                 throws Exception {
            if (SentenceValidator.isValid(data)) {
                monitor.refresh();
                Sentence s = factory.createParser(data);
                parent.fireSentenceEvent(s);
            } else if (!SentenceValidator.isSentence(data)) {
                parent.fireDataEvent(data);
            }
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            //monitor.reset();
            parent.fireReadingStopped();
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) {
            if(!ctx.channel().isActive())
                return;
            //monitor.reset();
            parent.fireReadingStopped();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
            parent.handleException("Data read failed", e);
        }
    }
    

    最后,我们需要将其集成到 Netty 管道中:

    SentenceReader reader = new NettySentenceReader();
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        private static final StringDecoder DECODER = new StringDecoder();
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
            pipeline.addLast(DECODER);
            pipeline.addLast(new SentenceReaderHandler(reader)); 
        }
    });
    

    【讨论】:

    • if(!ctx.isActive()) 方法isActive() 不存在
    • fireSentenceEvent(net.sf.marineapi.nmea.sentence.Sentence)' 在 'net.sf.marineapi.nmea.io.SentenceReader' 中不公开。无法从外部包访问
    • 如果那个方法不可访问,我们需要使用反射来强制访问那个方法,但是我今天没时间解释,明天再做
    【解决方案2】:

    你不能轻易做到,因为 InputStream 是阻塞的,而 netty 是一个异步 - 非阻塞 API。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-09-16
      • 1970-01-01
      • 1970-01-01
      • 2011-02-14
      • 1970-01-01
      • 2013-05-25
      相关资源
      最近更新 更多