正文

项目地址:gitee.com/baojh123/rp…

Netty实现简易版的RPC框架过程详解

netty-study 这个项目是没用到的,可以删掉,主要是测试Netty自定义协议的

1:如何运行项目

1:本地起一个zookeeper服务

2: 只需要运行 rpc-serverspringboot-zk-study二个项目即可

3: 二个项目的application.yml 都不需要改,唯一要改的就是zookeepr的连接配置信息

Netty实现简易版的RPC框架过程详解

4:启动好之后,在浏览器访问

http://localhost:8081/zk/test

http://localhost:8081/zk/people

http://localhost:8081/zk/list

可以查看到返回结果

Netty实现简易版的RPC框架过程详解

2:从客户端调用开始(springboot-zk-study项目)

@RestController
@RequestMapping("/zk")
public class ZkController {
            @Resource
            @MyResource
            private UserService userService;
            @Resource
            @MyResource
            private PeopleService peopleService;
            @GetMapping("/test")
            public String test() {
                return userService.test("bjh-",1);
            }
            @GetMapping("/people")
            public Object people() {
                return peopleService.query(1L);
            }
            @GetMapping("/list")
            public Object list() {
                return peopleService.list();
            }
}

只需要在我们需要进行RPC调用的接口上添加 @MyResource 注解即可,当我们执行这个方法之后,就会执行代理方法,代理方法在 rpc-core 项目中,为了阅读清晰,我只贴出重点的方法

@Slf4j
public class ServiceProxy<T> implements InvocationHandler, ApplicationContextAware, ApplicationRunner {
     ......省略一些代码
     // 客户端执行方法之后,就会执行到这里的代理方法
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
         //从注册中心拿到服务列表
         ZkNodeData zkNodeData = objectMapper.readValue(nodeData, ZkNodeData.class);
         List<ZkProperties> zkPropertiesList = zkNodeData.getZkPropertiesList();
         for(ZkProperties zkProperties : zkPropertiesList) {
             String interfaceName = zkProperties.getInterfaceName();
             Class<?> declaringClass = method.getDeclaringClass();
             if(StringUtils.equals(declaringClass.getName(),interfaceName)) {
                 List<InterfaceInfo> info = zkProperties.getInfo();
                 InterfaceInfo interfaceInfo = info.get(0);
                 String ipAddress = interfaceInfo.getIpAddress();
                 List<InterfaceImplInfo> interfaceImplInfo = interfaceInfo.getInterfaceImplInfo();
                 InterfaceImplInfo implInfo = interfaceImplInfo.get(0);
                 String[] strings = ipAddress.split(":");
                 //与远程Netty服务端发起连接
                 RpcClient rpcClient = connNettyServer(strings[0], zkPropertiesSource.getNettyConnectPort());
                 /**
                  * 封装请求参数
                  */
                 //获取方法参数类型
                 Class<?>[] parameterTypes = method.getParameterTypes();
                 List<String> types = getTypes(parameterTypes);
                 //同步调用
                 result = remoteCall(method.getName(), types, args, rpcClient, implInfo, interfaceName);
                 log.info("返回结果是:{}",result);
             }
         }
         Class<?> returnType = method.getReturnType();
         Object value = objectMapper.readValue(result.toString(), returnType);
         return value;
     }
     private RpcClient connNettyServer(String ipAddress,Integer port) {
         return new RpcClient(ipAddress,port);
     }
     private Object remoteCall(String methodName, List<String> argTypes, Object[] args,RpcClient rpcClient,InterfaceImplInfo implInfo,String interfaceName) throws Exception{
         RpcMessage rpcMessage = new RpcMessage();
          ......
         //发送请求
         Response result = rpcClient.sendRequest(rpcMessage);
         log.info("请求结果是:{}", JSONUtil.toJsonPrettyStr(result));
         return result.getData();
     }
     ......省略一些代码

我们初始化客户端连接和发送请求都在一个RpcClient的类中,我们看下这个类的代码

@Slf4j
public class RpcClient {
 EventLoopGroup group = new NioEventLoopGroup();
 Bootstrap bootstrap;
 private String ip;
 private Integer port;
 RpcClientHandler rpcClientHandler;
 private ChannelFuture channelFuture;
 public RpcClient(String ip,Integer port) {
     bootstrap = new Bootstrap();
     bootstrap.group(group)
             .channel(NioSocketChannel.class) // 使用NioSocketChannel作为客户端的通道实现
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) throws Exception {
                     //加入处理器
                     rpcClientHandler = new RpcClientHandler();
                     ch.pipeline().addLast(new RpcDecoder());
                     ch.pipeline().addLast(new RpcEncoder());
                     ch.pipeline().addLast(rpcClientHandler);
                 }
             });
     try {
         // 和远程Nett服务端建立连接
         channelFuture = bootstrap.connect(ip, port).sync();
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
 }
 public Response sendRequest(RpcMessage rpcMessage) throws Exception{
     //发送请求
     channelFuture.channel().writeAndFlush(rpcMessage).sync();
     channelFuture.channel().closeFuture().sync();
     log.info("获取返回结果=====================");
     Response response = rpcClientHandler.getResponse();
     return response;
 }
}

