- Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。Mina 同时提供了网络通信的Server 端、Client 端的封装,无论是哪端,Mina 在整个网通通信结构中都处于如下的位置:可见Mina 的API 将真正的网络通信与我们的应用程序隔离开来,你只需要关心你要发送、接收的数据以及你的业务逻辑即可。同样的,无论是哪端,Mina 的执行流程如下所示:
(1.) IoService:最底层的是IOService,负责具体的IO相关工作。这一层的典型代表有IOSocketAcceptor和IOSocketChannel,分别对应TCP协议下的服务端和客户端的IOService。IOService的意义在于隐藏底层IO的细节,对上提供统一的基于事件的异步IO接口。每当有数据到达时,IOService会先调用底层IO接口读取数据,封装成IoBuffer,之后以事件的形式通知上层代码,从而将Java NIO的同步IO接口转化成了异步IO。所以从图上看,进来的low-level IO经过IOService层后变成IO Event。具体的代码可以参考org.apache.mina.core.polling.AbstractPollingIoProcessor的私有内部类Processor。
(2.) IoProcessor:这个接口在另一个线程上,负责检查是否有数据在通道上读写,也就是说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,通常在JAVA NIO 编码中,我们都是使用一个Selector,也就是不区分IoService与IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上的过滤器,并在过滤器链之后调用IoHandler。
(3.) IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与decode是最为重要的、也是你在使用Mina 时最主要关注的地方。
(4.) IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。需要有开发者自己来实现这个接口。IoHandler可以看成是Mina处理流程的终点,每个IoService都需要指定一个IoHandler。
(5.)IoSession:是对底层连接(服务器与客户端的特定连接,该连接由服务器地址、端口以及客户端地址、端口来决定)的封装,一个IoSession对应于一个底层的IO连接(在Mina中UDP也被抽象成了连接)。通过IoSession,可以获取当前连接相关的上下文信息,以及向远程peer发送数据。发送数据其实也是个异步的过程。发送的操作首先会逆向穿过IoFilterChain,到达IoService。但IoService上并不会直接调用底层IO接口来将数据发送出去,而是会将该次调用封装成一个WriteRequest,放入session的writeRequestQueue中,最后由IoProcessor线程统一调度flush出去。所以发送操作并不会引起上层调用线程的阻塞。具体代码可以参考org.apache.mina.core.filterchain.DefaultIoFilterChain的内部类HeadFilter的filterWrite方法。
MINA的核心类图:
IOService:
既可以做服务端,也可以做客户端。
(1)TCP服务端编写:
//TCP连接
public class MinaServer {
private static final int port = 8080;
public static void main(String[] args) throws IOException{
//为服务端创建IoAcceptor,NioSocketAcceptor是基于NIO的服务器监听器
IoAcceptor acceptor = new NioSocketAcceptor(); //NioSocketAcceptor非阻塞的套接字TCP IoAcceptor
//设置记录过滤器
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
//设置解编码过滤器(TextLineCodecFactory是mina自带的文本解编码器)
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8"))));
//业务逻辑处理
acceptor.setHandler(new serverHandler());
//设置Buffer的缓冲区大小
acceptor.getSessionConfig().setReadBufferSize(2048);
//设置等待时间,每隔IdleTime将调用一次handler.sessionIdle()方法
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 20);
//绑定端口
acceptor.bind(new InetSocketAddress(port));
}
static class serverHandler extends IoHandlerAdapter{
public void sessionCreated(IoSession session) throws Exception {
// TODO Auto-generated method stub
}
public void sessionOpened(IoSession session) throws Exception {
// TODO Auto-generated method stub
}
public void sessionClosed(IoSession session) throws Exception {
// TODO Auto-generated method stub
}
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
// TODO Auto-generated method stub
}
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
// TODO Auto-generated method stub
}
public void messageReceived(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
String str = message.toString();
if(str.trim().equalsIgnoreCase("quit")){
session.close(false);
return;
}
}
public void messageSent(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
}
}
}
(2)TCP客户端编写:
//TCP连接
public class MinaClient {
private static final String serverIP = "127.0.0.1";
private static final int serverPORT = 9090;
public static void main(String[] args){
//创建Connector连接器
NioSocketConnector connector = new NioSocketConnector();//NioSocketConnector非阻塞的套接字TCP IoConnector
//设置连接超时时间
connector.setConnectTimeoutMillis(5000);
//设置记录过滤器
connector.getFilterChain().addLast("logger", new LoggingFilter());
//设置解编码过滤器(TextLineCodecFactory是mina自带的文本解编码器)
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8"))));
//设置连接休眠时间
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 20);
//业务逻辑处理
connector.setHandler(new ClientHandler(1));
//设置服务器的ip和端口
connector.setDefaultRemoteAddress(new InetSocketAddress(serverIP,serverPORT));
IoSession session = null;
try{
//连接远程主机
ConnectFuture future = connector.connect();
//connect()方法是异步的,为了准确获得session,不可中断式的等待
future.awaitUninterruptibly();
//连接建立后,获得会话session
session = future.getSession();
}catch(Exception e){
e.printStackTrace();
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}finally{
if(session!= null)
//等待本次连接通话结束,不可中断式的等待
session.getCloseFuture().awaitUninterruptibly();
}
//关闭连接
connector.dispose();
// return session;
}
}
ClientHandler方法编写:
//TCP连接
public class ClientHandler implements IoHandler {
private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
private int value;
public ClientHandler(int value) {
// super();
// TODO Auto-generated constructor stub
this.value = value;
}
public void sessionCreated(IoSession session) throws Exception {
// TODO Auto-generated method stub
logger.info("客户端会话创建:"+session.getRemoteAddress());
}
public void sessionOpened(IoSession session) throws Exception {
// TODO Auto-generated method stub
logger.info("客户端会话打开"+session.getRemoteAddress()+":将信息写入session");
session.write(value);
}
public void sessionClosed(IoSession session) throws Exception {
// TODO Auto-generated method stub
logger.info("客户端会话关闭:"+session.getRemoteAddress());
}
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
// TODO Auto-generated method stub
logger.info("客户端会话休眠:"+session.getRemoteAddress());
logger.info("IDLE:"+session.getIdleCount(status));
}
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
// TODO Auto-generated method stub
logger.info("客户端异常:"+session.getRemoteAddress()+":"+cause.getMessage());
session.close(true);
}
public void messageReceived(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
IoBuffer buffer = (IoBuffer)message;
byte[] bytes = new byte[buffer.limit()];
String str = "";
str = new String(bytes,"utf-8");
logger.info("客户端收到 "+session.getRemoteAddress()+"服务器发来的消息"+str);
session.close(true);
}
public void messageSent(IoSession session, Object message) throws Exception {
// TODO Auto-generated method stub
IoBuffer buffer = (IoBuffer) message;
byte[] bytes = new byte[buffer.limit()];
String str = new String(bytes, "utf-8");
logger.info("信息已经发送给服务端"+session.getRemoteAddress()+str);
}
}
(3)UDP服务端编写:
public class UdpMinaServer {
public static void main(String[] args) throws IOException{
//创建数据报的非阻塞式Acceptor,端口初始化null
NioDatagramAcceptor acceptor = new NioDatagramAcceptor(null);
//不用设置字符解编码过滤器,数据可以直接用Buffer字节流传递
//记录过滤器
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
acceptor.setHandler(new UdpServerHandler());
//等待时间,调用的是IoHandler中的sessionIdle方法
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
//NioDatagramAcceptor须初始化为null,才能重复使用端口,此方法须在绑定端口之前调用
acceptor.getSessionConfig().setReuseAddress(true);
//绑定端口
acceptor.bind(new InetSocketAddress(8080));
}
static class UdpServerHandler extends IoHandlerAdapter{
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
IoBuffer buffer = (IoBuffer)message;
System.out.println(buffer.getLong());
session.close(false);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
System.out.println("IDLE " + session.getIdleCount(status));
}
}
}
(4)UDP客户端编写:
public class UdpMinaClient {
//定义全局session
private static IoSession session = null;
private static final String serverIP = "127.0.0.1";
private static final int serverPORT = 9090;
public static void main(String[] args){
//创建数据报的非阻塞式Connector连接器
NioDatagramConnector connector = new NioDatagramConnector();
//不用设置字符解编码过滤器,数据可以直接用Buffer字节流传递
//记录过滤器
connector.getFilterChain().addLast("logger", new LoggingFilter());
//IoHandler
connector.setHandler(new UdpClientHandler());
try{
//创建连接,serverIP,serverPORT为远程主机的
ConnectFuture future = connector.connect(new InetSocketAddress(serverIP,serverPORT));
//不可中断式的等待连接建立完成
future.awaitUninterruptibly();
//获取会话session
session = future.getSession();
//增加连接建立完成后的监听器
//连接建立后设置监听器,监听器会立刻执行
future.addListener(new IoFutureListener<ConnectFuture>(){
public void operationComplete(ConnectFuture future) {
// TODO Auto-generated method stub
if(future.isConnected()){
sendData();
}
else{
System.out.println("Not connected...exiting");
}
}
});
}catch (Exception e){
e.printStackTrace();
}finally{
if(session != null)
session.getCloseFuture().awaitUninterruptibly();
}
//关闭连接
connector.dispose();
}
/**
*
*/
protected static void sendData() {
// TODO Auto-generated method stub
Long free = Runtime.getRuntime().freeMemory();
IoBuffer buffer = IoBuffer.allocate(8);//初始化容量八个字节
//如果数据的长度大于八个字节的话,IoBuffer会根据情况重新分配其内置的ByteBuffer,它的容量会加倍,它的limit会增长到String被写入时的最后position。
buffer.setAutoExpand(true);
buffer.putLong(free);
buffer.flip(); //一般是结束buff操作,将buff写入输出流时调用
session.write(buffer);
session.write("将数据写入session");
//因为是UDP,客户端需要主动关闭连接
session.close(false);
}
static class UdpClientHandler extends IoHandlerAdapter{
}
}