我在尝试将使用套接字的 java 应用程序复制到使用 Netty 的更复杂方法时遇到了这个问题。
我解决问题的方法是了解建立连接所需的 netty 库的各种元素:
这 3 个元素可确保创建和管理连接以供进一步处理。
此外,使用 Netty 时还需要一些其他元素:
通道初始化器负责准备Pipeline,它本质上是通过一系列“过滤器”来传递入站和出站数据,以便处理不同级别的数据,每个级别接收前一个处理的数据编码器/解码器。
这是 netty 文档中介绍的管道的工作方式:
输入/输出请求
通过频道或
ChannelHandlerContext
|
+-------------------------------------------------- --+----------------+
|管道管道 | |
| \|/ |
| +---------+ +------------+----------+ |
| |入站处理程序 N | |出站处理程序 1 | |
| +----------+----------+ +------------+---------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +------------+---------+ |
| |入站处理程序 N-1 | |出站处理程序 2 | |
| +----------+----------+ +------------+---------+ |
| /|\ 。 |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [方法调用] [方法调用] |
| . . |
| . \|/ |
| +----------+----------+ +------------+---------+ |
| |入站处理程序 2 | |出站处理程序 M-1 | |
| +----------+----------+ +------------+---------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +------------+---------+ |
| |入站处理程序 1 | |出站处理程序 M | |
| +----------+----------+ +------------+---------+ |
| /|\ | |
+---------------+--------------------------------- --+----------------+
| \|/
+---------------+--------------------------------- --+----------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty 内部 I/O 线程(传输实现) |
+-------------------------------------------------- ------------------+
就问题的原始上下文而言,没有预设的解码器允许解析具有预定字节的自定义数据。本质上,这意味着必须为入站数据创建自定义解码器。
让我们首先回顾一下作为客户端应用程序启动的连接的基础知识:
导入 io.netty.bootstrap.Bootstrap
导入 io.netty.channel.nio.NioEventLoopGroup
导入 io.netty.channel.socket.nio.NioSocketChannel
导入 io.netty.channel.socket.SocketChannel
对象应用 {
def 主(参数:数组 [字符串]){
连接()
}
定义连接(){
val host = "host.example.com"
验证端口 = 9999
val group = new NioEventLoopGroup() // 启动事件循环组
尝试 {
var b = new Bootstrap() // 创建网络引导程序
.group(group) // 将 NioEventLoopGroup 关联到引导程序
.channel(classOf[NioSocketChannel]) // 将通道关联到引导程序
.handler(MyChannelInitializer) // 提供处理通道上的传入/传出数据的处理程序
var ch = b.connect(host, port).sync().channel() //发起到服务器的连接并链接到netty通道
ch.writeAndFlush("SERVICE_REQUEST") // 向服务器发送请求
ch.closeFuture().sync() // 保持连接处于活动状态,而不是在收到第一个数据包后关闭通道
}
抓住 {
case t: Throwable => t.printStackTrace(); group.shutdownGracefully()
}
最后 {
group.shutdownGracefully() // 关闭事件组
}
}
}
在启动引导时调用MyChannelInitializer 是负责告诉程序如何处理传入和传出数据消息的部分:
导入 io.netty.channel.ChannelInitializer
导入 io.netty.channel.socket.SocketChannel
导入 io.netty.handler.codec.string.StringEncoder
对象 MyChannelInitializer 扩展 ChannelInitializer[SocketChannel] {
val STR_ENCODER = new StringEncoder // 来自 netty 的通用 StringEecoder,用于简单地准备字符串并将其发送到服务器
def initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline() // 加载与通道关联的管道
// 解码消息
pipeline.addLast("packet-decoder",MyPacketDecoder) // 第一个数据“过滤器”为第二个过滤器提取必要的字节
pipeline.addLast("gzip-inflater", MyGZipDecoder) // 第二个“过滤器”解压内容
// 编码要发送的字符串
pipeline.addLast("command-encoder",STR_ENCODER) // 输出数据的字符串编码器
// 处理程序
pipeline.addLast("message-handler", MyMessageHandler) // 在所有“过滤器”都被应用后处理结束数据
}
}
在这种情况下,第一个管道项MyPacketDecoder 已创建为ReplayingDecoder 的子类,它提供了一种执行数据包重构的简单方法,以便获得消息使用所需的所有字节。 (简单地说,等待所有字节都收集到 ByteBuf 变量中,然后再继续)
了解ByteBuf 的工作原理对于此类应用程序非常重要,尤其是read 和get 方法之间的区别,它们分别允许读取和移动读取索引或仅读取数据而不影响读取器索引。
下面提供MyPacketDecoder的示例
导入 io.netty.handler.codec.ReplayingDecoder
导入 io.netty.channel.ChannelHandlerContext
导入 io.netty.buffer.ByteBuf
导入 java.util.List
对象 MyPacketDecoder 扩展 ReplayingDecoder[Int] {
val READ_HEADER = 0
val READ_CONTENT = 1
super.state(READ_HEADER) // 通过调用超类构造函数设置解码器的初始状态
var blockSize:Int = 0 // 预期的数据大小,由从服务器接收到的数据发布,会根据您的情况而有所不同,在实际要处理的数据之前可能会有额外的头字节
def decode(ctx: ChannelHandlerContext,in: ByteBuf,out: List[AnyRef]): Unit = {
var received_size = in.readableBytes()
如果(状态()== READ_HEADER){
blockSize = in.readInt() // 标头数据,如果分段,则在当前和后续数据包中接收到的预期数据大小
checkpoint(READ_CONTENT) // 更改对象的状态以继续获取消息有效所需的所有必需字节
}
否则如果(状态()== READ_CONTENT){
var bytes = new Array[Byte](blockSize)
in.getBytes(0,bytes,0,blockSize) // 将收集到的字节添加到 by 数组中,以获得由 blockSize 变量定义的预期大小
var frame = in.readBytes(blockSize) // 创建要传递给下一个“过滤器”的 bytebuf
checkpoint(READ_HEADER) // 改变准备下一条消息的状态
out.add(frame) // 将数据传递给下一个“过滤器”
}
别的 {
throw new Error("Case notcovered Exception")
}
}
}
前面的代码从所有数据包中取出接收到的字节,直到达到预期的字节大小,并将其传递到下一个管道级别。
下一个管道级别处理接收数据的 GZIP 解压缩。这是由MyGZipDecoder 对象确保的,该对象被定义为ByteToMessageDecoder 抽象对象的子类,以便将字节信息作为接收数据处理:
导入 io.netty.handler.codec.ByteToMessageDecoder
导入 io.netty.channel.ChannelHandlerContext
导入 io.netty.buffer.ByteBuf
导入 java.net._
导入 java.io._
导入 java.util._
导入 java.util.zip._
导入 java.text._
对象 MyGZipDecoder 扩展 ByteToMessageDecoder {
val MAX_DATA_SIZE = 100000
var inflater = new Inflater(true)
var compressedData = new Array[Byte](MAX_DATA_SIZE)
var uncompressedData = new Array[Byte](MAX_DATA_SIZE)
def decode(ctx: ChannelHandlerContext,in: ByteBuf,out: List[AnyRef]): Unit = {
var received_size = in.readableBytes() // 读取可用字节数
in.readBytes(compressedData, 0, received_size) // 将字节放入 Byte 数组
充气机.reset();
inflater.setInput(compressedData, 0, received_size) // 为数据解压准备充气机
var resultLength = inflater.inflate(uncompressedData) // 将数据解压到 uncompressedData 字节数组中
var message = new String(uncompressedData) // 从未压缩的数据中生成一个字符串
out.add(message) // 将数据传递到下一个管道级别
}
}
此解码器将数据包中接收到的压缩数据解压缩,并将数据作为从该级别接收到的解码字节获得的字符串发送到下一个级别。
最后一块拼图是MyMessageHandler 对象,它基本上对数据进行最终处理,以满足应用程序所需的目的。这是SimpleChannelInboundHandler 的子类,带有一个字符串参数,预期作为通道数据的消息:
导入 io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
导入 io.netty.channel.ChannelHandler.Shareable
@可共享
对象 QMMessageHandler 扩展 SimpleChannelInboundHandler[String] {
def channelRead0(ctx: ChannelHandlerContext, msg: String) {
println("Handler => 收到消息:"+msg)
// 在这里做你的数据处理,但是你需要用于应用程序的目的
}
}
这基本上完成了这个特定示例的要求,该示例连接到以专有数据协议提供数据的服务器,对基本数据包数据使用 GZip 压缩。
希望这可以为那些试图实现类似场景的人提供一个良好的基础,但主要想法是需要进行一些定制才能为专有协议创建适合的处理。
此外,需要注意的是,这种类型的实现并不是真正用于简单的客户端-服务器连接,而是用于需要数据的可分布性/可扩展性的应用程序,这些应用程序由 netty 库提供(即许多同时并发连接并广播数据)。
对于我在编写此答案时可能遗漏的任何错误,我提前道歉。
我希望这个简短的教程可以对其他人有所帮助,因为我个人不得不花费一些令人沮丧的时间从网络上的点点滴滴中弄清楚。