• Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。Mina 同时提供了网络通信的Server 端、Client 端的封装,无论是哪端,Mina 在整个网通通信结构中都处于如下的位置:可见Mina 的API 将真正的网络通信与我们的应用程序隔离开来,你只需要关心你要发送、接收的数据以及你的业务逻辑即可。同样的,无论是哪端,Mina 的执行流程如下所示:

mina笔记

(1.) IoService:最底层的是IOService,负责具体的IO相关工作。这一层的典型代表有IOSocketAcceptor和IOSocketChannel,分别对应TCP协议下的服务端和客户端的IOService。IOService的意义在于隐藏底层IO的细节,对上提供统一的基于事件的异步IO接口。每当有数据到达时,IOService会先调用底层IO接口读取数据,封装成IoBuffer,之后以事件的形式通知上层代码,从而将Java NIO的同步IO接口转化成了异步IO。所以从图上看,进来的low-level IO经过IOService层后变成IO Event。具体的代码可以参考org.apache.mina.core.polling.AbstractPollingIoProcessor的私有内部类Processor。

(2.) IoProcessor:这个接口在另一个线程上,负责检查是否有数据在通道上读写,也就是说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,通常在JAVA NIO 编码中,我们都是使用一个Selector,也就是不区分IoService与IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上的过滤器,并在过滤器链之后调用IoHandler。
(3.) IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与decode是最为重要的、也是你在使用Mina 时最主要关注的地方。
(4.) IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。需要有开发者自己来实现这个接口。IoHandler可以看成是Mina处理流程的终点,每个IoService都需要指定一个IoHandler。

(5.)IoSession:是对底层连接(服务器与客户端的特定连接,该连接由服务器地址、端口以及客户端地址、端口来决定)的封装,一个IoSession对应于一个底层的IO连接(在Mina中UDP也被抽象成了连接)。通过IoSession,可以获取当前连接相关的上下文信息,以及向远程peer发送数据。发送数据其实也是个异步的过程。发送的操作首先会逆向穿过IoFilterChain,到达IoService。但IoService上并不会直接调用底层IO接口来将数据发送出去,而是会将该次调用封装成一个WriteRequest,放入session的writeRequestQueue中,最后由IoProcessor线程统一调度flush出去。所以发送操作并不会引起上层调用线程的阻塞。具体代码可以参考org.apache.mina.core.filterchain.DefaultIoFilterChain的内部类HeadFilter的filterWrite方法。

 

MINA的核心类图:

mina笔记

IOService:

既可以做服务端,也可以做客户端。

(1)TCP服务端编写:

//TCP连接
public class MinaServer {

	private static final int port = 8080;
	
	public static void main(String[] args) throws IOException{
		//为服务端创建IoAcceptor,NioSocketAcceptor是基于NIO的服务器监听器
		IoAcceptor acceptor = new NioSocketAcceptor();	//NioSocketAcceptor非阻塞的套接字TCP IoAcceptor
		
		//设置记录过滤器
		acceptor.getFilterChain().addLast("logger", new LoggingFilter());
		
		//设置解编码过滤器(TextLineCodecFactory是mina自带的文本解编码器)
		acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8"))));
		
		//业务逻辑处理
		acceptor.setHandler(new serverHandler());
		
		//设置Buffer的缓冲区大小
		acceptor.getSessionConfig().setReadBufferSize(2048);
		
		//设置等待时间,每隔IdleTime将调用一次handler.sessionIdle()方法
		acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 20);
		
		//绑定端口
		acceptor.bind(new InetSocketAddress(port));
	}
	
	static class serverHandler extends IoHandlerAdapter{
		public void sessionCreated(IoSession session) throws Exception {
			// TODO Auto-generated method stub

		}

		public void sessionOpened(IoSession session) throws Exception {
			// TODO Auto-generated method stub

		}

		public void sessionClosed(IoSession session) throws Exception {
			// TODO Auto-generated method stub

		}

		public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
			// TODO Auto-generated method stub

		}

		public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
			// TODO Auto-generated method stub

		}

		public void messageReceived(IoSession session, Object message) throws Exception {
			// TODO Auto-generated method stub
			String str = message.toString();
			if(str.trim().equalsIgnoreCase("quit")){
				session.close(false);
				return;
			}
		}

		public void messageSent(IoSession session, Object message) throws Exception {
			// TODO Auto-generated method stub

		}
	}
}

(2)TCP客户端编写:

//TCP连接
public class MinaClient {

	private static final String serverIP = "127.0.0.1";
	private static final int serverPORT = 9090;
	
	public static void main(String[] args){
		//创建Connector连接器
		NioSocketConnector connector = new NioSocketConnector();//NioSocketConnector非阻塞的套接字TCP IoConnector
		
		//设置连接超时时间
	    connector.setConnectTimeoutMillis(5000);
		
		//设置记录过滤器
		connector.getFilterChain().addLast("logger", new LoggingFilter());
		
		//设置解编码过滤器(TextLineCodecFactory是mina自带的文本解编码器)
		connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("utf-8"))));
		
		//设置连接休眠时间
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 20); 
		
		//业务逻辑处理
		connector.setHandler(new ClientHandler(1));
		
		//设置服务器的ip和端口
		connector.setDefaultRemoteAddress(new InetSocketAddress(serverIP,serverPORT));
		
		IoSession session = null;
		try{
			//连接远程主机
			ConnectFuture future = connector.connect();
			//connect()方法是异步的,为了准确获得session,不可中断式的等待
			future.awaitUninterruptibly();
			//连接建立后,获得会话session
			session = future.getSession();		
		}catch(Exception e){
			e.printStackTrace();
			try {
				Thread.sleep(5000);
			} catch (InterruptedException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
		}finally{
			if(session!= null)
				//等待本次连接通话结束,不可中断式的等待
				session.getCloseFuture().awaitUninterruptibly();
		}
		//关闭连接
		connector.dispose();
//		return session;
		
	}
}

ClientHandler方法编写:

//TCP连接
public class ClientHandler implements IoHandler {
	private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class);
	
	private int value;

	public ClientHandler(int value) {
//		super();
		// TODO Auto-generated constructor stub
		this.value = value;
	}

	public void sessionCreated(IoSession session) throws Exception {
		// TODO Auto-generated method stub
		logger.info("客户端会话创建:"+session.getRemoteAddress());

	}

	public void sessionOpened(IoSession session) throws Exception {
		// TODO Auto-generated method stub
		logger.info("客户端会话打开"+session.getRemoteAddress()+":将信息写入session");
		session.write(value);

	}

	public void sessionClosed(IoSession session) throws Exception {
		// TODO Auto-generated method stub
		logger.info("客户端会话关闭:"+session.getRemoteAddress());
	}

	public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
		// TODO Auto-generated method stub
		logger.info("客户端会话休眠:"+session.getRemoteAddress());
		logger.info("IDLE:"+session.getIdleCount(status));

	}

	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		// TODO Auto-generated method stub
		logger.info("客户端异常:"+session.getRemoteAddress()+":"+cause.getMessage());
		session.close(true);
	}

	public void messageReceived(IoSession session, Object message) throws Exception {
		// TODO Auto-generated method stub
		IoBuffer buffer = (IoBuffer)message;
		byte[] bytes = new byte[buffer.limit()];
		String str = "";
		str = new String(bytes,"utf-8");
		logger.info("客户端收到 "+session.getRemoteAddress()+"服务器发来的消息"+str);
		session.close(true);
	}

	public void messageSent(IoSession session, Object message) throws Exception {
		// TODO Auto-generated method stub
		IoBuffer buffer = (IoBuffer) message;
		byte[] bytes = new byte[buffer.limit()];
		String str = new String(bytes, "utf-8");
		logger.info("信息已经发送给服务端"+session.getRemoteAddress()+str);
	}

}

(3)UDP服务端编写:

