Leo_wl

微服务实践之路--RPC

重点来了,本文全面阐述一下我们的RPC是怎么实现并如何使用的,跟Kubernetes和Openstack怎么结合。 
在选型一文中说到我们选定的RPC框架是Apache Thrift,它的用法是在Main方法中重启服务,在Client端连接服务去调用,

而我的想法是要跟Dubblo、HSF的用法一样,因为很多人都熟习这两个框架的用法,特别是我们好几个项目都是基于EDAS开发的,而且世面上用Dubbo的公司也很多。

顺便再说一下我们对于RPC的几点要求:

  • 1,兼容Dubbo和HSF的使用方法,支持版本和服务分组,支持项目隔离
  • 2,客户端重试机制,可以配置次数和间隔时间
  • 3,客户端线程池
  • 4,服务端可以平滑无缝升级而不影响客户端的使用

兼容Dubbo就必然要使用Spring框架,那我们就直接上Spring Boot好了,号称Spring Boot为微服务开发而生,一步到位,将Thrift跟Spring Boot整合。

版本和服务分组可以通过Kubernetes的Service的Label来实现,我们客户端在查找服务的时候通过这两个标签再加上接口类的Label来定位Service的Cluster IP,这里不直接使用Service名称来调用服务的原因是通过Label查询Servcie更加灵活一些,Service的名称不受限制,随时可以启动一个带有相同Label的新Service来替换旧的Service.
项目隔离可以用Kubernetes的namespace来实现,一个namespace是一个项目,当然项目之间也可以互相调用,默认情况下是整个Kubernetes集群的服务都是可以被调用到的如果在没有指定namespace的情况下。

客户端重试机制用代理Thrift连接的方式来实现,在连接或接口方法调用异常时发起重新连接,参考:https://liveramp.com/engineering/reconnecting-thrift-client/

客户端连接池是由于在WEB项目中每次用户发起请求是在一个独立的线程中,而Thrift的Client Socket连接不是线程安全的,因此要为每个用户准备一个Socket连接,有点像数据库的连接池,这个可以用apache的commons pool2来实现,这个有很多网友的文章可以参考,本文就不在赘述了。

服务端平滑升级可以使用Kubernetes的Kubectl rolling-update来实现,它的机制是先创建一个RC,然后新建一个新版本Pod,停掉一个旧版本Pod,逐步来完成整个RC的更新,最后删除旧的RC,把新的RC名称改为旧的RC名称,升级过程如下图:

 
这里会有一个问题,因为有一个时间段会新旧RC共存,由于Service是基于RC的Label建立的,那过来的请求是不是会得到两种结果?


 如果是的话要防止这样的情况发生就要像上面说的,将整个Service替换,先启动一个新的Service跟旧的Service有相同Label,然后删除旧的Service以及RC,在发生服务请求的时候Thrift Client在找不到旧的服务的时候根据Label重新查找Service就会切换到新的Service上。

 
下面展示一下大概的实现及使用方法,假设你熟习Kubernetes或者简单了解,熟习Docker。

服务端

配置


    <bean class="io.masir.testcloud.thrift.HelloImpl" id="helloImpl"/>
    <bean class="io.masir.testcloud.thrift.ThriftSpringProviderBean" init-method="init" id="providerBean">
        <property name="serviceInterface" value="io.masir.testcloud.thrift.HelloService"/>
        <property name="serviceVersion" value="1.0.0"/>
        <property name="serviceGroup" value="testServiceGroup"/>
        <property name="target" ref="helloImpl"/>
    </bean>

ThriftSpringProviderBean核心代码 这是Thrift和Spring整合的核心代码,可以借鉴其它网友的Thrift Spring实例。


public class ThriftSpringProviderBean  extends Thread {
 


    private int port = 10809;
    private String serviceInterface;
    private String serviceVersion;
    private String serviceGroup;
    private Object target;
 
    public void run() {
        try {
            TServerSocket serverTransport = new TServerSocket(getPort());
            Class Processor = Class.forName(getServiceInterface() + "$Processor");

            Class Iface = Class.forName(getServiceInterface() + "$Iface");

            Constructor con = Processor.getConstructor(Iface);

            TProcessor processor = (TProcessor) con.newInstance(getTarget());

            TBinaryProtocol.Factory protFactory = new TBinaryProtocol.Factory(true, true);
            TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport);
            args.protocolFactory(protFactory);

            args.processor(processor);
            TServer server = 

分类:

技术点:

相关文章: