转:https://blog.csdn.net/qq_18603599/article/details/80768409
上一章节主要介绍了netty的自带编解码使用和如何使用自定义的编解码完成业务,今天将要介绍netty关于协议相关知识,主要内容如下:
1 netty基于http协议的使用:完成文件目录浏览功能
该场景不需要自己写客户端,只需要写服务端即可,然后通过http的get方法请求该服务端,服务端就会返回文件目录的层级解析和展示:直接看代码:服务端代码
package http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* @Author 18011618
* @Description
* @Date 16:37 2018/6/25
* @Modify By
*/
public class HttpFileServer {
//浏览文件的根目录
private static final String DEFAULT_URL = "/";
public void run(final int port, final String url) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 请求消息解码器
ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
// 目的是将多个消息转换为单一的request或者response对象
ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
// 响应解码器
ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
// 目的是支持异步大文件传输()
ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
// 业务逻辑
ch.pipeline().addLast("fileServerHandler", new HttpFileServerHandler(url));
}
});
ChannelFuture future = b.bind("127.0.0.1", port).sync();
System.out.println("HTTP文件目录服务器启动,网址是 : " + "http://127.0.0.1:" + port + url);
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
String url = DEFAULT_URL;
if (args.length > 1)
url = args[1];
new HttpFileServer().run(port, url);
}
}
上面红色部分标准就是基于HTTP协议的实现,下面再看一下IO线程:
package http;
import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpHeaders.Names.LOCATION;
import static io.netty.handler.codec.http.HttpMethod.GET;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
import static io.netty.handler.codec.http.HttpResponseStatus.FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelProgressiveFuture;
import io.netty.channel.ChannelProgressiveFutureListener;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.regex.Pattern;
import javax.activation.MimetypesFileTypeMap;
/**
* @Author 18011618
* @Description 完成文件目录浏览处理的handler
* @Date 16:36 2018/6/25
* @Modify By
*/
public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String url;
public HttpFileServerHandler(String url) {
this.url = url;
}
/**
* 请求解析文件目录层级以及对应的内容
* @param ctx
* @param request
* @throws Exception
*/
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
//是否可以解码
if (!request.getDecoderResult().isSuccess()) {
sendError(ctx, BAD_REQUEST);
return;
}
//请求的方法是否是get
if (request.getMethod() != GET) {
sendError(ctx, METHOD_NOT_ALLOWED);
return;
}
final String uri = request.getUri();
final String path = sanitizeUri(uri);
if (path == null) {
sendError(ctx, FORBIDDEN);
return;
}
File file = new File(path);
if (file.isHidden() || !file.exists()) {
sendError(ctx, NOT_FOUND);
return;
}
if (file.isDirectory()) {
if (uri.endsWith("/")) {
sendListing(ctx, file);
} else {
sendRedirect(ctx, uri + '/');
}
return;
}
if (!file.isFile()) {
sendError(ctx, FORBIDDEN);
return;
}
//打开文件系统
RandomAccessFile randomAccessFile = null;
try {
//以只读的方式打开文件
randomAccessFile = new RandomAccessFile(file, "r");
} catch (FileNotFoundException fnfe) {
sendError(ctx, NOT_FOUND);
return;
}
//获取文件长度
long fileLength = randomAccessFile.length();
//创建响应
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
setContentLength(response, fileLength);
setContentTypeHeader(response, file);
if (isKeepAlive(request)) {
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
ctx.write(response);
ChannelFuture sendFileFuture;
sendFileFuture = ctx.write(new ChunkedFile(randomAccessFile, 0, fileLength, 8192), ctx.newProgressivePromise());
//监听用户浏览目录事件
sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) {
if (total < 0) { // total unknown
System.err.println("Transfer progress: " + progress);
} else {
System.err.println("Transfer progress: " + progress + " / " + total);
}
}
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
System.out.println("Transfer complete.");
}
});
ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (!isKeepAlive(request)) {
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
}
}
/**
* 请求出现异常 关闭链路
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");
private String sanitizeUri(String uri) {
try {
uri = URLDecoder.decode(uri, "UTF-8");
} catch (UnsupportedEncodingException e) {
try {
uri = URLDecoder.decode(uri, "ISO-8859-1");
} catch (UnsupportedEncodingException e1) {
throw new Error();
}
}
if (!uri.startsWith(url)) {
return null;
}
if (!uri.startsWith("/")) {
return null;
}
uri = uri.replace('/', File.separatorChar);
if (uri.contains(File.separator + '.') || uri.contains('.' + File.separator) || uri.startsWith(".") || uri.endsWith(".") || INSECURE_URI.matcher(uri).matches()) {
return null;
}
return System.getProperty("user.dir") + File.separator + uri;
}
private static final Pattern ALLOWED_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");
private static void sendListing(ChannelHandlerContext ctx, File dir) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK);
response.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8");
StringBuilder buf = new StringBuilder();
String dirPath = dir.getPath();
buf.append("<!DOCTYPE html>\r\n");
buf.append("<html><head><title>");
buf.append(dirPath);
buf.append(" 目录:");
buf.append("</title></head><body>\r\n");
buf.append("<h3>");
buf.append(dirPath).append(" 目录:");
buf.append("</h3>\r\n");
buf.append("<ul>");
buf.append("<li>链接:<a href=\"../\">..</a></li>\r\n");
for (File f : dir.listFiles()) {
if (f.isHidden() || !f.canRead()) {
continue;
}
String name = f.getName();
if (!ALLOWED_FILE_NAME.matcher(name).matches()) {
continue;
}
buf.append("<li>链接:<a href=\"");
buf.append(name);
buf.append("\">");
buf.append(name);
buf.append("</a></li>\r\n");
}
buf.append("</ul></body></html>\r\n");
ByteBuf buffer = Unpooled.copiedBuffer(buf, CharsetUtil.UTF_8);
response.content().writeBytes(buffer);
buffer.release();
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/**
* 重定向
* @param ctx
* @param newUri
*/
private static void sendRedirect(ChannelHandlerContext ctx, String newUri) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, FOUND);
response.headers().set(LOCATION, newUri);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/**
* 设置HTTP请求HEAD
* @param response
* @param file
*/
private static void setContentTypeHeader(HttpResponse response, File file) {
MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();
response.headers().set(CONTENT_TYPE, mimeTypesMap.getContentType(file.getPath()));
}
}
OK 这个时候直接运行一下服务端:会输出一个URL
点击URL之后就能显示文件浏览目录层次
其中每一个超链接可以再点击:
一个简单的文件浏览器功能就实现了...
2 netty基于udp协议的使用:是一种基于UDP的传输协议
业务场景:客户端发送给服务端'我要写诗',服务端接受到客户端的请求,随机在已有的诗句中,返回一句给'客户端',直接看代码
首先看client相关的:
package udp.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
/**
* @Author 18011618
* @Description UDP处理的handlers
* @Date 17:00 2018/6/25
* @Modify By
*/
public class UDPClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
//数据包转换为字符串
String response = msg.content().toString(CharsetUtil.UTF_8);
if (response.startsWith("我要写诗: ")) {
System.out.println(response);
ctx.close();
}
}
}
和客户端:
package udp.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.util.CharsetUtil;
import java.net.InetSocketAddress;
/**
* @Author 18011618
* @Description 模拟UDP客户端发送请求
* @Date 17:00 2018/6/25
* @Modify By
*/
public class UDPClient {
public void run(int port) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
//设置为UDP处理的channel
b.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new UDPClientHandler());//设置业务处理的handler
Channel ch = b.bind(0).sync().channel();
// 向网段内的所有机器广播UDP消息
// DatagramPacket UPD传输包
ch.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer("我要写诗", CharsetUtil.UTF_8), new InetSocketAddress("255.255.255.255", port))).sync();
if (!ch.closeFuture().await(15000)) {
System.out.println("查询超时!");
}
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
new UDPClient().run(port);
}
}
再看看服务端的代码:
package udp.server;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.ThreadLocalRandom;
/**
* @Author 18011618
* @Description UDP服务端处理的handler
* @Date 17:01 2018/6/25
* @Modify By
*/
public class UDPServerHandler extends SimpleChannelInboundHandler<DatagramPacket>{
private static final String[] DICTIONARY = { "只要功夫深,铁棒磨成针。",
"旧时王谢堂前燕,飞入寻常百姓家。", "洛阳亲友如相问,一片冰心在玉壶。", "一寸光阴一寸金,寸金难买寸光阴。",
"老骥伏枥,志在千里。烈士暮年,壮心不已!" };
private String nextQuote() {
int quoteId = ThreadLocalRandom.current().nextInt(DICTIONARY.length);
return DICTIONARY[quoteId];
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
String req = packet.content().toString(CharsetUtil.UTF_8);
System.out.println(req);
if ("我要写诗".equals(req)) {
ctx.writeAndFlush(new DatagramPacket(Unpooled.copiedBuffer(
"我要写诗: " + nextQuote(), CharsetUtil.UTF_8), packet
.sender()));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
cause.printStackTrace();
}
}
服务端:
package udp.server;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
/**
* @Author 18011618
* @Description UDP服务端接受客户端
* @Date 17:01 2018/6/25
* @Modify By
*/
public class UDPServer {
public void run(int port) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new UDPServerHandler());
b.bind(port).sync().channel().closeFuture().await();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
new UDPServer().run(port);
}
}
看一下效果:
每次返回的都是不一样的.
3 基于netty完成自定义一种协议开发:客户端的请求和服务端的响应以及json协议进行传输
虽然netty提供常用的协议给开发者使用,但如果想要开发自己的协议是否可以呢?答案是肯定,现在很多RPC框架在数据传输都会使用json代替了传统的xml文件传输方式,既然这个传输协议用的很多,接下来就实现一个这样的协议开发:下面首先分析要实现这样功能的步骤
& 自定义request编码器
&自定义reqeust解码器
&自定义response编码器
&自定义response解码器
&自定义json数据转换工具类
&自定义request的handler处理逻辑
&自定义response的handler处理逻辑
&request处理的client
&response处理的server
上面基本是就是一些核心步骤,按照这个步骤一步步来实现即可:整体看一下项目结构:
下面看具体的代码实现:首先定义请求和响应的javabean:
package com.netty.http.json.bean.http;
import io.netty.handler.codec.http.FullHttpRequest;
/**
* Created by jack on 2018/5/4.
* 封装请求参数
*/
public class HttpJsonRequest {
private FullHttpRequest request;
private Object body;
public Object getBody() {
return this.body;
}
public void setBody(Object body) {
this.body = body;
}
public FullHttpRequest getRequest() {
return this.request;
}
public void setRequest(FullHttpRequest request) {
this.request = request;
}
public HttpJsonRequest(FullHttpRequest request, Object body) {
this.request = request;
this.body = body;
}
@Override
public String toString() {
return "HttpJsonRequest [request=" + request + ", body =" + body + "]";
}
}
package com.netty.http.json.bean.http;
import io.netty.handler.codec.http.FullHttpResponse;
/**
* Created by jack on 2018/5/4.
* 封装响应结果
*/
public class HttpJsonResponse {
private FullHttpResponse httpResponse;
private Object result;
public HttpJsonResponse(FullHttpResponse httpResponse, Object result) {
this.httpResponse = httpResponse;
this.result = result;
}
public FullHttpResponse getHttpResponse() {
return this.httpResponse;
}
public void setHttpResponse(FullHttpResponse httpResponse) {
this.httpResponse = httpResponse;
}
public Object getResult() {
return this.result;
}
public void setResult(Object result) {
this.result = result;
}
@Override
public String toString() {
return "HttpJsonResponse [httpResponse=" + httpResponse + ", result="
+ result + "]";
}
}
下面是一个工具类,主要提供两个方法,对象转换为Json和json转换为对象:在编解码器会用到
package com.netty.http.json.util;
import com.alibaba.fastjson.JSONObject;
/**
* Created by jack on 2018/5/4.
* 封装json工具类
*/
public class FastJsonUtil {
/**
* jsonstr to javabean
* @param jsonString
* @param clazz
* @param <T>
* @return
*/
public static <T> T parseObject(String jsonString,Class clazz){
return (T)JSONObject.parseObject(jsonString, clazz);
}
/**
* javabean to jsonstr
* @param t
* @param <T>
* @return
*/
public static <T> String parseJsonStr(T t){
return JSONObject.toJSONString(t);
}
}
下面先看两个抽象的类,分别是抽象解码器和抽象编码器:
解码器父类:
消息对象转换为消息对象
package com.netty.http.json.codec.decode;
import com.netty.http.json.bean.Order;
import com.netty.http.json.util.FastJsonUtil;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.nio.charset.Charset;
/**
* Created by jack on 2018/5/4.
* 解码
*/
public abstract class AbstractHttpJsonDecoder<T> extends MessageToMessageDecoder<T> {
private Class<?> clazz;
private boolean isPrint;
private final static Charset UTF_8 = Charset.forName("UTF-8");
protected AbstractHttpJsonDecoder(Class<?> clazz) {
this(clazz, false);
}
protected AbstractHttpJsonDecoder(Class<?> clazz, boolean isPrint) {
this.clazz = clazz;
this.isPrint = isPrint;
}
/**
* 字符串转换为java对象
* @param context
* @param body
* @return
*/
protected Object decode(ChannelHandlerContext context, ByteBuf body){
String content = body.toString(UTF_8);
if (isPrint)
System.out.println("this body is:"+content);
Object result = FastJsonUtil.parseObject(content, Order.class);
return result;
}
}
父类编码器:对象转换为bytebuf
package com.netty.http.json.codec.encode;
import com.netty.http.json.util.FastJsonUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.nio.charset.Charset;
/**
* Created by jack on 2018/5/4.
* 编码
*/
public abstract class AbstractHttpJsonEncoder<T> extends MessageToMessageEncoder<T> {
public final static Charset charset = Charset.forName("utf-8");
/**
* 对象转换为bytebuf
* @param context
* @param body
*/
protected ByteBuf encode (ChannelHandlerContext context,Object body){
String str = FastJsonUtil.parseJsonStr(body);
ByteBuf byteBuf = Unpooled.copiedBuffer(str,charset);
return byteBuf;
}
}
下面看一下request对应的解码器和编码器的具体实现:
解码器:
package com.netty.http.json.codec.decode;
import com.netty.http.json.bean.http.HttpJsonRequest;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.util.CharsetUtil;
import java.util.List;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
* Created by jack on 2018/5/4.
* 服务端的解码
* 字符串转换为对象
*/
public class HttpJsonRequestDecoder extends AbstractHttpJsonDecoder<FullHttpRequest> {
/**
* 解码
* @param ctx
* @param msg
* @param out
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext ctx, FullHttpRequest msg, List<Object> out) throws Exception {
if (!msg.decoderResult().isSuccess()){
sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
HttpJsonRequest request = new HttpJsonRequest(msg,decode(ctx,msg.content()));
out.add(request);
}
public HttpJsonRequestDecoder(Class<?> clazz) {
this(clazz, false);
}
/**
* 构造器
*
* @param clazz 解码的对象信息
* @param isPrint 是否需要打印
*/
public HttpJsonRequestDecoder(Class<?> clazz, boolean isPrint) {
super(clazz, isPrint);
}
/**
* 测试的话,直接封装,实战中需要更健壮的处理
*/
private static void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
status, Unpooled.copiedBuffer("Failure: " + status.toString()
+ "\r\n", CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
编码器:
package com.netty.http.json.codec.encode;
import com.netty.http.json.bean.http.HttpJsonRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.*;
import java.net.InetAddress;
import java.util.List;
/**
* Created by jack on 2018/5/4.
* 编码
* 客户端编码
* java对象转换为字节流
*/
public class HttpJsonRequestEncoder extends AbstractHttpJsonEncoder<HttpJsonRequest> {
@Override
protected void encode(ChannelHandlerContext ctx, HttpJsonRequest msg, List<Object> out) throws Exception {
ByteBuf body = encode(ctx,msg.getBody());
//如果业务自定义http消息头,就直接使用,否则就创建一个
FullHttpRequest request = msg.getRequest();
if (request ==null){
request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/do", body);
//设置httpheaders 实际应该是通过读取配置文件来加载的
HttpHeaders headers = request.headers();
//设置host
headers.set(HttpHeaderNames.HOST, InetAddress.getLocalHost().getHostAddress());
//设置connection
headers.set(HttpHeaderNames.CONNECTION, HttpHeaders.Values.CLOSE);
//设置编码
headers.set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP.toString() + ',' + HttpHeaderValues.DEFLATE.toString());
//设置字符格式
headers.set(HttpHeaderNames.ACCEPT_CHARSET, "ISO-8859-1,utf-8;q=0.7,*;q=0.7");
//设置语言
headers.set(HttpHeaderNames.ACCEPT_LANGUAGE, "zh");
//设置客户端
headers.set(HttpHeaderNames.USER_AGENT, "Netty json Http Client side");
//设置接收数据的格式
headers.set(HttpHeaderNames.ACCEPT, "text/html,application/json;q=0.9,*/*;q=0.8");
}
HttpUtil.setContentLength(request,body.readableBytes());
out.add(request);
}
}
reponse端的解码器和编码器:
解码器:
package com.netty.http.json.codec.decode;
import com.netty.http.json.bean.http.HttpJsonResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpResponse;
import java.util.List;
/**
* Created by jack on 2018/5/4.
* 解码
* 客户端解码
* 字节流转换javabean
*/
public class HttpJsonResponseDecoder extends AbstractHttpJsonDecoder<FullHttpResponse> {
public HttpJsonResponseDecoder(Class<?> clazz) {
this(clazz, false);
}
/**
* 构造器
*
* @param clazz 解码的对象信息
* @param isPrint 是否需要打印
*/
public HttpJsonResponseDecoder(Class<?> clazz, boolean isPrint) {
super(clazz, isPrint);
}
@Override
protected void decode(ChannelHandlerContext ctx, FullHttpResponse msg, List<Object> out) throws Exception {
System.out.println("begin decode value....");
HttpJsonResponse response = new HttpJsonResponse(msg,decode(ctx,msg.content()));
out.add(response);
}
}
编码器:
package com.netty.http.json.codec.encode;
import com.netty.http.json.bean.http.HttpJsonResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpUtil;
import java.util.List;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
* Created by jack on 2018/5/4.
* 编码
* 服务端编码
* java对象转换为字符串
*/
public class HttpJsonResponseEncoder extends AbstractHttpJsonEncoder<HttpJsonResponse> {
@Override
protected void encode(ChannelHandlerContext ctx, HttpJsonResponse msg, List<Object> out) throws Exception {
//获取服务端的字节流
ByteBuf body = encode(ctx, msg.getResult());
//创建一个服务端响应对象
FullHttpResponse response = msg.getHttpResponse();
if (response == null) {
//实例化一个
response = new DefaultFullHttpResponse(HTTP_1_1, OK, body);
} else {
response = new DefaultFullHttpResponse(msg.getHttpResponse()
.protocolVersion(), msg.getHttpResponse().status(),
body);
}
//设置内容格式为json
response.headers().set(CONTENT_TYPE, "text/json");
//设置内容的长度
HttpUtil.setContentLength(response, body.readableBytes());
//添加到结果中
out.add(response);
}
}
在看客户端的业务处理handler:
package com.netty.http.json.handler;
import com.netty.http.json.bean.http.HttpJsonRequest;
import com.netty.http.json.factory.OrderFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Created by jack on 2018/5/4.
* 客户端处理业务的handler
*/
public class HttpJsonClientHandler extends ChannelInboundHandlerAdapter {
/**
* 客户端连接到服务端的时候 开始发送数据
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接上服务器:"+ctx.channel().remoteAddress());
HttpJsonRequest request = new HttpJsonRequest(null, OrderFactory.createOrder(1001));
ctx.writeAndFlush(request);
}
/**
* 接收到服务端返回回来的数据
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg.getClass().getSimpleName());
System.out.println("客户端接收到服务端返回的数据:"+msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
服务端处理业务的Handler:
package com.netty.http.json.handler;
import com.netty.http.json.bean.Order;
import com.netty.http.json.bean.http.HttpJsonRequest;
import com.netty.http.json.bean.http.HttpJsonResponse;
import com.netty.http.json.factory.OrderFactory;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
* Created by jack on 2018/5/4.
* 服务端处理业务的handler
*/
public class HttpJsonServerHandler extends SimpleChannelInboundHandler<HttpJsonRequest> {
/**
* 服务端读取客户端的数据并修改获取到的数据
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(final ChannelHandlerContext ctx, HttpJsonRequest msg) throws Exception {
//读取客户端的数据
HttpRequest request = msg.getRequest();
Order order = (Order)msg.getBody();
System.out.println("Http server receive request : " + order);
//修改客户端的数据
OrderFactory.setOrder(order);
//返回给客户端
ChannelFuture future = ctx.writeAndFlush(new HttpJsonResponse(null,order));
//后面需要仔细研读
if (!HttpUtil.isKeepAlive(request)) {
future.addListener(new GenericFutureListener<Future<? super Void>>() {
public void operationComplete(Future future) throws Exception {
ctx.close();
}
});
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
if (ctx.channel().isActive()) {
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
private static void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
status, Unpooled.copiedBuffer("失败: " + status.toString()
+ "\r\n", CharsetUtil.UTF_8));
response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}
辅助订单生产的工具类:
package com.netty.http.json.factory;
import com.netty.http.json.bean.Order;
/**
* Created by jack on 2018/5/4.
*/
public class OrderFactory {
/**
* 实例化一个order
* @param value
* @return
*/
public static Order createOrder(long value){
Order order = new Order();
order.setOrderNumber(value);
order.setBillTo("南京");
order.setShipping("国内快递");
order.setShipTo("上海");
order.setTotal(120);
order.setCustomer("贾红平");
return order;
}
/**
* 修改一个order
* @param order
*/
public static void setOrder(Order order){
order.setBillTo("北京");
order.setShipping("国内快递");
order.setShipTo("天津");
order.setTotal(120);
order.setCustomer("姚杰");
}
}
初始化客户端的channelhander:
package com.netty.http.json.Initializer;
import com.netty.http.json.bean.Order;
import com.netty.http.json.codec.decode.HttpJsonRequestDecoder;
import com.netty.http.json.codec.decode.HttpJsonResponseDecoder;
import com.netty.http.json.codec.encode.HttpJsonRequestEncoder;
import com.netty.http.json.handler.HttpJsonClientHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
/**
* Created by jack on 2018/5/4.
* 客户端初始化各种编解码器和具体的handler到channelpiple
*/
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//设置当前的解码器是基于http
ch.pipeline().addLast(new HttpResponseDecoder());
ch.pipeline().addLast(new HttpObjectAggregator(65536));
//设置自定义解码器
ch.pipeline().addLast(new HttpJsonResponseDecoder(Order.class,true));
//设置当前的编码器是基于http
ch.pipeline().addLast(new HttpRequestEncoder());
//设置自定义编码器
ch.pipeline().addLast(new HttpJsonRequestEncoder());
//设置业务处理的Handler
ch.pipeline().addLast(new HttpJsonClientHandler());
}
}
服务端处初始化channelhander:
package com.netty.http.json.Initializer;
import com.netty.http.json.bean.Order;
import com.netty.http.json.codec.decode.HttpJsonRequestDecoder;
import com.netty.http.json.codec.encode.HttpJsonResponseEncoder;
import com.netty.http.json.handler.HttpJsonServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
/**
* Created by jack on 2018/5/4.
* 服务端初始化各种编解码器和具体的handler到channelpiple
*/
public class ServerInitializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpRequestDecoder());
ch.pipeline().addLast(new HttpObjectAggregator(65536));
ch.pipeline().addLast(new HttpJsonRequestDecoder(Order.class,true));
ch.pipeline().addLast(new HttpResponseEncoder());
ch.pipeline().addLast(new HttpJsonResponseEncoder());
ch.pipeline().addLast(new HttpJsonServerHandler());
}
}
定义client:
package com.netty.http.json.client;
import com.netty.http.json.Initializer.ClientInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
/**
* Created by jack on 2018/5/4.
* 客户端工作
*/
public class HttpJsonClient {
static EventLoopGroup group;
static Bootstrap client;
static {
group = new NioEventLoopGroup();
client = new Bootstrap();
client.group(group).channel(NioSocketChannel.class);
client.option(ChannelOption.SO_SNDBUF,128);
client.option(ChannelOption.TCP_NODELAY,false);
client.handler(new ClientInitializer());
}
/**
* 启动客户端
* @param port
*/
public static void connect(int port){
try {
ChannelFuture future = client.connect(new InetSocketAddress(port)).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
connect(8080);
}
}
定义服务端:
package com.netty.http.json.server;
import com.netty.http.json.Initializer.ServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
/**
* Created by jack on 2018/5/4.
* 创建服务端
*/
@SuppressWarnings("all")
public class HttpJsonServer {
static EventLoopGroup bossLoopGroup;//主线程
static EventLoopGroup workLoopGroup;//从线程
static ServerBootstrap serverBootstrap;//服务
static {
bossLoopGroup = new NioEventLoopGroup();
workLoopGroup = new NioEventLoopGroup();
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossLoopGroup,workLoopGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.option(ChannelOption.SO_BACKLOG,128);
serverBootstrap.option(ChannelOption.SO_SNDBUF,128);
serverBootstrap.option(ChannelOption.SO_KEEPALIVE,true);
serverBootstrap.childHandler(new ServerInitializer());
}
/**
* 启动服务器
* @param port
*/
public static void run(int port){
try {
ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(port)).sync();
System.out.println("HTTP订购服务器启动,网址是 : " + "http://localhost:" + port);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossLoopGroup.shutdownGracefully();
workLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
run(8080);
}
}
ok 到此为止代码功能就全部完成了,让我们先看一下效果再做总结:
服务端接受到客户端的数据是基于对象的:
客户端接受到服务端的数据是以json格式的:
这样就实现了基于Json+自定义编解码的功能的一个协议.