简单研究下RestTemplate、 ribbon、 OpenFeign 关系 三者之间的关系。
1. RestTemplate
RestTemplate 使用的是: spring-web 包下面的http 模块的http包中的API。 也就是Spring 自己封装的一套的httpclient API, 下面还是走java 的HttpurlConnection 建立连接然后传输数据。从名字也可以看出其是作为一个模板类可以在项目中使用。类似的还有RedisTemplate 等工具类。
可以单独使用,单独使用的时候相当于直接连接到url, 不走微服务里面的服务发现以及负载均衡等机制。
1. 测试直接url
1. 注入bean
@Bean public RestTemplate getRestTemplate() { return new RestTemplate(); }
2. 添加测试url
@GetMapping("/test")
public String test() {
return restTemplate.getForObject("http://news.baidu.com/guonei", String.class);
}
3. 查看调用链: 断点打在java.net.Socket#connect(java.net.SocketAddress, int) 。 通过HttpURLConnection 建立连接会调用到这里进行连接, 调用链如下:
2. 增加负载均衡能力
1. 注入bean
@Bean @LoadBalanced public RestTemplate getRestTemplate() { return new RestTemplate(); }
2. 测试:
private static final String PAYMENT_URL = "http://CLOUD-PAYMENT-SERVICE"; @GetMapping("/pay/getServerPort") public JSONResultUtil<String> getServerPort() { return restTemplate.getForObject(PAYMENT_URL + "/pay/getServerPort", JSONResultUtil.class); }
3. 查看其调用链:
上面增加@LoadBalanced 注解之后,给RestTemplate 增加了org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor 拦截器, 会走ribbon 进行服务负载均衡的机制。
可以看到最终经过ribbon 的Inteceptor 之后还是会交给Spring的web.http.client 相关的API进行处理。
列出几个重要的方法:
1》 org.springframework.http.client.InterceptingClientHttpRequest.InterceptingRequestExecution#execute 有拦截器的情况进行拦截器拦截
@Override public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException { if (this.iterator.hasNext()) { ClientHttpRequestInterceptor nextInterceptor = this.iterator.next(); return nextInterceptor.intercept(request, body, this); } else { HttpMethod method = request.getMethod(); Assert.state(method != null, "No standard HTTP method"); ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method); request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value)); if (body.length > 0) { if (delegate instanceof StreamingHttpOutputMessage) { StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate; streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream)); } else { StreamUtils.copy(body, delegate.getBody()); } } return delegate.execute(); } }
2》 调用到 org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor#intercept:
@Override public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution)); }
3》 调用到org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient#execute(java.lang.String, org.springframework.cloud.client.loadbalancer.LoadBalancerRequest<T>, java.lang.Object):
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException { ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer, hint); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); return execute(serviceId, ribbonServer, request); } @Override public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { Server server = null; if (serviceInstance instanceof RibbonServer) { server = ((RibbonServer) serviceInstance).getServer(); } if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server); try { T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal); return returnVal; } // catch IOException and rethrow so RestTemplate behaves correctly catch (IOException ex) { statsRecorder.recordStats(ex); throw ex; } catch (Exception ex) { statsRecorder.recordStats(ex); ReflectionUtils.rethrowRuntimeException(ex); } return null; }
4》. 继续调用到: org.springframework.cloud.client.loadbalancer.LoadBalancerRequestFactory#createRequest
public LoadBalancerRequest<ClientHttpResponse> createRequest( final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) { return instance -> { HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer); if (this.transformers != null) { for (LoadBalancerRequestTransformer transformer : this.transformers) { serviceRequest = transformer.transformRequest(serviceRequest, instance); } } return execution.execute(serviceRequest, body); }; }
5》 调用到org.springframework.http.client.InterceptingClientHttpRequest.InterceptingRequestExecution#execute 开始继续上面流程,直到拦截器走完会走原来调用 spring.web.http.client 的相关API发送请求。
2. Ribbon
Ribbon是Netflix Ribbon实现的一套客户端负载均衡的工具。主要提供客户端的软件负载均衡和服务调用。核心是提供负载均衡能力, 也就是有从注册中心获取服务信息、然后根据服务ServiceId 进行负载均衡的能力。
Ribbon客户端提供一系列完善的配置项如连接超时,重试等。简单的说,就是在配置文件中列出LoadBalancer(简称LB)后面所有的机器,Ribbon会基于某种规则(轮询、随机连接)等去连接这些机器。也可以实现自定义的负载均衡算法。
Ribbon 一个重要的类是com.netflix.loadbalancer.BaseLoadBalancer。 这个类里面维护了相关线上的Server 对象, 进行负载均衡的时候从这里获取到服务用自己的负载均衡算法进行选择服务后继续后面的操作。
所以Ribbon 相当于是在原来RestTemplate 的功能上加了个Inteceptor, 使其具有负载均衡以及根据服务名称进行调用的能力。最终根据选择的服务,替换掉服务名称之后交给原来的spring.web.http.client 封装的相关API进行调用。
1. Ribbon 默认也是从注册中心 获取的服务,并且通过自己的定时任务以及心跳检测来判断服务的状态。
1》 在创建com.netflix.loadbalancer.ZoneAwareLoadBalancer#ZoneAwareLoadBalancer(com.netflix.client.config.IClientConfig, com.netflix.loadbalancer.IRule, com.netflix.loadbalancer.IPing, com.netflix.loadbalancer.ServerList<T>, com.netflix.loadbalancer.ServerListFilter<T>, com.netflix.loadbalancer.ServerListUpdater) 对象的时候会从Eureka 获取。其调用链如下:
最终到: com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList#obtainServersViaDiscovery 获取服务
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); if (eurekaClientProvider == null || eurekaClientProvider.get() == null) { logger.warn("EurekaClient has not been initialized yet, returning an empty list"); return new ArrayList<DiscoveryEnabledServer>(); } EurekaClient eurekaClient = eurekaClientProvider.get(); if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(",")) { // if targetRegion is null, it will be interpreted as the same region of client List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfInstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { if(shouldUseOverridePort){ if(logger.isDebugEnabled()){ logger.debug("Overriding port on client name: " + clientName + " to " + overridePort); } // copy is necessary since the InstanceInfo builder just uses the original reference, // and we don't want to corrupt the global eureka copy of the object which may be // used by other clients in our system InstanceInfo copy = new InstanceInfo(ii); if(isSecure){ ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build(); }else{ ii = new InstanceInfo.Builder(copy).setPort(overridePort).build(); } } DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr); serverList.add(des); } } if (serverList.size()>0 && prioritizeVipAddressBasedServers){ break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers } } } return serverList; }
2》 同时,在一个定时任务中更新服务列表与检测服务状态:
com.netflix.loadbalancer.PollingServerListUpdater#start 开启定时任务更新内部Server
@Override public synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); } else { logger.info("Already active, no-op"); } }
UpdateAction 如下: com.netflix.loadbalancer.DynamicServerListLoadBalancer#DynamicServerListLoadBalancer(com.netflix.client.config.IClientConfig)
public DynamicServerListLoadBalancer(IClientConfig clientConfig) { this.isSecure = false; this.useTunnel = false; this.serverListUpdateInProgress = new AtomicBoolean(false); class NamelessClass_1 implements UpdateAction { NamelessClass_1() { } public void doUpdate() { DynamicServerListLoadBalancer.this.updateListOfServers(); } } this.updateAction = new NamelessClass_1(); this.initWithNiwsConfig(clientConfig); }
调用到: com.netflix.loadbalancer.DynamicServerListLoadBalancer#updateListOfServers
public void updateListOfServers() { List<T> servers = new ArrayList(); if (this.serverListImpl != null) { servers = this.serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers); if (this.filter != null) { servers = this.filter.getFilteredListOfServers((List)servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", this.getIdentifier(), servers); } } this.updateAllServerList((List)servers); }
这里的 serverListImpl 也是个策略模式,适配不同的注册中心:
调用到:org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList#getUpdatedListOfServers
@Override public List<DiscoveryEnabledServer> getUpdatedListOfServers() { List<DiscoveryEnabledServer> servers = setZones( this.list.getUpdatedListOfServers()); return servers; }
调用到: com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList#getUpdatedListOfServers
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){ return obtainServersViaDiscovery(); } private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); if (eurekaClientProvider == null || eurekaClientProvider.get() == null) { logger.warn("EurekaClient has not been initialized yet, returning an empty list"); return new ArrayList<DiscoveryEnabledServer>(); } EurekaClient eurekaClient = eurekaClientProvider.get(); if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(",")) { // if targetRegion is null, it will be interpreted as the same region of client List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfInstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { if(shouldUseOverridePort){ if(logger.isDebugEnabled()){ logger.debug("Overriding port on client name: " + clientName + " to " + overridePort); } // copy is necessary since the InstanceInfo builder just uses the original reference, // and we don't want to corrupt the global eureka copy of the object which may be // used by other clients in our system InstanceInfo copy = new InstanceInfo(ii); if(isSecure){ ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build(); }else{ ii = new InstanceInfo.Builder(copy).setPort(overridePort).build(); } } DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr); serverList.add(des); } } if (serverList.size()>0 && prioritizeVipAddressBasedServers){ break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers } } } return serverList; }
3. OpenFeign
OpenFeign 我理解相当于一个RPC 远程调用的组件, 只需要声明接口,然后通过生成JDK代理对象即可用接口发起http 的形式发起调用。
原理参考:https://www.cnblogs.com/qlqwjy/p/14568086.html
OpenFeign 可以结合负载均衡使用,也可以单独作为远程调用工具进行调用。单独使用的时候时可以使用自己的Client(简单的封装了下HttpURLConnection), 也可以使用一些第三方连接池作为HttpURLConnection 调用的组件。 如果使用服务名称进行调用,可以使用ribbon提供的负载均衡能力进行负载均衡以及超时控制等操作。
1. 不使用负载均衡
1. 接口
package cn.qz.cloud.service; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.web.bind.annotation.GetMapping; @FeignClient(url = "http://news.baidu.com/", name = "TestFeignService") public interface TestFeignService { @GetMapping("guonei") String guoji(); }
2. 测试:
@Autowired private TestFeignService testFeignService; @GetMapping("guoji") public String test2() { return testFeignService.guoji(); }
3. 断点查看: 断点打到java.net.HttpURLConnection#HttpURLConnection, 查看调用链
可以看到最后也是走的HttpURLConnection, 只不过在前面进行了一系列的代理操作。 重要方法如下:
1》 feign.ReflectiveFeign.FeignInvocationHandler#invoke feign 生成的代理接口的调用入口
static class FeignInvocationHandler implements InvocationHandler { private final Target target; private final Map<Method, MethodHandler> dispatch; FeignInvocationHandler(Target target, Map<Method, MethodHandler> dispatch) { this.target = (Target)Util.checkNotNull(target, "target", new Object[0]); this.dispatch = (Map)Util.checkNotNull(dispatch, "dispatch for %s", new Object[]{target}); } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (!"equals".equals(method.getName())) { if ("hashCode".equals(method.getName())) { return this.hashCode(); } else { return "toString".equals(method.getName()) ? this.toString() : ((MethodHandler)this.dispatch.get(method)).invoke(args); } } else { try { Object otherHandler = args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null; return this.equals(otherHandler); } catch (IllegalArgumentException var5) { return false; } } } public boolean equals(Object obj) { if (obj instanceof ReflectiveFeign.FeignInvocationHandler) { ReflectiveFeign.FeignInvocationHandler other = (ReflectiveFeign.FeignInvocationHandler)obj; return this.target.equals(other.target); } else { return false; } } public int hashCode() { return this.target.hashCode(); } public String toString() { return this.target.toString(); } }
2》 继续调用到:feign.SynchronousMethodHandler#invoke
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package feign; import feign.InvocationHandlerFactory.MethodHandler; import feign.Logger.Level; import feign.Request.Options; import feign.codec.DecodeException; import feign.codec.Decoder; import feign.codec.ErrorDecoder; import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; final class SynchronousMethodHandler implements MethodHandler { private static final long MAX_RESPONSE_BUFFER_SIZE = 8192L; private final MethodMetadata metadata; private final Target<?> target; private final Client client; private final Retryer retryer; private final List<RequestInterceptor> requestInterceptors; private final Logger logger; private final Level logLevel; private final feign.RequestTemplate.Factory buildTemplateFromArgs; private final Options options; private final Decoder decoder; private final ErrorDecoder errorDecoder; private final boolean decode404; private final boolean closeAfterDecode; private final ExceptionPropagationPolicy propagationPolicy; private SynchronousMethodHandler(Target<?> target, Client client, Retryer retryer, List<RequestInterceptor> requestInterceptors, Logger logger, Level logLevel, MethodMetadata metadata, feign.RequestTemplate.Factory buildTemplateFromArgs, Options options, Decoder decoder, ErrorDecoder errorDecoder, boolean decode404, boolean closeAfterDecode, ExceptionPropagationPolicy propagationPolicy) { this.target = (Target)Util.checkNotNull(target, "target", new Object[0]); this.client = (Client)Util.checkNotNull(client, "client for %s", new Object[]{target}); this.retryer = (Retryer)Util.checkNotNull(retryer, "retryer for %s", new Object[]{target}); this.requestInterceptors = (List)Util.checkNotNull(requestInterceptors, "requestInterceptors for %s", new Object[]{target}); this.logger = (Logger)Util.checkNotNull(logger, "logger for %s", new Object[]{target}); this.logLevel = (Level)Util.checkNotNull(logLevel, "logLevel for %s", new Object[]{target}); this.metadata = (MethodMetadata)Util.checkNotNull(metadata, "metadata for %s", new Object[]{target}); this.buildTemplateFromArgs = (feign.RequestTemplate.Factory)Util.checkNotNull(buildTemplateFromArgs, "metadata for %s", new Object[]{target}); this.options = (Options)Util.checkNotNull(options, "options for %s", new Object[]{target}); this.errorDecoder = (ErrorDecoder)Util.checkNotNull(errorDecoder, "errorDecoder for %s", new Object[]{target}); this.decoder = (Decoder)Util.checkNotNull(decoder, "decoder for %s", new Object[]{target}); this.decode404 = decode404; this.closeAfterDecode = closeAfterDecode; this.propagationPolicy = propagationPolicy; } public Object invoke(Object[] argv) throws Throwable { RequestTemplate template = this.buildTemplateFromArgs.create(argv); Options options = this.findOptions(argv); Retryer retryer = this.retryer.clone(); while(true) { try { return this.executeAndDecode(template, options); } catch (RetryableException var9) { RetryableException e = var9; try { retryer.continueOrPropagate(e); } catch (RetryableException var8) { Throwable cause = var8.getCause(); if (this.propagationPolicy == ExceptionPropagationPolicy.UNWRAP && cause != null) { throw cause; } throw var8; } if (this.logLevel != Level.NONE) { this.logger.logRetry(this.metadata.configKey(), this.logLevel); } } } } Object executeAndDecode(RequestTemplate template, Options options) throws Throwable { Request request = this.targetRequest(template); if (this.logLevel != Level.NONE) { this.logger.logRequest(this.metadata.configKey(), this.logLevel, request); } long start = System.nanoTime(); Response response; try { response = this.client.execute(request, options); } catch (IOException var16) { if (this.logLevel != Level.NONE) { this.logger.logIOException(this.metadata.configKey(), this.logLevel, var16, this.elapsedTime(start)); } throw FeignException.errorExecuting(request, var16); } long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); boolean shouldClose = true; try { if (this.logLevel != Level.NONE) { response = this.logger.logAndRebufferResponse(this.metadata.configKey(), this.logLevel, response, elapsedTime); } if (Response.class == this.metadata.returnType()) { Response var19; if (response.body() == null) { var19 = response; return var19; } else if (response.body().length() != null && (long)response.body().length() <= 8192L) { byte[] bodyData = Util.toByteArray(response.body().asInputStream()); Response var21 = response.toBuilder().body(bodyData).build(); return var21; } else { shouldClose = false; var19 = response; return var19; } } else { Object result; Object var11; if (response.status() >= 200 && response.status() < 300) { if (Void.TYPE != this.metadata.returnType()) { result = this.decode(response); shouldClose = this.closeAfterDecode; var11 = result; return var11; } else { result = null; return result; } } else if (this.decode404 && response.status() == 404 && Void.TYPE != this.metadata.returnType()) { result = this.decode(response); shouldClose = this.closeAfterDecode; var11 = result; return var11; } else { throw this.errorDecoder.decode(this.metadata.configKey(), response); } } } catch (IOException var17) { if (this.logLevel != Level.NONE) { this.logger.logIOException(this.metadata.configKey(), this.logLevel, var17, elapsedTime); } throw FeignException.errorReading(request, response, var17); } finally { if (shouldClose) { Util.ensureClosed(response.body()); } } } long elapsedTime(long start) { return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); } Request targetRequest(RequestTemplate template) { Iterator var2 = this.requestInterceptors.iterator(); while(var2.hasNext()) { RequestInterceptor interceptor = (RequestInterceptor)var2.next(); interceptor.apply(template); } return this.target.apply(template); } Object decode(Response response) throws Throwable { try { return this.decoder.decode(response, this.metadata.returnType()); } catch (FeignException var3) { throw var3; } catch (RuntimeException var4) { throw new DecodeException(response.status(), var4.getMessage(), response.request(), var4); } } Options findOptions(Object[] argv) { return argv != null && argv.length != 0 ? (Options)Stream.of(argv).filter((o) -> { return o instanceof Options; }).findFirst().orElse(this.options) : this.options; } static class Factory { private final Client client; private final Retryer retryer; private final List<RequestInterceptor> requestInterceptors; private final Logger logger; private final Level logLevel; private final boolean decode404; private final boolean closeAfterDecode; private final ExceptionPropagationPolicy propagationPolicy; Factory(Client client, Retryer retryer, List<RequestInterceptor> requestInterceptors, Logger logger, Level logLevel, boolean decode404, boolean closeAfterDecode, ExceptionPropagationPolicy propagationPolicy) { this.client = (Client)Util.checkNotNull(client, "client", new Object[0]); this.retryer = (Retryer)Util.checkNotNull(retryer, "retryer", new Object[0]); this.requestInterceptors = (List)Util.checkNotNull(requestInterceptors, "requestInterceptors", new Object[0]); this.logger = (Logger)Util.checkNotNull(logger, "logger", new Object[0]); this.logLevel = (Level)Util.checkNotNull(logLevel, "logLevel", new Object[0]); this.decode404 = decode404; this.closeAfterDecode = closeAfterDecode; this.propagationPolicy = propagationPolicy; } public MethodHandler create(Target<?> target, MethodMetadata md, feign.RequestTemplate.Factory buildTemplateFromArgs, Options options, Decoder decoder, ErrorDecoder errorDecoder) { return new SynchronousMethodHandler(target, this.client, this.retryer, this.requestInterceptors, this.logger, this.logLevel, md, buildTemplateFromArgs, options, decoder, errorDecoder, this.decode404, this.closeAfterDecode, this.propagationPolicy); } } }