public class UdpMinaServer {

	public static void main(String[] args) throws IOException{
		//创建数据报的非阻塞式Acceptor,端口初始化null
		NioDatagramAcceptor acceptor = new NioDatagramAcceptor(null);
		//不用设置字符解编码过滤器,数据可以直接用Buffer字节流传递
		//记录过滤器
		acceptor.getFilterChain().addLast("logger", new LoggingFilter());
		
		acceptor.setHandler(new UdpServerHandler());
		//等待时间,调用的是IoHandler中的sessionIdle方法
		acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
		
		//NioDatagramAcceptor须初始化为null,才能重复使用端口,此方法须在绑定端口之前调用
		acceptor.getSessionConfig().setReuseAddress(true);
		//绑定端口
		acceptor.bind(new InetSocketAddress(8080));
	}
	
	static class UdpServerHandler extends IoHandlerAdapter{
		@Override
		public void messageReceived(IoSession session, Object message) throws Exception {
			IoBuffer buffer = (IoBuffer)message;
			System.out.println(buffer.getLong());
			session.close(false);
		}
		@Override
		public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
			System.out.println("IDLE " + session.getIdleCount(status));
		}
	}
}

(4)UDP客户端编写:

public class UdpMinaClient {

	//定义全局session
	private static IoSession session = null;
	private static final String serverIP = "127.0.0.1";
	private static final int serverPORT = 9090;
	
	public static void main(String[] args){
		//创建数据报的非阻塞式Connector连接器
		NioDatagramConnector connector = new NioDatagramConnector();
		//不用设置字符解编码过滤器,数据可以直接用Buffer字节流传递
		//记录过滤器
		connector.getFilterChain().addLast("logger", new LoggingFilter());
		//IoHandler
		connector.setHandler(new UdpClientHandler());
		
		try{
			//创建连接,serverIP,serverPORT为远程主机的
			ConnectFuture future = connector.connect(new InetSocketAddress(serverIP,serverPORT));
			//不可中断式的等待连接建立完成
			future.awaitUninterruptibly();
			//获取会话session
			session = future.getSession();
			//增加连接建立完成后的监听器
			//连接建立后设置监听器,监听器会立刻执行
			future.addListener(new IoFutureListener<ConnectFuture>(){

				public void operationComplete(ConnectFuture future) {
					// TODO Auto-generated method stub
					if(future.isConnected()){
						sendData();					
					}
					else{
						System.out.println("Not connected...exiting");
					}						
				}				
			});
			
		}catch (Exception e){
			e.printStackTrace();
		}finally{
			if(session != null)
				session.getCloseFuture().awaitUninterruptibly();
		}
		//关闭连接
		connector.dispose();	
	}
	
	/**
	 * 
	 */
	protected static void sendData() {
		// TODO Auto-generated method stub
		Long free = Runtime.getRuntime().freeMemory();
		IoBuffer buffer = IoBuffer.allocate(8);//初始化容量八个字节
		//如果数据的长度大于八个字节的话,IoBuffer会根据情况重新分配其内置的ByteBuffer,它的容量会加倍,它的limit会增长到String被写入时的最后position。
		buffer.setAutoExpand(true);
		buffer.putLong(free);
		buffer.flip();	//一般是结束buff操作,将buff写入输出流时调用
		session.write(buffer);
		session.write("将数据写入session");
		//因为是UDP,客户端需要主动关闭连接
		session.close(false);
	}

	static class UdpClientHandler extends IoHandlerAdapter{
		
	}
}

 

相关文章:

  • 2021-10-02
  • 2022-12-23
  • 2021-05-03
  • 2021-07-20
  • 2021-06-14
  • 2022-01-17
  • 2021-11-08
  • 2022-02-16
猜你喜欢
  • 2021-05-22
  • 2021-07-19
  • 2021-12-18
  • 2021-05-04
  • 2021-09-11
  • 2021-08-13
  • 2022-12-23
相关资源
相似解决方案