前言
OkHttp是一个处理网络请求的开源项目,是Android端最火热的轻量级框架,由移动支付Square公司贡献用于替代HttpUrlConnection和Apache HttpClient。
•支持HTTPS/HTTP2/WebSocket(在OkHttp3.7中已经剥离对Spdy的支持,转而大力支持HTTP2)
•内部维护任务队列线程池,友好支持并发访问
•内部维护连接池,支持多路复用,减少连接创建开销
•socket创建支持最佳路由
•提供拦截器链(InterceptorChain),实现request与response的分层处理(如透明GZIP压缩,logging等)
Eclipse or Android Studio:
https://square.github.io/okhttp/
https://github.com/square/okhttp
注意:
okhttp内部依赖okio,别忘了同时导入okio。
okhttp回调处理在子线程,不在UI线程,注意使用Handler + Message处理UI相关操作。
OkHttp执行过程:
1.创建OkHttpClient对象。OkHttpClient为网络请求执行的一个中心,它会管理连接池,缓存,SocketFactory,代理,各种超时时间,DNS,请求执行结果的分发等许多内容。
2.创建Request对象。Request用于描述一个HTTP请求,请求方法GET、HEAD、POST、DELETE、PUT、PATCH,请求的URL,请求的header,请求的body,请求的缓存策略等。
3.利用前面创建的OkHttpClient对象和Request对象创建Call对象。Call是一次HTTP请求的Task,它会执行网络请求以获得响应。OkHttp中的网络请求执行Call既可以同步进行,也可以异步进行。调用call.execute()将直接执行网络请求,阻塞直到获得响应。而调用call.enqueue()传入回调,则会将Call放入一个异步执行队列,由ExecutorService在后台执行。
4.执行网络请求并获取响应。
一.OkHttp整体架构
OkHttp网络请求流程图
1.1.整体架构
一.OkHttp整体架构
OkHttp网络请求流程图
1.1.整体架构
上图是OkHttp的总体架构,大致可以分为以下几层:
•Interface——接口层:接受网络访问请求
•Protocol——协议层:处理协议逻辑
•Connection——连接层:管理网络连接,发送新的请求,接收服务器响应
•Cache——缓存层:管理本地缓存
•I/O——I/O层:实际数据读写实现
•Inteceptor——拦截器层:拦截网络访问,插入拦截逻辑
上图是OkHttp的总体架构,大致可以分为以下几层:
•Interface——接口层:接受网络访问请求
•Protocol——协议层:处理协议逻辑
•Connection——连接层:管理网络连接,发送新的请求,接收服务器响应
•Cache——缓存层:管理本地缓存
•I/O——I/O层:实际数据读写实现
•Inteceptor——拦截器层:拦截网络访问,插入拦截逻辑
1.2.Interface —— 接口层
接口层接收用户的网络访问请求(同步请求/异步请求),发起实际的网络访问。
1.2.1.OkHttpClient
OkHttp框架的网络客户端,更确切的说是一个用户面板。用户使用OkHttp进行各种设置,发起各种网络请求都是通过 OkHttpClient 完成的。每个 OkHttpClient 内部都维护了属于自己的任务队列,连接池,Cache,拦截器链等,所以在使用OkHttp作为网络框架时应该全局共享一个 OkHttpClient 实例。
1.2.2.Request
Request用于描述一个HTTP请求,请求方法GET、HEAD、POST、DELETE、PUT、PATCH,请求的URL,请求的header,请求的body,请求的TAG,请求的缓存策略等。
1.2.3.Call
Call
Call是一次HTTP请求的Task,它会执行网络请求以获得响应。
每一个网络请求都是一个 Call 实例。 Call 本身只是一个接口,定义了 Call 的接口方法,实际执行过程中,OkHttp会为每一个请求创建一个 RealCall。
OkHttp中的网络请求执行Call既可以同步进行,也可以异步进行。调用call.execute()将直接执行网络请求,阻塞直到获得响应。而调用call.enqueue()传入回调,则会将Call放入一个异步执行队列,由ExecutorService在后台执行。
public interface Call extends Cloneable {
/** Returns the original request that initiated this call. */
Request request();
Response execute() throws IOException;
void enqueue(Callback responseCallback);
/** Cancels the request, if possible. Requests that are already complete cannot be canceled. */
void cancel();
/**
* Returns true if this call has been either {@linkplain #execute() executed} or {@linkplain
* #enqueue(Callback) enqueued}. It is an error to execute a call more than once.
*/
boolean isExecuted();
boolean isCanceled();
interface Factory {
Call newCall(Request request);
}
}
OkHttpClient实现了Call.Factory接口,通过接口方法OkHttpClient.newCall()可以看到具体使用的是RealCall类:
/**
* Prepares the {@code request} to be executed at some point in the future.
*/
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
每一个 RealCall 内部有一个 AsyncCall :
AsyncCall(异步请求):
final class AsyncCall extends NamedRunnable {
/** * Callback */ private final Callback responseCallback; AsyncCall(Callback responseCallback) { super("OkHttp %s", redactedUrl()); this.responseCallback = responseCallback; } // ......
@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { eventListener.callFailed(RealCall.this, e); responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } } }
AsyncCall其实就做了三件事情:
(1)持有并维护一个回调接口Callback,在适当的时候回调适当的方法。
(2)调用Response response = getResponseWithInterceptorChain();获得response,并判断该请求是否被cancel,之后通过Callback回调,把结果反馈给用户。
(3)无论执行结果如何,finally:通知dispatcher,注销当前Call,即把当前Call从相应队列中移除。
AsyncCall 继承的 NamedRunnable 继承自Runnable接口:
public abstract class NamedRunnable implements Runnable {
// ......
@Override public final void run() {
// ......
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}
run 方法中执行了 AsyncTask 的 execute()方法,所以每一个AsyncTask就是一个线程,而执行AsyncTask的过程就是执行其 execute 方法的过程。
RealCall
再回到OkHttpClient:在OkHttp中使用了RealCall来执行整个Http请求。RealCall把请求分成了同步,异步,同步。
同步请求:
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
// ......
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
// ......
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
同步请求做了四件事情:
(1)加锁置标志位。
(2)调用client.dispatcher().executed(this)向client的dispatcher注册当前Call,dispatcher把call加入到了同步队列中,等待执行。
(3)调用getResponseWithInterceptorChain()执行网络请求并获得响应。
(4)调用client.dispatcher().finished(this)向client的dispatcher注销当前Call,即从同步队列中删除。
异步请求
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
// ......
client.dispatcher().enqueue(new RealCall.AsyncCall(responseCallback));
}
RealCall的异步请求只做了两件事情:
(1)加锁置标志位。
(2)创建AsyncCall并将之丢给client的dispatcher,dispatcher把扔过来的AsyncCall(独立线程)加入到异步执行队列中,交给executorService执行。executorService其实是一个ThreadPoolExecutor。
总结就是,异步请求就是把执行操作打包成独立线程(AsyncCall),交给线程池管理并执行了。
RealCall.getResponseWithInterceptorChain()
无论同步,异步,最终执行网络请求,获取网络相应的操作,都是通过调用
RealCall.getResponseWithInterceptorChain()
实现的。
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
这里每一行代码都很重要,创建了一个Interceptor的列表,把用户自定义拦截器集合,系统内置拦截器按一定顺序:
用户普通拦截器 —>
RetryAndFollowUpInterceptor —>
BridgeInterceptorr —>
CacheInterceptor—>
ConnectInterceptor —>
用户网络拦截器(networkInterceptors)—>
CallServerInterceptor
添加到这个新的拦截器列表中,继而创建了一个Interceptor.Chain对象来处理请求并获得响应。
RealInterceptorChain中,在RealInterceptorChain.proceed()中,除了对状态及获取的reponse做检查之外,最主要的事情即是构造新的RealInterceptorChain对象,获取对应Interceptor,并调用Interceptor的intercept(next)了。在这里,index充当迭代器或指示器的角色,用于指出当前正在处理的Interceptor。
RealInterceptorChain + Interceptor实现了装饰器模式,整个过程由RealInterceptorChain.proceed() 和interceptor.intercept(next)协作执行,实现了请求/响应的串式或流式处理。只不过内层装饰器不是外层装饰器的成员变量,而是接口方法中创建的临时变量。
1.2.4.Dispatcher
dispatcher分发器类似于Ngnix中的反向代理,通过Dispatcher将任务分发到合适的空闲线程,实现非阻塞,高可用,高并发。
职责定义
Dispatcher是OkHttp的任务分发器,其内部维护了三个任务队列(readyAsyncCalls,runningAsyncCalls,runningSyncCalls)一个线程池(executorService),当有接收到一个 Call 时, Dispatcher 负责在线程池中找到空闲的线程并执行其 execute 方法。
readyAsyncCalls:待执行异步任务队列(ArrayDeque可变数组默认大小16)
runningAsyncCalls:运行中异步任务队列(ArrayDeque可变数组默认大小16)
runningSyncCalls:运行中同步任务队列(ArrayDeque可变数组默认大小16)
executorService:任务队列线程池(无限制大小):
用户可以通过Dispatcher的构造函数来定制ExecutorService,这需要通过OkHttpClient.Builder在OkHttpClient的构建过程中间接的做到。
执行策略:
同步:
在Call的同步执行过程中,调用client.dispatcher().executed(this)向client的dispatcher注册当前Call,Dispatcher仅仅是将Call放进了runningSyncCalls,其它便什么也没做,该操作主要目的是方便全局性的cancel所有的Call。
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
异步:
enqueue(AsyncCall call) AsyncCall是被放在一个ExecutorService中执行的。默认情况下,这是一个不限容量的线程池。但Dispatcher会限制每个host同时执行的最大请求数量,默认为5,同时也会限制同时执行的总的最大请求数量。runningAsyncCalls中保存所有正在被ExecutorService执行的AsyncCall,而readyAsyncCalls则用于存放由于对单个host同时执行的最大请求数量的限制(5),或总的同时执行最大请求数量的限制(64),而暂时得不到执行的AsyncCall。
synchronized void enqueue(RealCall.AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
如果满足条件:
(1)当前请求数小于最大请求数(64)
(2)对单一host的请求小于阈值(5)
将该任务插入正在执行任务队列,并执行对应任务。如果不满足则将其放入待执行队列。
finished()
当请求完成,无论成功与否,都会调用finished()方法。finished()做了以下几件事情:
(1)将执行结束的AsyncCall从runningAsyncCalls移除
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
(2)空闲出多余线程,调用promoteCalls调用待执行的任务
检查当前正在执行的请求数是否超过同时执行最大请求数量限制(默认64)
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
是则返回;否则从等待队列中循环取出任务,同时检查该任务所用端口所属的正在执行的请求数是否大于OkHttp对于同一端口最大请求数的限制(默认5)。
如果以上条件均满足,把取出的Call扔给线程池执行:
runningAsyncCalls.add(call);
executorService().execute(call);
(3)如果当前整个线程池都空闲下来,执行空闲通知回调线程(idleCallback)
1.3.Protocol —— 协议层
Protocol层负责处理协议逻辑,OkHttp支持Http1/Http2/WebSocket协议,并在3.7版本中放弃了对Spdy协议,鼓励开发者使用Http2。
1.4.Connection —— 连接层:管理网络连接,发送新的请求,接收服务器返回
连接层顾名思义就是负责网络连接。在连接层中有一个连接池,统一管理所有的Socket连接,当用户新发起一个网络请求时,OkHttp会首先从连接池中查找是否有符合要求的连接,如果有则直接通过该连接发送网络请求;否则新创建一个网络连接。
RealConnection 描述一个物理Socket连接,连接池中维护多个RealConnection实例。由于Http2支持多路复用,一个 RealConnection 可以支持多个网络访问请求,所以OkHttp又引入了 StreamAllocation 来描述一个实际的网络请求开销(从逻辑上一个 Stream 对应一个 Call ,但在实际网络请求过程中一个 Call 常常涉及到多次请求。如重定向,Authenticate等场景。所以准确地说,一个 Stream 对应一次请求,而一个 Call 对应一组有逻辑关联的 Stream ),一个 RealConnection 对应一个或多个 StreamAllocation ,所以 StreamAllocation 可以看做是 RealConenction 的计数器,当 RealConnection 的引用计数变为0,且长时间没有被其他请求重新占用就将被释放。
1.5.Cache —— 缓存层
Cache层负责维护请求缓存,当用户的网络请求在本地已有符合要求的缓存时,OkHttp会直接从缓存中返回结果,从而节省网络开销。
1.6.I/O —— I/O层
I/O层负责实际的数据读写。OkHttp的另一大优点就是其高效的I/O操作,这归因于其高效的I/O库okio
1.7.Inteceptor —— 拦截器层
拦截器层提供了一个类AOP接口,方便用户可以切入到各个层面对网络访问进行拦截并执行相关逻辑。
二.OkHttp拦截器
关于addInterceptor和addNetworkInterceptor的区别,okHttp官方对拦截器做了解释,并给了一张图,一目了然:
在RealCall的 代码中,用户自定义应用拦截器,用户自定义网络拦截器,OkHttp内置拦截器组成了一个拦截器List,三种拦截器在List的位置也决定了其在请求中的调用顺序:
用户普通拦截器 —>
RetryAndFollowUpInterceptor —>
BridgeInterceptorr —>
CacheInterceptor—>
ConnectInterceptor —>
用户网络拦截器(networkInterceptors)—>
CallServerInterceptor
应用拦截器(Application Interceptors)较上层,而网络拦截器(Network Interceptors)较底层,所有拦截器就是一个由浅入深的递归调用。而两者不同在于其中对Request,Response的加工顺序!
@Override
public Response intercept(Chain chain) throws IOException
2.1.原理解析
Call的执行过程详见1.2 Interface —— 接口层
拦截器的代码入口在getResponseWithInterceptorChain:
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
其逻辑大致分为两部分:
(1)组织创建拦截器列表(List)
(2)创建一个拦截器链 RealInterceptorChain,并执行拦截器链的 proceed 方法
接下来看下 RealInterceptorChain 的实现逻辑:
@Override public Response proceed(Request request) throws IOException {
return proceed(request, streamAllocation, httpCodec, connection);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
// ......
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// ......
return response;
}
(1)创建下一个拦截链next。传入 index + 1 使得下一个拦截器链的初始位置从下一个拦截器开始访问
(2)执行索引为 index 的拦截器的intercept方法,并将下一个拦截器链传入该方法
在Interceptor实现类的intercept方法中,会继续调用chain.proceed(request)
这行代码最关键,就是执行下一个拦截器链的proceed方法。而在下一个拦截器链中又会执行下一个拦截器的intercept方法。所以整个执行链就在拦截器与拦截器链中交替执行,最终完成所有拦截器的操作。这也是OkHttp拦截器的链式执行逻辑。而一个拦截器的intercept方法所执行的核心逻辑大致分为三部分:
•在发起请求前对request进行处理
•调用下一个拦截器,获取response
•对response进行处理,返回给上一个拦截器
这就是OkHttp拦截器机制的核心逻辑。所以一个网络请求实际上就是一个个拦截器执行其intercept方法的过程。
2.2.RetryAndFollowUpInterceptor
RetryAndFollowUpInterceptor负责两部分逻辑:
(1)在网络请求失败后进行重试
(2)当服务器返回当前请求需要进行重定向时直接发起新的请求,并在条件允许情况下复用当前连接
执行流程:
1.实例化StreamAllocation,初始化一个Socket连接对象,获取到输入/输出流()基于Okio
2.开启循环,执行下一个调用链(拦截器),等待返回结果(Response)
3.如果发生错误,判断是否继续请求,否:退出
4.检查响应是否符合要求,是:返回
5.关闭响应结果
6.判断是否达到最大重连次数(默认20),是:退出
7.检查是否有相同连接,是:释放,重建连接
8.重复流程
2.3.BridgeInterceptor
BridgeInterceptor在request阶段对请求头添加一些字段,在response阶段对响应进行一些gzip解压操作:
•设置内容长度,内容编码
•设置gzip压缩,并在接收到内容后进行解压。省去了应用层处理数据解压的麻烦
•添加cookie
•设置其他报头,如 User-Agent , Host , Keep-alive 等。其中 Keep-Alive 是实现多路复用的必要步骤
2.4.CacheInterceptor
CacheInterceptor的职责很明确,就是负责Cache的管理,CacheInterceptor负责在request阶段判断是否有缓存,是否需要重新请求。在response阶段负责把response缓存起来。
1. 读取默认缓存(创建Client时可配置)
2. 创建缓存策略(根据头信息,判断强制缓存、对比缓存等策略)
3. 根据策略,不需要网络请求且没有缓存时构建一个状态码为504的Response,提示错误
4. 根据策略,不需要网络请求但存在缓存时,将缓存返回
5. 前面步骤都没返回,则从网络中读取(推进到链中的下一个结点继续执行)
6. 获取到网络返回的networkResponse,如果状态码为304(Not Modified),说明缓存可用,直接返回缓存结果
7. 如果步骤6没有返回,则说明缓存已过期,则根据网络中返回的networkResponse构建response
8. 将网络中返回的Response缓存
9. 返回response
2.5.ConnectInterceptor
关键代码:RealConnection connection = streamAllocation.connection();
即为当前请求找到合适的连接,可能复用已有连接也可能是重新创建的连接,返回的连接由连接池负责决定。借助于前面分配的StreamAllocation对象建立与服务器之间的连接,并选定交互所用的协议是HTTP 1.1还是HTTP 2。对后续Interceptor的执行的影响:创建了httpStream和connection。
2.6.CallServerInterceptor
CallServerInterceptor负责向服务器发起真正的访问请求,并在接收到服务器返回后读取响应返回。
处理IO,与服务器进行数据交换。对后续Interceptor的执行的影响:为Interceptor链中的最后一个Interceptor,没有后续Interceptor。
2.7.拦截器List执行流程
递归拦截器链实现完整网络访问的流程图:
2.8.自定义拦截器
2.8.1.RxCacheInterceptor
public class RxCacheInterceptor implements Interceptor {
private RxCacheInterceptor() {} @Override public Response intercept(Chain chain) throws IOException { Request request = chain.request(); if (!NetworkUtil.isNetworkAvailable(RxRetrofitApp.getApplication())) { /* new request */ Request.Builder requestBuilder = request.newBuilder() .method(request.method(), request.body()); /* force from cache when net work is unavailable */ Request newRequest = requestBuilder .cacheControl(CacheControl.FORCE_CACHE) .build(); /* 6 hours available */ int maxStale = 60 * 60 * 6; return chain.proceed(newRequest).newBuilder() .removeHeader("Pragma") .removeHeader("Cache-Control") .header("Cache-Control", "public, only-if-cached, max-stale=" + maxStale) .build(); } else { Response oldResponse = chain.proceed(request); /* Cache-Time is the params user-defined from header, value is the time available */ String cacheTime = request.header("Cache-Time"); if (null != cacheTime) { return oldResponse.newBuilder() .removeHeader("Pragma") .removeHeader("Cache-Control") /* cache for cache seconds */ .header("Cache-Control", "max-age=" + cacheTime) .build(); } else { return oldResponse; } } } public static class Builder { RxCacheInterceptor mHttpCommonInterceptor; public Builder() { mHttpCommonInterceptor = new RxCacheInterceptor(); } public RxCacheInterceptor build() { return mHttpCommonInterceptor; } } }
2.8.2.BaseParamsInterceptor
public abstract class BaseParamsInterceptor implements Interceptor {
public enum Type { ADD, UPDATE, REMOVE } public Type control; public Type getControlType() { return control; } public void setControlType(Type control) { this.control = control; } abstract Request interceptor(Request request) throws UnsupportedEncodingException; }
2.8.3.RxCommonHeaderInterceptor
public class RxCommonHeaderInterceptor extends BaseParamsInterceptor {
private Map<String, String> mHeaderParamsMap = new HashMap<>(); private RxCommonHeaderInterceptor() { super.control = Type.ADD; } private RxCommonHeaderInterceptor(Type t) { super.control = t; } /** * @param key key * @param value value */ private void addHeaderParams(String key, String value) { if (mHeaderParamsMap.containsKey(key)) { mHeaderParamsMap.remove(key); } mHeaderParamsMap.put(key, value); } /** * @param headerParamsMap headerParamsMap */ public void addHeaderParams(Map<String, String> headerParamsMap) { for (Map.Entry<String, String> params : headerParamsMap.entrySet()) { addHeaderParams(params.getKey(), params.getValue()); } } @Override public Response intercept(Chain chain) throws IOException { return chain.proceed(interceptor(chain.request())); } @Override Request interceptor(Request request) throws UnsupportedEncodingException { // 新的请求 Request.Builder requestBuilder = request.newBuilder(); requestBuilder.method(request.method(), request.body()); // 添加公共参数,添加到header中 if (mHeaderParamsMap.size() > 0) { for (Map.Entry<String, String> headParams : mHeaderParamsMap.entrySet()) { switch (control) { case ADD: requestBuilder.addHeader(headParams.getKey(), headParams.getValue() == null ? "" : getValueEncoded((String) headParams.getValue())); break; case UPDATE: requestBuilder.header(headParams.getKey(), headParams.getValue() == null ? "" : getValueEncoded((String) headParams.getValue())); break; case REMOVE: break; default: break; } } } return requestBuilder.build(); } /** * @param value * @return * @throws UnsupportedEncodingException */ private static String getValueEncoded(String value) throws UnsupportedEncodingException { if (value == null) { return "null"; } String newValue = value.replace("\n", ""); for (int i = 0, length = newValue.length(); i < length; i++) { char c = newValue.charAt(i); if (c <= '\u001f' || c >= '\u007f') { return URLEncoder.encode(newValue, "UTF-8"); } } return newValue; } public static class Builder { RxCommonHeaderInterceptor mHttpCommonInterceptor; public Builder() { mHttpCommonInterceptor = new RxCommonHeaderInterceptor(); } public Builder(Type t) { mHttpCommonInterceptor = new RxCommonHeaderInterceptor(t); } public Type getControlType() { if (null != mHttpCommonInterceptor) { return mHttpCommonInterceptor.getControlType(); } return Type.ADD; } public void setControlType(Type t) { if (null != mHttpCommonInterceptor) { mHttpCommonInterceptor.setControlType(t); } } public Builder addHeaderParams(String key, String value) { mHttpCommonInterceptor.mHeaderParamsMap.put(key, value); return this; } public Builder addHeaderParams(String key, int value) { return addHeaderParams(key, String.valueOf(value)); } public Builder addHeaderParams(String key, float value) { return addHeaderParams(key, String.valueOf(value)); } public Builder addHeaderParams(String key, long value) { return addHeaderParams(key, String.valueOf(value)); } public Builder addHeaderParams(String key, double value) { return addHeaderParams(key, String.valueOf(value)); } public Builder addHeaderParamsMap(Map<String, String> mHeaderParamsMap) { mHttpCommonInterceptor.mHeaderParamsMap.putAll(mHeaderParamsMap); return this; } public RxCommonHeaderInterceptor build() { return mHttpCommonInterceptor; } } }
2.8.4.RxCommonParamsInterceptor
public class RxCommonParamsInterceptor extends BaseParamsInterceptor {
private Map<String, String> mParamsMap = new HashMap<>(); private RxCommonParamsInterceptor() { super.control = Type.ADD; } private RxCommonParamsInterceptor(Type t) { super.control = t; } /** * @param key key * @param value value */ private void addParams(String key, String value) { if (mParamsMap.containsKey(key)) { mParamsMap.remove(key); } mParamsMap.put(key, value); } /** * @param paramsMap mParamsMap */ public void addParams(Map<String, String> paramsMap) { for (Map.Entry<String, String> params : paramsMap.entrySet()) { addParams(params.getKey(), params.getValue()); } } @Override public Response intercept(Chain chain) throws IOException { return chain.proceed(interceptor(chain.request())); } @Override Request interceptor(Request request) throws UnsupportedEncodingException { // 添加公共参数 if (null != mParamsMap && mParamsMap.size() > 0) { if (("GET").equals(request.method())) { // GET HttpUrl httpUrl = request.url(); HttpUrl.Builder requestUrlBuilder = httpUrl.newBuilder(); for (Map.Entry<String, String> params : mParamsMap.entrySet()) { switch (super.control) { case ADD: requestUrlBuilder.addQueryParameter(params.getKey(), params.getValue() == null ? "" : params.getValue()).build(); break; case UPDATE: requestUrlBuilder.setQueryParameter(params.getKey(), params.getValue() == null ? "" : params.getValue()).build(); break; case REMOVE: requestUrlBuilder.removeAllQueryParameters(params.getKey()).build(); break; default: break; } } return request.newBuilder().url(requestUrlBuilder.build()).build(); } else if ("POST".equals(request.method())) { // POST RequestBody requestBody = request.body(); if (requestBody != null && requestBody instanceof FormBody) { // POST Param x-www-form-urlencoded FormBody.Builder bodyBuilder = new FormBody.Builder(); FormBody formBody = (FormBody) requestBody; int bodySize = formBody.size(); for (int i = 0; i < bodySize; i++) { bodyBuilder.addEncoded(formBody.encodedName(i), formBody.encodedValue(i)); } for (Map.Entry<String, String> entry : mParamsMap.entrySet()) { bodyBuilder.addEncoded(entry.getKey(), getValueEncoded(entry.getValue())); } return request.newBuilder().post(bodyBuilder.build()).build(); } else if (requestBody != null && requestBody instanceof MultipartBody) { // POST Param form-data MultipartBody.Builder newBuilder = new MultipartBody.Builder().setType(MultipartBody.FORM); for (Map.Entry<String, String> entry : mParamsMap.entrySet()) { newBuilder.addFormDataPart(entry.getKey(), entry.getValue()); } MultipartBody multipartBody = (MultipartBody) requestBody; for (MultipartBody.Part part : multipartBody.parts()) { newBuilder.addPart(part); } return request.newBuilder().post(newBuilder.build()).build(); } else { return request; } } else { return request; } } else { return request; } } /** * @param value * @return * @throws UnsupportedEncodingException */ private static String getValueEncoded(String value) throws UnsupportedEncodingException { if (value == null) { return "null"; } String newValue = value.replace("\n", ""); for (int i = 0, length = newValue.length(); i < length; i++) { char c = newValue.charAt(i); if (c <= '\u001f' || c >= '\u007f') { return URLEncoder.encode(newValue, "UTF-8"); } } return newValue; } /** * */ public static class Builder { RxCommonParamsInterceptor mHttpCommonInterceptor; public Builder() { mHttpCommonInterceptor = new RxCommonParamsInterceptor(); } public Builder(Type t) { mHttpCommonInterceptor = new RxCommonParamsInterceptor(t); } public Type getControlType() { if (null != mHttpCommonInterceptor) { return mHttpCommonInterceptor.getControlType(); } return Type.ADD; } public void setControlType(Type t) { if (null != mHttpCommonInterceptor) { mHttpCommonInterceptor.setControlType(t); } } public Builder addParams(String key, String value) { mHttpCommonInterceptor.mParamsMap.put(key, value); return this; } public Builder addParams(String key, int value) { return addParams(key, String.valueOf(value)); } public Builder addParams(String key, float value) { return addParams(key, String.valueOf(value)); } public Builder addParams(String key, long value) { return addParams(key, String.valueOf(value)); } public Builder addParams(String key, double value) { return addParams(key, String.valueOf(value)); } public Builder addParamsMap(Map<String, String> mParamsMap) { mHttpCommonInterceptor.mParamsMap.putAll(mParamsMap); return this; } public RxCommonParamsInterceptor build() { return mHttpCommonInterceptor; } } }
2.8.5.RxHttpLoggingInterceptor
public class RxHttpLoggingInterceptor implements Interceptor {
private int level = LEVEL.ALL; public void level(int level) { this.level = level; } private RxHttpLoggingInterceptor() {} @Override public Response intercept(Chain chain) throws IOException { switch (level) { case LEVEL.BODY: LogUtils.i("request tag = " + chain.request().tag()); LogUtils.i("request url = " + chain.request().url()); LogUtils.i("request body = " + chain.request().body()); break; case LEVEL.HEADERS: LogUtils.i("request tag = " + chain.request().tag()); LogUtils.i("request url = " + chain.request().url()); LogUtils.i("request headers = " + chain.request().body()); break; case LEVEL.ALL: LogUtils.i("request tag = " + chain.request().tag()); LogUtils.i("request url = " + chain.request().url()); LogUtils.i("request headers = " + chain.request().body()); LogUtils.i("request body = " + chain.request().body()); break; default: break; } Response response = chain.proceed(chain.request()); switch (level) { case LEVEL.BODY: LogUtils.i("request tag = " + chain.request().tag()); LogUtils.i("request url = " + chain.request().url()); LogUtils.i("response body = " + response.body()); break; default: break; } return response; } public static class Builder { RxHttpLoggingInterceptor interceptor = null; public Builder() { interceptor = new RxHttpLoggingInterceptor(); } public Builder level(int level) { interceptor.level(level); return this; } public RxHttpLoggingInterceptor build() { return this.interceptor; } } public static class LEVEL { public final static int BODY = 0; public final static int HEADERS = 1; public final static int ALL = 2; } }
2.8.6.RxRequestTagInterceptor
public class RxRequestTagInterceptor implements Interceptor {
private Object tag; public void tag(Object tag) { this.tag = tag; } @Override public Response intercept(Chain chain) throws IOException { if (null == tag) { return chain.proceed(chain.request()); } Request.Builder builder = chain.request().newBuilder().tag(tag); return chain.proceed(builder.build()); } public static class Builder { private RxRequestTagInterceptor interceptor = null; public Builder() { interceptor = new RxRequestTagInterceptor(); } public Builder tag(Object tag) { this.interceptor.tag(tag); return this; } public RxRequestTagInterceptor build() { return this.interceptor; } } }
2.8.7.RxProgressResponseInterceptor
public class RxProgressResponseInterceptor implements Interceptor {
private IRxCallback listener;
public RxProgressResponseInterceptor(IRxCallback listener) {
this.listener = listener;
}
@Override
public Response intercept(Chain chain) throws IOException {
Response originalResponse = chain.proceed(chain.request());
return originalResponse.newBuilder()
.body(new RxProgressResponseBody(originalResponse.body(), listener))
.build();
}
}
三.OkHttp任务队列
OkHttp的一个高效之处在于在内部维护了一个线程池,方便高效地执行异步请求。
3.1.线程池的优点
OkHttp的任务队列在内部维护了一个线程池用于执行具体的网络请求。而线程池最大的好处在于通过线程复用减少非核心任务的损耗。
线程执行任务的时间消耗分为:
T1 创建线程的时间
T2 在线程中执行任务的时间,包括线程间同步所需时间
T3 线程销毁的时间
而T1,T3是多线程本身带来的消耗,线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。
1.通过对线程进行缓存,减少了创建销毁的时间损失
2.通过控制线程数量阀值,减少了当线程过少时带来的CPU闲置(比如说长时间卡在I/O上了)与线程过多时对JVM的内存与线程切换时系统调用的压力
类似的还有Socket连接池、DB连接池、CommonPool(比如Jedis)等技术。
3.2.Dispatcher管理任务队列
OkHttp的任务队列由Dispatcher统一管理:
readyAsyncCalls:待执行异步任务队列
runningAsyncCalls:运行中异步任务队列
runningSyncCalls:运行中同步任务队列
executorService:任务队列线程池:
Dispatcher规定,同时执行的最大请求数为64,同时规定同一Host同时执行的最大请求数为5
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
线程池:
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); }
•int corePoolSize: 最小并发线程数,这里并发同时包括空闲与活动的线程,如果是0的话,空闲一段时间后所有线程将全部被销毁
•int maximumPoolSize: 最大线程数,当任务进来时可以扩充的线程最大值,当大于了这个值就会根据丢弃处理机制来处理
•long keepAliveTime: 当线程数大于 corePoolSize 时,多余的空闲线程的最大存活时间,类似于HTTP中的Keep-alive
•TimeUnit unit: 时间单位,一般用秒
•BlockingQueue workQueue: 工作队列,先进先出,可以看出并不像Picasso那样设置优先队列
•ThreadFactory threadFactory: 单个线程的工厂,可以打Log,设置 Daemon (即当JVM退出时,线程自动结束)等
可以看出,在Okhttp中,构建了一个阀值为[0, Integer.MAX_VALUE]的线程池,它不保留任何最小线程数,随时创建更多的线程数,当线程空闲时只能活60秒,它使用了一个不存储元素的阻塞工作队列,一个叫做"OkHttp Dispatcher"的线程工厂。
也就是说,在实际运行中,当收到10个并发请求时,线程池会创建十个线程,当工作完成后,线程池会在60s后相继关闭所有线程。
3.3.总结
(1)OkHttp采用Dispatcher技术,类似于Nginx,与线程池配合实现了高并发,低阻塞的运行
(2)Okhttp采用Deque作为缓存,按照入队的顺序先进先出
(3)OkHttp最出彩的地方就是在try/finally中调用了 finished 函数,可以主动控制等待队列的移动,而不是采用锁或者wait/notify,极大减少了编码复杂性
四.OkHttp缓存策略
合理地利用本地缓存可以有效地减少网络开销,减少响应延迟。HTTP报头也定义了很多与缓存有关的域来控制缓存。
4.1.HTTP缓存策略
4.1.1 Expires
超时时间,一般用在服务器的response报头中用于告知客户端对应资源的过期时间。当客户端需要再次请求相同资源时先比较其过期时间,如果尚未超过过期时间则直接返回缓存结果,如果已经超过则重新请求。
4.1.2 Cache-Control
Cache-Control 比 Expires 优先级更高:
CacheControl.FORCE_CACHE; //仅仅使用缓存
CacheControl.FORCE_NETWORK;// 仅仅使用网络
指定请求和响应遵循的缓存机制。在请求消息或响应消息中设置Cache-Control并不会修改另一个消息处理过程中的缓存处理过程。请求时的缓存指令有下几种:
•Public指示响应可被任何缓存区缓存。
•Private指示对于单个用户的整个或部分响应消息,不能被共享缓存处理。这允许服务器仅仅描述当用户的部分响应消息,此响应消息对于其他用户的请求无效。
•no-cache指示请求或响应消息不能缓存
•no-store用于防止重要的信息被无意的发布。在请求消息中发送将使得请求和响应消息都不使用缓存。
•max-age指示客户机可以接收生存期不大于指定时间(以秒为单位)的响应。
告知缓存多长时间,在没有超过缓存时间的情况下,请求会返回缓存内的数据,在超出max-age的情况下向服务端发起新的请求,请求失败的情况下返回缓存数据,否则向服务端重新发起请求。
•min-fresh指示客户机可以接收响应时间小于当前时间加上指定时间的响应。
•max-stale指示客户机可以接收超出超时期间的响应消息。如果指定max-stale消息的值,那么客户机可以接收超出超时期指定值之内的响应消息。可以理解为指示客户机可以接收超出max-age时间的响应消息,max-stale在请求设置中有效,在响应设置中无效。max-age和max-stale在请求中同时使用的情况下,缓存的时间可以为max-age和max-stale的和。
代码设置:
final CacheControl.Builder builder = new CacheControl.Builder();
builder.noCache();//不使用缓存,全部走网络
builder.noStore();//不使用缓存,也不存储缓存
builder.onlyIfCached();//只使用缓存
builder.noTransform();//禁止转码
builder.maxAge(10, TimeUnit.MILLISECONDS);//指示客户机可以接收生存期不大于指定时间的响应。
builder.maxStale(10, TimeUnit.SECONDS);//指示客户机可以接收超出超时期间的响应消息
builder.minFresh(10, TimeUnit.SECONDS);//指示客户机可以接收响应时间小于当前时间加上指定时间的响应。
CacheControl cache = builder.build();//cacheControl
4.1.3 条件GET请求
4.1.3.1 Last-Modified-Date
客户端第一次请求时,服务器返回:
Last-Modified: Tue, 12 Jan 2016 09:31:27 GMT
当客户端二次请求时,可以头部加上如下header:
If-Modified-Since: Tue, 12 Jan 2016 09:31:27 GMT
如果当前资源没有被二次修改,服务器返回304告知客户端直接复用本地缓存。
4.1.3.2 ETag
ETag是对资源文件的一种摘要,可以通过ETag值来判断文件是否有修改。当客户端第一次请求某资源时,服务器返回:
ETag: "5694c7ef-24dc"
客户端再次请求时,可在头部加上如下域:
If-None-Match: "5694c7ef-24dc"
如果文件并未改变,则服务器返回304告知客户端可以复用本地缓存。
4.1.4 no-cache/no-store
不使用缓存
4.1.5 only-if-cached
只使用缓存
4.2.Cache源码分析
4.2.1.Cache
Cache管理器类,内部包含一个DiskLruCache负责将cache内容写入文件,DiskLruCache是OkHttp内容缓存的实际执行者。
4.2.2.CacheStrategy.Factory
缓存策略工厂类根据当前请求(request)以及cache响应(response)返回对应的缓存策略。传入参数为当前request,cache响应response,经过一系列逻辑判断(例如:判断报头的Cache-Strategy 是否包含 “only-if-cached“ 参数。如果有的话,不会请求新的数据),返回相应的缓存策略。
处理逻辑主要在getCandidate()方法中执行:
/** Returns a strategy to use assuming the request can use the network. */
private CacheStrategy getCandidate() {
// (1)若本地没有缓存,发起网络请求
if (cacheResponse == null) {
return new CacheStrategy(request, null);
}
// (2)如果当前请求是HTTPS,而缓存没有TLS握手,重新发起网络请求
if (request.isHttps() && cacheResponse.handshake() == null) {
return new CacheStrategy(request, null);
}
//(3) If this response shouldn't have been stored, it should never be used
// as a response source. This check should be redundant as long as the
// persistence store is well-behaved and the rules are constant.
if (!isCacheable(cacheResponse, request)) {
return new CacheStrategy(request, null);
}
// (4)如果当前的缓存策略是不缓存或者是conditional get,发起网络请求
CacheControl requestCaching = request.cacheControl();
if (requestCaching.noCache() || hasConditions(request)) {
return new CacheStrategy(request, null);
}
// ageMillis:缓存age
long ageMillis = cacheResponseAge();
// freshMillis:缓存保鲜时间
long freshMillis = computeFreshnessLifetime();
if (requestCaching.maxAgeSeconds() != -1) {
freshMillis = Math.min(freshMillis,
SECONDS.toMillis(requestCaching.maxAgeSeconds()));
}
long minFreshMillis = 0;
if (requestCaching.minFreshSeconds() != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
}
long maxStaleMillis = 0;
CacheControl responseCaching = cacheResponse.cacheControl();
if (!responseCaching.mustRevalidate()
&& requestCaching.maxStaleSeconds() != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
}
// (6)如果 age + min-fresh >= max-age && age + min-fresh < max-age +
// max-stale,则虽然缓存过期了, //但是缓存继续可以使用,只是在头部添加 110 警告码
if (!responseCaching.noCache()
&& ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
Response.Builder builder = cacheResponse.newBuilder();
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning",
"110 HttpURLConnection \"Response is stale\"");
}
long oneDayMillis = 24 * 60 * 60 * 1000L;
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning",
"113 HttpURLConnection \"Heuristic expiration\"");
}
return new CacheStrategy(null, builder.build());
}
// (5)发起conditional get请求
String conditionName;
String conditionValue;
if (etag != null) {
conditionName = "If-None-Match";
conditionValue = etag;
} else if (lastModified != null) {
conditionName = "If-Modified-Since";
conditionValue = lastModifiedString;
} else if (servedDate != null) {
conditionName = "If-Modified-Since";
conditionValue = servedDateString;
} else {
return new CacheStrategy(request, null); // No condition! Make a
// regular request.
}
Headers.Builder conditionalRequestHeaders = request.headers()
.newBuilder();
Internal.instance.addLenient(conditionalRequestHeaders, conditionName,
conditionValue);
Request conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build()).build();
return new CacheStrategy(conditionalRequest, cacheResponse);
}
下面是需要检查的条件的汇总列表:
(1)判断缓存候选响应是否存在。
(2)如果接收的是 HTTPS 请求,如果需要的话,判断缓存候选响应是否已进行握手。
(3)判断缓存候选响应是否已缓存;这和 OkHttp 存储响应时完成的工作是相同的。
(4)如果没有缓存,在请求报头的 Cache-Control 中检查对应内容,如果该标记为 true,缓存候选响应将不会被使用,后面的检查也会跳过。
(5)在请求报头中查找 If-Modified-Since 或 If-None-Match,如果找到其中之一,缓存候选响应将不会被使用,后面的检查也会跳过。
(6)进行一些计算以得到诸如缓存响应缓存响应存活时间,缓存存活时间,缓存最大失活时间。完成检查最简单的办法是写这样的伪代码:if ("cache candidate's no-cache" && "cache candidate's age" + "request's min-fresh" < "cache candidate's fresh lifetime" + "request's max-stale")
如果上述条件被满足,那么已缓存的响应报文将会被使用,后面的检查将跳过。
4.2.3.CacheStrategy
包含缓存相关的所有逻辑,其内部维护一个经过策略比对(Factory中)得出的request和response,根据request和response的实际情况,来决定是通过网络还是缓存获取response,抑或二者同时使用。
4.2.4.CacheInterceptor
负责执行实际的缓存工作,所有工作的执行策略都是根据CacheStrategy做出的。
4.3.DiskLruCache(最近最少使用)
Cache内部通过DiskLruCache管理cache在文件系统层面的创建,读取,清理等等工作!
通过LinkedHashMap本身的实现逻辑达到cache的LRU替换;
使用Okio对File的封装,简化了I/O操作
DiskLruCache可以看成是Cache在文件系统层的具体实现,所以其基本操作接口存在一一对应的关系:
(1)Cache.get() —>DiskLruCache.get()
(2)Cache.put()—>DiskLruCache.edit() //cache插入
(3)Cache.remove()—>DiskLruCache.remove()
(4)Cache.update()—>DiskLruCache.edit()//cache更新
总结
总结起来DiskLruCache主要有以下几个特点:
•通过LinkedHashMap实现LRU替换
•通过本地维护Cache操作日志保证Cache原子性与可用性,同时为防止日志过分膨胀定时执行日志精简
•每一个Cache项对应两个状态副本:DIRTY,CLEAN。CLEAN表示当前可用状态Cache,外部访问到的cache快照均为CLEAN状态;DIRTY为更新态Cache。由于更新和创建都只操作DIRTY状态副本,实现了Cache的读写分离。
•每一个Cache项有四个文件,两个状态(DIRTY,CLEAN),每个状态对应两个文件:一个文件存储Cache meta数据,一个文件存储Cache内容数据
五.OkHttp连接池(ConnectionPool)
OkHttp的连接池管理,是OkHttp的核心部分。通过维护连接池,最大限度重用现有连接,减少网络连接的创建开销,以此提升网络请求效率。
5.1.背景
5.1.1 keep-alive机制
HTTP1.0中HTTP的请求流程如下:
这种方法的好处是简单,各个请求互不干扰。但在复杂的网络请求场景下这种方式几乎不可用。例如:浏览器加载一个HTML网页,HTML中可能需要加载数十个资源,典型场景下这些资源中大部分来自同一个站点。按照HTTP1.0的做法,这需要建立数十个TCP连接,每个连接负责一个资源请求。创建一个TCP连接需要3次握手,而释放连接则需要2次或4次握手。重复的创建和释放连接极大地影响了网络效率,同时也增加了系统开销。
为了有效地解决这一问题,HTTP/1.1提出了 Keep-Alive 机制:
当一个HTTP请求的数据传输结束后,TCP连接不立即释放,如果此时有新的HTTP请求,且其请求的Host同上次请求相同,则可以直接复用未释放的TCP连接,从而省去了TCP的释放和再次创建的开销,减少了网络延时:
在现代浏览器中,一般同时开启6~8个keepalive connections的socket连接,并保持一定的链路生命,当不需要时再关闭;而在服务器中,一般是由软件根据负载情况(比如FD最大值、Socket内存、超时时间、栈内存、栈数量等)决定是否主动关闭。
5.1.2 HTTP/2
在HTTP/1.x中,如果客户端想发起多个并行请求必须建立多个TCP连接,这无疑增大了网络开销。另外HTTP/1.x不会压缩请求和响应报头,导致了不必要的网络流量;HTTP/1.x不支持资源优先级导致底层TCP连接利用率低下。而这些问题都是HTTP/2要着力解决的。简单来说HTTP/2主要解决了以下问题:
(1)报头压缩:HTTP/2使用HPACK压缩格式压缩请求和响应报头数据,减少不必要流量开销
(2)请求与响应复用:HTTP/2通过引入新的二进制分帧层实现了完整的请求和响应复用,客户端和服务器可以将HTTP消息分解为互不依赖的帧,然后交错发送,最后再在另一端将其重新组装
(3)指定数据流优先级:将 HTTP 消息分解为很多独立的帧之后,我们就可以复用多个数据流中的帧,客户端和服务器交错发送和传输这些帧的顺序就成为关键的性能决定因素。为了做到这一点,HTTP/2 标准允许每个数据流都有一个关联的权重和依赖关系
(4)流控制:HTTP/2 提供了一组简单的构建块,这些构建块允许客户端和服务器实现其自己的数据流和连接级流控制
HTTP/2所有性能增强的核心在于新的二进制分帧层,它定义了如何封装HTTP消息并在客户端与服务器之间进行传输。同时HTTP/2引入了三个新的概念:
(1)数据流:基于TCP连接之上的逻辑双向字节流,对应一个请求及其响应。客户端每发起一个请求就建立一个数据流,后续该请求及其响应的所有数据都通过该数据流传输
(2)消息:一个请求或响应对应的一系列数据帧
(3)帧:HTTP/2的最小数据切片单位
上述概念之间的逻辑关系:
•所有通信都在一个 TCP 连接上完成,此连接可以承载任意数量的双向数据流
•每个数据流都有一个唯一的标识符和可选的优先级信息,用于承载双向消息
•每条消息都是一条逻辑 HTTP 消息(例如请求或响应),包含一个或多个帧
•帧是最小的通信单位,承载着特定类型的数据,例如 HTTP 标头、消息负载,等等。 来自不同数据流的帧可以交错发送,然后再根据每个帧头的数据流标识符重新组装
•每个HTTP消息被分解为多个独立的帧后可以交错发送,从而在宏观上实现了多个请求或响应并行传输的效果。这类似于多进程环境下的时间分片机制
5.2.连接池的使用与分析
无论是HTTP/1.1的 Keep-Alive 机制还是HTTP/2的多路复用机制,在实现上都需要引入连接池来维护网络连接。
OkHttp内部通过ConnectionPool来管理连接池,首先来看下ConnectionPool的主要成员:
public final class ConnectionPool {
private static final Executor executor = new ThreadPoolExecutor( 0 /* corePoolSize */, Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory( "OkHttp ConnectionPool", true)); /** The maximum number of idle connections for each address. */ private final int maxIdleConnections; private final long keepAliveDurationNs; private final Runnable cleanupRunnable = new Runnable() { @Override public void run() {// ......} }; private final Deque<RealConnection> connections = new ArrayDeque<>(); final RouteDatabase routeDatabase = new RouteDatabase(); boolean cleanupRunning; // ...... /** * 返回符合要求的可重用连接,如果没有返回NULL */ RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {// ...... } /* * 去除重复连接。主要针对多路复用场景下一个address只需要一个连接 */ Socket deduplicate(Address address, StreamAllocation streamAllocation) {// ...... } /* * 将连接加入连接池 */ void put(RealConnection connection) {// ......} /* * 当有连接空闲时唤起cleanup线程清洗连接池 */ boolean connectionBecameIdle(RealConnection connection) {// ......} /** * 扫描连接池,清除空闲连接 */ long cleanup(long now) {// ...... } /* * 标记泄露连接 */ private int pruneAndGetAllocationCount(RealConnection connection, long now) { // ......} }
相关概念:
• Call :对Http请求的封装
• Connection/RealConnection :物理连接的封装,其内部有 List<WeakReference<StreamAllocation>> 的引用计数
• StreamAllocation : okhttp中引入了StreamAllocation负责管理一个连接上的流,同时在connection中也通过一个StreamAllocation的引用的列表来管理一个连接的流,从而使得连接与流之间解耦。
• connections : Deque双端队列,用于维护连接的容器
• routeDatabase :用来记录连接失败的 Route 的黑名单,当连接失败的时候就会把失败的线路加进去
5.2.1 实例化
首先来看下ConnectionPool的实例化过程,一个OkHttpClient只包含一个ConnectionPool,其实例化过程也在OkHttpClient的实例化过程中实现,值得一提的是ConnectionPool各个方法的调用并没有直接对外暴露,而是通过OkHttpClient的Internal接口统一对外暴露:
Internal的唯一实现在OkHttpClient中,OkHttpClient通过这种方式暴露其API给外部类使用。
5.2.2 连接池维护
ConnectionPool内部通过一个双端队列(ArrayDeque(可变数组))来维护当前所有连接,主要涉及到的操作包括:
•put:放入新连接
•get:从连接池中获取连接
•evictAll:关闭所有连接
•connectionBecameIdle:连接变空闲后调用清理线程
•deduplicate:清除重复的多路复用线程
5.2.2.1 StreamAllocation.findConnection
get是ConnectionPool中最为重要的方法, StreamAllocation 在其findConnection方法内部通过调用get方法为其找到stream找到合适的连接,如果没有则新建一个连接。首先来看下 findConnection 的逻辑:
private RealConnection findConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled)
throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released)
throw new IllegalStateException("released");
if (codec != null)
throw new IllegalStateException("codec != null");
if (canceled)
throw new IOException("Canceled");
// 一个StreamAllocation刻画的是一个Call的数据流动,一个Call可能存在多次请求(重定向,Authenticate等),所以当发生类似重定向等事件时优先使用原有的连接
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null
&& !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// 试图从连接池中找到可复用的连接
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
// 获取路由配置,所谓路由其实就是代理,ip地址等参数的一个组合
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
}
RealConnection result;
synchronized (connectionPool) {
if (canceled)
throw new IOException("Canceled");
// 拿到路由后可以尝试重新从连接池中获取连接,这里主要针对http2协议下清除域名碎片机制
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null)
return connection;
// 新建连接
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
// 修改result连接stream计数,方便connection标记清理
acquire(result);
}
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// 将新建的连接放入到连接池中
Internal.instance.put(connectionPool, result);
// 如果同时存在多个连向同一个地址的多路复用连接,则关闭多余连接,只保留一个
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address,
this);
result = connection;
}
}
closeQuietly(socket);
return result;
}
其主要逻辑大致分为以下几个步骤:
(1)查看当前streamAllocation是否有之前已经分配过的连接,有则直接使用
(2)从连接池中查找可复用的连接,有则返回该连接
(3)配置路由,配置后再次从连接池中查找是否有可复用连接,有则直接返回
(4)新建一个连接,并修改其StreamAllocation标记计数,将其放入连接池中
(5)查看连接池是否有重复的多路复用连接,有则清除
5.2.2.2 ConnectionPool.get
RealConnection get(Address address, StreamAllocation streamAllocation,
Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection);
return connection;
}
}
return null;
}
其逻辑比较简单,遍历当前连接池,如果有符合条件的连接则修改器标记计数,然后返回。这里的关键逻辑在 RealConnection.isEligible 方法:
public boolean isEligible(Address address, Route route) {
// If this connection is not accepting new streams, we're done.
if (allocations.size() >= allocationLimit || noNewStreams)
return false;
// If the non-host fields of the address don't overlap, we're done.
if (!Internal.instance.equalsNonHost(this.route.address(), address))
return false;
// If the host exactly matches, we're done: this connection can carry
// the address.
if (address.url().host().equals(this.route().address().url().host())) {
return true; // This connection is a perfect match.
}
// At this point we don't have a hostname match. But we still be able to
// carry the request if
// our connection coalescing requirements are met. See also:
// https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
// https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
// 1. This connection must be HTTP/2.
if (http2Connection == null)
return false;
// 2. The routes must share an IP address. This requires us to have a
// DNS address for both
// hosts, which only happens after route planning. We can't coalesce
// connections that use a
// proxy, since proxies don't tell us the origin server's IP address.
if (route == null)
return false;
if (route.proxy().type() != Proxy.Type.DIRECT)
return false;
if (this.route.proxy().type() != Proxy.Type.DIRECT)
return false;
if (!this.route.socketAddress().equals(route.socketAddress()))
return false;
// 3. This connection's server certificate's must cover the new host.
if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE)
return false;
if (!supportsUrl(address.url()))
return false;
// 4. Certificate pinning must match the host.
try {
address.certificatePinner().check(address.url().host(),
handshake().peerCertificates());
} catch (SSLPeerUnverifiedException e) {
return false;
}
return true; // The caller's address can be carried by this connection.
}
•连接没有达到共享上限
•非host域必须完全一样
•如果此时host域也相同,则符合条件,可以被复用
•如果host不相同,在HTTP/2的域名切片场景下一样可以复用
5.2.2.3 ConnectionPool.deduplicate
deduplicate方法主要是针对在HTTP/2场景下多个多路复用连接清除的场景。如果当前连接是HTTP/2,那么所有指向该站点的请求都应该基于同一个TCP连接:
Socket deduplicate(Address address, StreamAllocation streamAllocation) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, null)
&& connection.isMultiplexed()
&& connection != streamAllocation.connection()) {
return streamAllocation.releaseAndAcquire(connection);
}
}
return null;
}
5.2.3 自动回收
连接池中有socket回收,而这个回收是以 RealConnection 的弱引用 List<Reference<StreamAllocation>> 是否为0来为依据的。ConnectionPool有一个独立的线程 cleanupRunnable 来清理连接池,其触发时机有两个:
•当连接池中put新的连接时
•当connectionBecameIdle接口被调用时
while (true) {
// 执行清理并返回下场需要清理的时间
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1)
return;
if (waitNanos > 0) {
synchronized (ConnectionPool.this) {
try {
// 在timeout内释放锁与时间片
ConnectionPool.this.wait(TimeUnit.NANOSECONDS
.toMillis(waitNanos));
} catch (InterruptedException ignored) {
}
}
}
}
这段死循环实际上是一个阻塞的清理任务,首先进行清理(clean),并返回下次需要清理的间隔时间,然后调用 wait(timeout) 进行等待以释放锁与时间片,当等待时间到了后,再次进行清理,并返回下次要清理的间隔时间……
cleanup的基本逻辑如下:
•遍历连接池中所有连接,标记泄露连接
•如果被标记的连接满足( 空闲socket连接超过5个 && keepalive时间大于5分钟 ),就将此连接从 Deque 中移除,并关闭连接,返回 0 ,也就是将要执行 wait(0) ,提醒立刻再次扫描
•如果( 目前还可以塞得下5个连接,但是有可能泄漏的连接(即空闲时间即将达到5分钟) ),就返回此连接即将到期的剩余时间,供下次清理
•如果( 全部都是活跃的连接 ),就返回默认的 keep-alive 时间,也就是5分钟后再执行清理
pruneAndGetAllocationCount方法 负责标记并找到不活跃连接:
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
// 虚引用列表
List<Reference<StreamAllocation>> references = connection.allocations;
// 遍历弱引用列表
for (int i = 0; i < references.size();) {
Reference<StreamAllocation> reference = references.get(i);
// 如果正在被使用,跳过,接着循环
// 是否置空是在上文`connectionBecameIdle`的`release`控制的
if (reference.get() != null) {
// 非常明显的引用计数
i++;
continue;
}
// 否则移除引用
references.remove(i);
connection.noNewStreams = true;
// 如果所有分配的流均没了,标记为已经距离现在空闲了5分钟
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
OkHttp的连接池通过计数+标记清理的机制来管理连接池,使得无用连接可以被会回收,并保持多个健康的keep-alive连接。这也是OkHttp的连接池能保持高效的关键原因。
Google貌似在6.0版本里面删除了HttpClient相关API,为了更好的在应对网络访问,学习下okhttp。
OkHttp 处理了很多网络疑难杂症:会从很多常用的连接问题中自动恢复。如果您的服务器配置了多个IP地址,当第一个IP连接失败的时候,OkHttp会自动尝试下一个IP。OkHttp还处理了代理服务器问题和SSL握手失败问题。
基本功能:
- 一般的get请求
- 一般的post请求
- 基于Http的文件上传
- 文件下载
- 加载图片
- 支持请求回调,直接返回对象、对象集合
- 支持session的保持