客户端在这发送请求到服务端之后,就接收服务端返回回来的消息即可,然后将返回结果返回给我们的接口。客户端的调用就到这里了,现在看下服务端的

3:服务端处理请求

服务端处理请求的核心都在 rpc-coreRpcServerHandler

public class RpcServerHandler extends SimpleChannelInboundHandler<RpcMessage> {
    ObjectMapper objectMapper = new ObjectMapper();
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
        Object obj = rpcMessage.getObj();
        RpcMessage rpcMessageResponse = new RpcMessage();
        Response response = new Response();
        try{
            Request request = objectMapper.readValue(obj.toString(), Request.class);
            String interfaceImplName = request.getInterfaceImplName();
            Class<?> aClass = Class.forName(interfaceImplName);
            List<String> paramsTypes = request.getParamsTypes();
            try {
                Object result = null;
                //判读方法是有参数的还是没有参数的
                if(paramsTypes.isEmpty()) {
                    Method declaredMethod = aClass.getDeclaredMethod(request.getMethodName());
                    result = declaredMethod.invoke(aClass.newInstance());
                }else {
                    Map<String, Object> paramsObjectMap = TypeParseUtil.parseTypeString2Class(paramsTypes, request.getParams().toArray());
                    Class<?>[] classTypes = (Class<?>[]) paramsObjectMap.get("classTypes");
                    Object[] args = (Object[]) paramsObjectMap.get("args");
                    result = aClass.getMethod(request.getMethodName(), classTypes).invoke(aClass.newInstance(), args);
                }
                log.info("返回结果是:{}",result);
                response.setData(objectMapper.writeValueAsString(result));
                response.setIsOk(1);
                response.setErrInfo("error");
                rpcMessageResponse.setObj(response);
            } catch (Throwable throwable) {
                throwable.printStackTrace();
                response.setData("error");
                response.setIsOk(0);
                response.setErrInfo(throwable.getMessage());
                rpcMessageResponse.setObj(response);
            }
        }catch (Exception e) {
            response.setData("error");
            response.setIsOk(0);
            response.setErrInfo(e.getMessage());
            rpcMessageResponse.setObj(response);
        }
        String valueAsString = objectMapper.writeValueAsString(response);
        rpcMessageResponse.setDataLength(valueAsString.getBytes(Charset.forName("utf-8")).length);
        rpcMessageResponse.setObj(valueAsString);
        channelHandlerContext.writeAndFlush(rpcMessageResponse);
    }
}

服务端就拿到客户端传过来的接口名称,从zookeeper获取到具体的实现类,然后通过反射调用即可

4:接下来要做什么

上面只是简单的介绍了下整个调用的大概过程,还有很多问题没有解释清楚,比如

1:在客户端我们要使用UserService,但是你会发现我们使用了二个注解,一个是我们自定义的,一个是spring注入用的,但是在项目中我们并没有这个接口的实现类,spring是怎么将这个接口注入到自己容器中的呢

2: 为什么调用使用了 @MyResource的接口方法都会走代理方法,是怎么做到的

@Resource
@MyResource
private PeopleService peopleService;

3:我们的服务是怎么在服务启动的时候注册到zookeeper的,注册的信息又是什么,可以看下我们服务注册到zookeeper的信息如下

{
	"zkPropertiesList": [{
		"interfaceName": "com.bjh.service.PeopleService",
		"info": [{
			"ipAddress": "192.168.83.1:9091",
			"interfaceImplInfo": [{
				"name": "com.bjh.service.PeopleServiceImpl",
				"value": "com.bjh.service.PeopleServiceImpl"
			}]
		}]
	}, {
		"interfaceName": "com.bjh.service.UserService",
		"info": [{
			"ipAddress": "192.168.83.1:9091",
			"interfaceImplInfo": [{
				"name": "com.bjh.service.UserServiceImpl",
				"value": "com.bjh.service.UserServiceImpl"
			}]
		}]
	}]
}

4:在我们的服务端的实现类,我们只使用了我们自定义的 @Service注解,这个注解不是Spring的

   @Service
   public class PeopleServiceImpl implements PeopleService{
        @Override
        public People query(long id) {
            People people = new People();
            people.setId(id);
            people.setName("coco");
            return people;
        }
        @Override
        public List&lt;People&gt; list() {
            List&lt;People&gt; list = new ArrayList&lt;&gt;();
            People people = new People();
            people.setId(123L);
            people.setName("coco");
            People people2 = new People();
            people2.setId(124L);
            people2.setName("baojh");
            list.add(people);
            list.add(people2);
            return list;
        }
}

5:还有客户端请求的结构体是怎么样的,还有返回响应结果是怎么样的等等,后续我会继续更新

更多关于Netty简易版RPC框架的资料请关注其它相关文章!

原文地址:https://juejin.cn/post/7198041700563877945

相关文章: