基于OkHttp3.8
OkHttp同步请求:request和response默认在一个线程,需要new Thread 执行网络请求,并切换到主线程更新UI
// Synchronous request: build a client and a request, then run the call on the current thread.
val client = OkHttpClient()
val request = Request.Builder()
    // Placeholder: Request.Builder.url(String) rejects strings that are not valid HTTP/HTTPS URLs.
    .url("")
    .addHeader("header", "zz")
    .build()
client.newCall(request).execute()
OkHttp异步请求:response默认在子线程中回调,需要切换到主线程更新UI
// Asynchronous request: the result is delivered through the Callback passed to enqueue().
client.newCall(request).enqueue(object : Callback {
    override fun onFailure(call: okhttp3.Call?, e: IOException?) {
        // To change body of created functions use File | Settings | File Templates.
        TODO("not implemented")
    }

    override fun onResponse(call: okhttp3.Call?, response: Response?) {
        // To change body of created functions use File | Settings | File Templates.
        TODO("not implemented")
    }
})
一个Http请求分三步走:
1、创建OkHttpClient对象
2、创建Request对象
3、通过client 和 request 得到一个RealCall对象,然后采用同步或者异步的方式请求得到response
一、创建OkHttpClient对象
OkHttpClient() 使用默认的Builder()来实例化。简化了client的创建过程
/** Creates a client configured entirely from a freshly constructed {@link Builder}. */
public OkHttpClient() {
  this(new Builder());
}

/**
 * Copies every setting from {@code builder} into this client.
 *
 * <p>The two interceptor lists are wrapped with {@code Util.immutableList}; the remaining
 * values are assigned straight from the builder's fields. The SSL socket factory and the
 * certificate chain cleaner are the only values that may be derived rather than copied.
 */
OkHttpClient(Builder builder) {
  this.dispatcher = builder.dispatcher;
  this.proxy = builder.proxy;
  this.protocols = builder.protocols;
  this.connectionSpecs = builder.connectionSpecs;
  this.interceptors = Util.immutableList(builder.interceptors);
  this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
  this.eventListenerFactory = builder.eventListenerFactory;
  this.proxySelector = builder.proxySelector;
  this.cookieJar = builder.cookieJar;
  this.cache = builder.cache;
  this.internalCache = builder.internalCache;
  this.socketFactory = builder.socketFactory;

  /* True when at least one of the configured connection specs reports isTls(). */
  boolean isTLS = false;
  for (ConnectionSpec spec : connectionSpecs) {
    isTLS = isTLS || spec.isTls();
  }

  /*
   * Keep the builder's values when a socket factory was supplied or when no spec uses TLS;
   * otherwise derive both the socket factory and the chain cleaner from the system default
   * trust manager.
   */
  if (builder.sslSocketFactory != null || !isTLS) {
    this.sslSocketFactory = builder.sslSocketFactory;
    this.certificateChainCleaner = builder.certificateChainCleaner;
  } else {
    X509TrustManager trustManager = systemDefaultTrustManager();
    this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
    this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
  }

  this.hostnameVerifier = builder.hostnameVerifier;
  /* The pinner is rebuilt so that it uses the chain cleaner chosen above. */
  this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
      certificateChainCleaner);
  this.proxyAuthenticator = builder.proxyAuthenticator;
  this.authenticator = builder.authenticator;
  this.connectionPool = builder.connectionPool;
  this.dns = builder.dns;
  this.followSslRedirects = builder.followSslRedirects;
  this.followRedirects = builder.followRedirects;
  this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
  this.connectTimeout = builder.connectTimeout;
  this.readTimeout = builder.readTimeout;
  this.writeTimeout = builder.writeTimeout;
  this.pingInterval = builder.pingInterval;
}
Builder是 OkHttpClient里面的一个静态内部类,采用构造者模式。
/**
 * Initializes the builder with default settings: a new {@code Dispatcher} and
 * {@code ConnectionPool}, no cookies, no authenticators, redirects and connection-failure
 * retries enabled, and connect/read/write timeouts of {@code 10_000}.
 *
 * <p>NOTE(review): the timeout unit is not visible in this excerpt; presumably milliseconds,
 * to be confirmed against the timeout setters.
 */
public Builder() {
  dispatcher = new Dispatcher();
  protocols = DEFAULT_PROTOCOLS;
  connectionSpecs = DEFAULT_CONNECTION_SPECS;
  eventListenerFactory = EventListener.factory(EventListener.NONE);
  proxySelector = ProxySelector.getDefault();
  cookieJar = CookieJar.NO_COOKIES;
  socketFactory = SocketFactory.getDefault();
  hostnameVerifier = OkHostnameVerifier.INSTANCE;
  certificatePinner = CertificatePinner.DEFAULT;
  proxyAuthenticator = Authenticator.NONE;
  authenticator = Authenticator.NONE;
  connectionPool = new ConnectionPool();
  dns = Dns.SYSTEM;
  followSslRedirects = true;
  followRedirects = true;
  retryOnConnectionFailure = true;
  connectTimeout = 10_000;
  readTimeout = 10_000;
  writeTimeout = 10_000;
  pingInterval = 0;
}
二、创建Request对象
Builder是 Request里面的一个静态内部类,采用构造者模式。
.Builder() 默认为 GET 请求,
.url("") 将 string 的 url 转换成 HttpUrl, 能够被服务器响应的url
.addHeader("header", "zz") 添加请求头;header() 会替换同名的请求头,addHeader() 则是在已有请求头之后追加
public static class Builder { HttpUrl url; String method; Headers.Builder headers; RequestBody body; Object tag; public Builder() { this.method = "GET"; this.headers = new Headers.Builder(); } Builder(Request request) { this.url = request.url; this.method = request.method; this.body = request.body; this.tag = request.tag; this.headers = request.headers.newBuilder(); } public Builder url(HttpUrl url) { if (url == null) throw new NullPointerException("url == null"); this.url = url; return this; } /** * Sets the URL target of this request. * * @throws IllegalArgumentException if {@code url} is not a valid HTTP or HTTPS URL. Avoid this * exception by calling {@link HttpUrl#parse}; it returns null for invalid URLs. */ public Builder url(String url) { if (url == null) throw new NullPointerException("url == null"); // Silently replace web socket URLs with HTTP URLs. if (url.regionMatches(true, 0, "ws:", 0, 3)) { url = "http:" + url.substring(3); } else if (url.regionMatches(true, 0, "wss:", 0, 4)) { url = "https:" + url.substring(4); } HttpUrl parsed = HttpUrl.parse(url); if (parsed == null) throw new IllegalArgumentException("unexpected url: " + url); return url(parsed); } /** * Sets the URL target of this request. * * @throws IllegalArgumentException if the scheme of {@code url} is not {@code http} or {@code * https}. */ public Builder url(URL url) { if (url == null) throw new NullPointerException("url == null"); HttpUrl parsed = HttpUrl.get(url); if (parsed == null) throw new IllegalArgumentException("unexpected url: " + url); return url(parsed); } /** * Sets the header named {@code name} to {@code value}. If this request already has any headers * with that name, they are all replaced. */ public Builder header(String name, String value) { headers.set(name, value); return this; } /** * Adds a header with {@code name} and {@code value}. Prefer this method for multiply-valued * headers like "Cookie". 
* * <p>Note that for some headers including {@code Content-Length} and {@code Content-Encoding}, * OkHttp may replace {@code value} with a header derived from the request body. */ public Builder addHeader(String name, String value) { headers.add(name, value); return this; } public Builder removeHeader(String name) { headers.removeAll(name); return this; } /** Removes all headers on this builder and adds {@code headers}. */ public Builder headers(Headers headers) { this.headers = headers.newBuilder(); return this; } /** * Sets this request's {@code Cache-Control} header, replacing any cache control headers already * present. If {@code cacheControl} doesn't define any directives, this clears this request's * cache-control headers. */ public Builder cacheControl(CacheControl cacheControl) { String value = cacheControl.toString(); if (value.isEmpty()) return removeHeader("Cache-Control"); return header("Cache-Control", value); } public Builder get() { return method("GET", null); } public Builder head() { return method("HEAD", null); } public Builder post(RequestBody body) { return method("POST", body); } public Builder delete(@Nullable RequestBody body) { return method("DELETE", body); } public Builder delete() { return delete(Util.EMPTY_REQUEST); } public Builder put(RequestBody body) { return method("PUT", body); } public Builder patch(RequestBody body) { return method("PATCH", body); } public Builder method(String method, @Nullable RequestBody body) { if (method == null) throw new NullPointerException("method == null"); if (method.length() == 0) throw new IllegalArgumentException("method.length() == 0"); if (body != null && !HttpMethod.permitsRequestBody(method)) { throw new IllegalArgumentException("method " + method + " must not have a request body."); } if (body == null && HttpMethod.requiresRequestBody(method)) { throw new IllegalArgumentException("method " + method + " must have a request body."); } this.method = method; this.body = body; return this; } /** * 
Attaches {@code tag} to the request. It can be used later to cancel the request. If the tag * is unspecified or null, the request is canceled by using the request itself as the tag. */ public Builder tag(Object tag) { this.tag = tag; return this; } public Request build() { if (url == null) throw new IllegalStateException("url == null"); return new Request(this); } }
三、通过client 和 request 得到一个RealCall对象,然后采用同步或者异步的方式请求得到response
// Synchronous: runs the call on the current thread.
client.newCall(request).execute()

// Asynchronous: hands the call to the dispatcher and reports the outcome through the Callback.
client.newCall(request).enqueue(object : Callback {
    override fun onFailure(call: okhttp3.Call?, e: IOException?) {
        // To change body of created functions use File | Settings | File Templates.
        TODO("not implemented")
    }

    override fun onResponse(call: okhttp3.Call?, response: Response?) {
        // To change body of created functions use File | Settings | File Templates.
        TODO("not implemented")
    }
})
OkHttpClient实现了Call.Factory接口,即实现了newCall()
/** Factory that produces a {@link Call} for a given {@link Request}. */
interface Factory {
  /** Returns a call for {@code request}. */
  Call newCall(Request request);
}
OkHttpClient#newCall() 得到一个RealCall对象
/**
 * Creates a {@code RealCall} bound to this client and {@code request}. The literal
 * {@code false} is passed as the call's web-socket flag.
 */
@Override public Call newCall(Request request) {
  return new RealCall(this, request, false /* for web socket */);
}
RealCall 实现了Call接口
final class RealCall implements Call { final OkHttpClient client; final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor; final EventListener eventListener; /** The application's original request unadulterated by redirects or auth headers. */ final Request originalRequest; final boolean forWebSocket; // Guarded by this. private boolean executed; RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { final EventListener.Factory eventListenerFactory = client.eventListenerFactory(); this.client = client; this.originalRequest = originalRequest; this.forWebSocket = forWebSocket; this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket); // TODO(jwilson): this is unsafe publication and not threadsafe. this.eventListener = eventListenerFactory.create(this); } }
OkHttpClient#newCall() 得到一个RealCall对象,可执行同步方法 execute()和异步方法 enqueue(),同步和异步是通过dispatcher来实现线程调度的。真正的实现是在getResponseWithInterceptorChain()
同步请求:阻塞式的
每个call只能被执行一次、重复执行会抛异常
/**
 * Runs this call on the calling thread and returns its response.
 *
 * <p>The call is marked as executed under the call's own lock, registered with the client's
 * dispatcher, run through {@code getResponseWithInterceptorChain()}, and unregistered from the
 * dispatcher in a {@code finally} block.
 *
 * @throws IllegalStateException if this call has already been executed.
 * @throws IOException with message {@code "Canceled"} if the interceptor chain returned null,
 *     or any {@code IOException} thrown by the chain itself.
 */
@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  try {
    client.dispatcher().executed(this);
    Response result = getResponseWithInterceptorChain();
    if (result == null) throw new IOException("Canceled");
    return result;
  } finally {
    client.dispatcher().finished(this);
  }
}
异步请求:
/**
 * Schedules this call for asynchronous execution by wrapping {@code responseCallback} in an
 * {@code AsyncCall} and handing it to the client's dispatcher.
 *
 * @throws IllegalStateException if this call has already been executed.
 */
@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
AsyncCall 是 RealCall 一个内部类,继承自NamedRunnable,也就是异步请求的最终实现是在AsyncCall的execute()里面执行。
final class AsyncCall extends NamedRunnable { private final Callback responseCallback; AsyncCall(Callback responseCallback) { super("OkHttp %s", redactedUrl()); this.responseCallback = responseCallback; } @Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } } }
Dispatcher 调度
同步请求的时候,client.dispatcher().executed(this) 和 client.dispatcher().finished(this) 只是把当前 call 加入和移出 Dispatcher 中的双向队列 runningSyncCalls(分别表示开始执行和结束执行)。
异步请求的时候,client.dispatcher().enqueue(new AsyncCall(responseCallback)) 里面维护两个双向队列,一个用于正在执行的任务runningAsyncCalls,一个用于缓存等待的任务readyAsyncCalls。
当正在执行的总任务数小于 maxRequests 且相同host下执行的任务数小于 maxRequestsPerHost,则直接添加到runningAsyncCalls 里面,执行 AsyncCall 里的 execute() 方法;否则添加进等待任务队列,那么 readyAsyncCalls 中的任务是什么时候移到 runningAsyncCalls 中的呢?
异步任务 AsyncCall 的 execute() 方法最终会走到 finally 中的 client.dispatcher().finished(this),进而执行 Dispatcher 的 promoteCalls():经过一系列判断后,把等待队列 readyAsyncCalls 中的任务移到运行队列 runningAsyncCalls,并交给线程池执行,最终调用任务的 execute() 方法。
public final class Dispatcher { private int maxRequests = 64; private int maxRequestsPerHost = 5; private @Nullable Runnable idleCallback; /** Executes calls. Created lazily. */ private @Nullable ExecutorService executorService; /** Ready async calls in the order they'll be run. */ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>(); public Dispatcher(ExecutorService executorService) { this.executorService = executorService; } public Dispatcher() { } public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } } /** * Cancel all calls currently enqueued or executing. Includes calls executed both {@linkplain * Call#execute() synchronously} and {@linkplain Call#enqueue asynchronously}. */ public synchronized void cancelAll() { for (AsyncCall call : readyAsyncCalls) { call.get().cancel(); } for (AsyncCall call : runningAsyncCalls) { call.get().cancel(); } for (RealCall call : runningSyncCalls) { call.cancel(); } } private void promoteCalls() { if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. 
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } } /** Used by {@code Call#execute} to signal it is in-flight. */ synchronized void executed(RealCall call) { runningSyncCalls.add(call); } /** Used by {@code AsyncCall#run} to signal completion. */ void finished(AsyncCall call) { finished(runningAsyncCalls, call, true); } /** Used by {@code Call#execute} to signal completion. */ void finished(RealCall call) { finished(runningSyncCalls, call, false); } private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; } if (runningCallsCount == 0 && idleCallback != null) { idleCallback.run(); } } }
无论是同步请求还是异步请求,最终都是通过getResponseWithInterceptorChain() 得到响应 response
RealCall#getResponseWithInterceptorChain()
Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. List<Interceptor> interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor); interceptors.add(new BridgeInterceptor(client.cookieJar())); interceptors.add(new CacheInterceptor(client.internalCache())); interceptors.add(new ConnectInterceptor(client)); if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket)); Interceptor.Chain chain = new RealInterceptorChain( interceptors, null, null, null, 0, originalRequest); return chain.proceed(originalRequest); }
/** A step in the request pipeline: receives a {@link Chain} and returns a response. */
public interface Interceptor {
  /** Handles the request exposed by {@code chain} and returns the resulting response. */
  Response intercept(Chain chain) throws IOException;

  /** The view of the pipeline handed to an interceptor. */
  interface Chain {
    /** Returns the request carried by this chain. */
    Request request();

    /** Passes {@code request} further along the chain and returns the response. */
    Response proceed(Request request) throws IOException;

    /**
     * Returns the connection the request will be executed on. This is only available in the chains
     * of network interceptors; for application interceptors this is always null.
     */
    @Nullable Connection connection();
  }
}
RealInterceptorChain实现了Interceptor.Chain,实现了chain.proceed()
1、getResponseWithInterceptorChain() 中得到List<Interceptor>,并创建RealInterceptorChain, 执行chain.proceed()得到response
2、在chain.proceed() 中创建指向下一个拦截器的chain(index + 1),然后执行当前拦截器的 interceptor.intercept(next),把这个新的chain交给当前拦截器处理。
3、intercept(chain) 对请求进行处理后,执行chain.proceed(), 循环步骤2
4、当遍历到callServerInterceptor时,callServerInterceptor.intercept(chain) 会直接返回response
5、response 逐层返回给上一个拦截器进行处理,递归结束后,getResponseWithInterceptorChain() 得到层层处理后的response。
public final class RealInterceptorChain implements Interceptor.Chain { private final List<Interceptor> interceptors; private final StreamAllocation streamAllocation; private final HttpCodec httpCodec; private final RealConnection connection; private final int index; private final Request request; private int calls; public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection, int index, Request request) { this.interceptors = interceptors; this.connection = connection; this.streamAllocation = streamAllocation; this.httpCodec = httpCodec; this.index = index; this.request = request; } @Override public Response proceed(Request request) throws IOException { return proceed(request, streamAllocation, httpCodec, connection); } public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException { if (index >= interceptors.size()) throw new AssertionError(); calls++; // If we already have a stream, confirm that the incoming request will use it. if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) { throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must retain the same host and port"); } // If we already have a stream, confirm that this is the only call to chain.proceed(). if (this.httpCodec != null && calls > 1) { throw new IllegalStateException("network interceptor " + interceptors.get(index - 1) + " must call proceed() exactly once"); } // Call the next interceptor in the chain. RealInterceptorChain next = new RealInterceptorChain( interceptors, streamAllocation, httpCodec, connection, index + 1, request); Interceptor interceptor = interceptors.get(index); Response response = interceptor.intercept(next); // Confirm that the next interceptor made its required call to chain.proceed(). 
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) { throw new IllegalStateException("network interceptor " + interceptor + " must call proceed() exactly once"); } // Confirm that the intercepted response isn't null. if (response == null) { throw new NullPointerException("interceptor " + interceptor + " returned null"); } return response; } }
client.interceptors: 用户自定义的interceptor,最早完成对request的处理, 最后完成对response的处理
RetryAndFollowUpInterceptor:负责失败重试和重定向
BridgeInterceptor:负责把用户请求变成服务器能处理的请求,把服务器返回的响应变为用户可处理的响应,header/cookie/gzip
CacheInterceptor: 负责读取缓存、验证缓存有效性、更新缓存
ConnectInterceptor: 负责和服务器建立连接
client.networkInterceptors: 用户自定义的interceptor, 已经和服务器建立好连接
CallServerInterceptor: 负责向服务器发送请求数据,从服务器获取响应数据(通过 HttpCodec)
StreamAllocation: 创建并维护客户端到服务器端的连接
public final class RetryAndFollowUpInterceptor implements Interceptor { /** * How many redirects and auth challenges should we attempt? Chrome follows 21 redirects; Firefox, * curl, and wget follow 20; Safari follows 16; and HTTP/1.0 recommends 5. */ private static final int MAX_FOLLOW_UPS = 20; private final OkHttpClient client; private final boolean forWebSocket; private StreamAllocation streamAllocation; private Object callStackTrace; private volatile boolean canceled; public RetryAndFollowUpInterceptor(OkHttpClient client, boolean forWebSocket) { this.client = client; this.forWebSocket = forWebSocket; } @Override public Response intercept(Chain chain) throws IOException { Request request = chain.request(); streamAllocation = new StreamAllocation( client.connectionPool(), createAddress(request.url()), callStackTrace); int followUpCount = 0; Response priorResponse = null; while (true) { if (canceled) { streamAllocation.release(); throw new IOException("Canceled"); } Response response = null; boolean releaseConnection = true; try { response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null); releaseConnection = false; } catch (RouteException e) { // The attempt to connect via a route failed. The request will not have been sent. if (!recover(e.getLastConnectException(), false, request)) { throw e.getLastConnectException(); } releaseConnection = false; continue; } catch (IOException e) { // An attempt to communicate with a server failed. The request may have been sent. boolean requestSendStarted = !(e instanceof ConnectionShutdownException); if (!recover(e, requestSendStarted, request)) throw e; releaseConnection = false; continue; } finally { // We're throwing an unchecked exception. Release any resources. if (releaseConnection) { streamAllocation.streamFailed(null); streamAllocation.release(); } } // Attach the prior response if it exists. Such responses never have a body. 
if (priorResponse != null) { response = response.newBuilder() .priorResponse(priorResponse.newBuilder() .body(null) .build()) .build(); } Request followUp = followUpRequest(response); if (followUp == null) { if (!forWebSocket) { streamAllocation.release(); } return response; } closeQuietly(response.body()); if (++followUpCount > MAX_FOLLOW_UPS) { streamAllocation.release(); throw new ProtocolException("Too many follow-up requests: " + followUpCount); } if (followUp.body() instanceof UnrepeatableRequestBody) { streamAllocation.release(); throw new HttpRetryException("Cannot retry streamed HTTP body", response.code()); } if (!sameConnection(response, followUp.url())) { streamAllocation.release(); streamAllocation = new StreamAllocation( client.connectionPool(), createAddress(followUp.url()), callStackTrace); } else if (streamAllocation.codec() != null) { throw new IllegalStateException("Closing the body of " + response + " didn't close its backing stream. Bad interceptor?"); } request = followUp; priorResponse = response; } } }