OkHttp源码解析

一、OkHttp使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// Create the client and describe the request.
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
    .url("http://www.baidu.com")
    .method("GET", null)
    .build();

// Asynchronous request: the result is delivered through the Callback.
Call asyncCall = client.newCall(request);
asyncCall.enqueue(new Callback() {
  @Override
  public void onFailure(Call call, IOException e) {
    // do something
  }

  @Override
  public void onResponse(Call call, Response response) throws IOException {
    // do something
  }
});

// Synchronous request. A Call may only be run once (execute()/enqueue() throw
// IllegalStateException("Already Executed") on a second use), so a separate Call is
// created here instead of reusing the one that was enqueued above.
Call syncCall = client.newCall(request);
try {
  Response r = syncCall.execute();
  if (r.isSuccessful()) {
    // do something
  }
} catch (IOException e) {
  e.printStackTrace();
}

二、调用流程

此处输入图片的描述

1、创建OkHttpClient

这里通过Builder模式创建Client

1
2
3
/**
 * Creates a {@link Builder} constructed from this client.
 *
 * @return a new builder that was handed this client instance
 */
public Builder newBuilder() {
  return new Builder(this);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
 * Creates a builder pre-populated with the configuration of an existing client.
 *
 * <p>The application and network interceptor lists are copied element by element into this
 * builder's own lists; every other setting is taken over by plain assignment.
 *
 * @param okHttpClient the client whose settings are copied into this builder
 */
Builder(OkHttpClient okHttpClient) {
  this.dispatcher = okHttpClient.dispatcher;
  this.proxy = okHttpClient.proxy;
  this.protocols = okHttpClient.protocols;
  this.connectionSpecs = okHttpClient.connectionSpecs;

  // Interceptors are added to this builder's lists rather than sharing the client's lists.
  this.interceptors.addAll(okHttpClient.interceptors);
  this.networkInterceptors.addAll(okHttpClient.networkInterceptors);

  this.proxySelector = okHttpClient.proxySelector;
  this.cookieJar = okHttpClient.cookieJar;
  this.internalCache = okHttpClient.internalCache;
  this.cache = okHttpClient.cache;

  this.socketFactory = okHttpClient.socketFactory;
  this.sslSocketFactory = okHttpClient.sslSocketFactory;
  this.certificateChainCleaner = okHttpClient.certificateChainCleaner;
  this.hostnameVerifier = okHttpClient.hostnameVerifier;
  this.certificatePinner = okHttpClient.certificatePinner;

  this.proxyAuthenticator = okHttpClient.proxyAuthenticator;
  this.authenticator = okHttpClient.authenticator;
  this.connectionPool = okHttpClient.connectionPool;
  this.dns = okHttpClient.dns;

  this.followSslRedirects = okHttpClient.followSslRedirects;
  this.followRedirects = okHttpClient.followRedirects;
  this.retryOnConnectionFailure = okHttpClient.retryOnConnectionFailure;

  this.connectTimeout = okHttpClient.connectTimeout;
  this.readTimeout = okHttpClient.readTimeout;
  this.writeTimeout = okHttpClient.writeTimeout;
  this.pingInterval = okHttpClient.pingInterval;
}

2、创建Call

从使用可以看出:Call的创建是通过OkHttpClient中的newCall方法。

1
2
3
/**
 * Wraps {@code request} in a {@link RealCall} bound to this client.
 *
 * @param request the request the returned call will carry
 * @return a new {@code RealCall} that is not flagged as a web socket call
 */
@Override
public Call newCall(Request request) {
  final boolean forWebSocket = false;
  return new RealCall(this, request, forWebSocket);
}

Call是一个接口,这里的实现类是RealCall:

1
2
3
4
5
6
/**
 * Stores the client, the original request and the web socket flag, and creates the
 * {@link RetryAndFollowUpInterceptor} that belongs to this call.
 *
 * @param client the client this call runs on
 * @param originalRequest the request as supplied by the caller
 * @param forWebSocket whether this call is for a web socket
 */
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  this.client = client;
  this.originalRequest = originalRequest;
  this.forWebSocket = forWebSocket;

  // Built from the constructor arguments, so it does not depend on the fields set above.
  this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}

3、同步或者异步请求

  • 同步请求
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    /**
     * Runs this call on the calling thread and returns its response.
     *
     * @return the response produced by the interceptor chain
     * @throws IllegalStateException if this call has already been executed or enqueued
     * @throws IOException if the interceptor chain fails, or with message "Canceled" when the
     *     chain yields {@code null}
     */
    @Override
    public Response execute() throws IOException {
      // One-shot guard: the flag is checked and flipped under the call's own monitor.
      synchronized (this) {
        if (executed) {
          throw new IllegalStateException("Already Executed");
        }
        executed = true;
      }
      captureCallStackTrace();

      try {
        client.dispatcher().executed(this);
        Response response = getResponseWithInterceptorChain();
        if (response == null) {
          throw new IOException("Canceled");
        }
        return response;
      } finally {
        // Always tell the dispatcher this call is done, whether it succeeded or threw.
        client.dispatcher().finished(this);
      }
    }

同步请求,很直接就调用到了最核心的函数getResponseWithInterceptorChain()。再看下异步请求。

  • 异步请求
    1
    2
    3
    4
    5
    6
    7
    8
    /**
     * Schedules this call for asynchronous execution.
     *
     * <p>The callback is wrapped in an {@code AsyncCall} and handed to the client's dispatcher.
     *
     * @param responseCallback receives the outcome of the call
     * @throws IllegalStateException if this call has already been executed or enqueued
     */
    @Override
    public void enqueue(Callback responseCallback) {
      // One-shot guard: the flag is checked and flipped under the call's own monitor.
      synchronized (this) {
        if (executed) {
          throw new IllegalStateException("Already Executed");
        }
        executed = true;
      }
      captureCallStackTrace();

      AsyncCall asyncCall = new AsyncCall(responseCallback);
      client.dispatcher().enqueue(asyncCall);
    }

异步请求,将用户传入的responseCallback对象封装成一个AsyncCall对象,提交给Dispatcher来处理,这里的AsyncCall是RealCall的一个内部类。再看下这个Dispatcher是怎么处理这个AsyncCall的。

1
2
3
4
5
6
7
8
/**
 * Starts {@code call} immediately if there is capacity, otherwise parks it in the ready queue.
 *
 * <p>There is capacity when fewer than {@code maxRequests} async calls are running and fewer
 * than {@code maxRequestsPerHost} of them target the same host as {@code call}.
 *
 * @param call the asynchronous call to run or queue
 */
synchronized void enqueue(AsyncCall call) {
  // The per-host count is only evaluated when the global limit has not been reached.
  boolean atCapacity = runningAsyncCalls.size() >= maxRequests
      || runningCallsForHost(call) >= maxRequestsPerHost;
  if (atCapacity) {
    readyAsyncCalls.add(call);
    return;
  }

  runningAsyncCalls.add(call);
  executorService().execute(call);
}

Dispatcher中管理了一些请求队列:如果运行中的异步请求数小于maxRequests,并且同一主机的运行中请求数小于maxRequestsPerHost,则把请求加入运行队列并提交到线程池;否则,加入等待队列。
这里的AsyncCall继承自NamedRunnable,而NamedRunnable实现了Runnable接口,所以能直接把AsyncCall对象交给线程池执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
//RealCall中内部类
/**
 * Runnable wrapper around the enclosing {@code RealCall} that reports the outcome to a
 * {@link Callback}. The thread name is derived from the call's redacted URL.
 */
final class AsyncCall extends NamedRunnable {
  private final Callback responseCallback;

  AsyncCall(Callback responseCallback) {
    super("OkHttp %s", redactedUrl());
    this.responseCallback = responseCallback;
  }

  /** Returns the host of the original request's URL. */
  String host() {
    return originalRequest.url().host();
  }

  /** Returns the original request of the enclosing call. */
  Request request() {
    return originalRequest;
  }

  /** Returns the enclosing call. */
  RealCall get() {
    return RealCall.this;
  }

  /**
   * Runs the interceptor chain and notifies the callback exactly once; the dispatcher is told
   * the call has finished regardless of the outcome.
   */
  @Override protected void execute() {
    boolean signalledCallback = false;
    try {
      Response response = getResponseWithInterceptorChain();
      boolean canceled = retryAndFollowUpInterceptor.isCanceled();
      // From here on any IOException comes from the callback itself, not from the call.
      signalledCallback = true;
      if (canceled) {
        responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
      } else {
        responseCallback.onResponse(RealCall.this, response);
      }
    } catch (IOException e) {
      if (!signalledCallback) {
        responseCallback.onFailure(RealCall.this, e);
      } else {
        // Do not signal the callback twice!
        Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
      }
    } finally {
      client.dispatcher().finished(this);
    }
  }
}
//NamedRunnable.java
/**
 * Runnable that renames the executing thread for the duration of {@link #execute()} and
 * restores the previous name afterwards.
 */
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  /**
   * @param format format string passed to {@code Util.format}
   * @param args arguments for {@code format}
   */
  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    Thread current = Thread.currentThread();
    String previousName = current.getName();
    current.setName(name);
    try {
      execute();
    } finally {
      // Restore the name even when execute() throws.
      current.setName(previousName);
    }
  }

  /** The work to perform while the thread carries {@link #name}. */
  protected abstract void execute();
}

AsyncCall父类的run()方法会调用抽象方法execute(),也就是说,Dispatcher里的线程池执行AsyncCall对象的时候,就会执行到execute(),在这个方法里同样调用了核心的网络请求方法getResponseWithInterceptorChain()。
而且在execute()里会调用用户传入的responseCallback的回调方法。注意:这里的回调是在非主线程直接执行的,也就是说在Android里使用时,不能在回调里直接更新UI。
所以,同步请求和异步请求最终都是调用getResponseWithInterceptorChain()来发送网络请求,只是异步请求还涉及一些线程池操作,包括请求的队列管理、调度。

4、调用getResponseWithInterceptorChain()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//RealCall.java
/**
 * Assembles the interceptor stack for this call and sends the original request through it.
 *
 * <p>Order of the stack: the client's application interceptors, the retry/follow-up
 * interceptor, {@code BridgeInterceptor}, {@code CacheInterceptor}, {@code ConnectInterceptor},
 * the client's network interceptors (skipped for web socket calls) and finally
 * {@code CallServerInterceptor}.
 *
 * @return the response returned by the head of the chain
 * @throws IOException if any interceptor in the chain fails
 */
Response getResponseWithInterceptorChain() throws IOException {
  List<Interceptor> stack = new ArrayList<>(client.interceptors());
  stack.add(retryAndFollowUpInterceptor);
  stack.add(new BridgeInterceptor(client.cookieJar()));
  stack.add(new CacheInterceptor(client.internalCache()));
  stack.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
    stack.addAll(client.networkInterceptors());
  }
  stack.add(new CallServerInterceptor(forWebSocket));

  // The chain starts at index 0 with no stream allocation, codec or connection yet.
  Interceptor.Chain head =
      new RealInterceptorChain(stack, null, null, null, 0, originalRequest);
  return head.proceed(originalRequest);
}

这个方法中是添加了一些拦截器,然后启动一个拦截器调用链,拦截器递归调用之后最后返回请求的响应Response。这里的拦截器分层的思想就是借鉴的网络里的分层模型的思想。请求从最上面一层到最下一层,响应从最下一层到最上一层,每一层只负责自己的任务,对请求或响应做自己负责的那块的修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//RealInterceptorChain.java
/**
 * Runs the interceptor at this chain's {@code index}, handing it a new chain positioned at
 * {@code index + 1} that carries the given request, stream allocation, codec and connection.
 *
 * <p>Once this chain has an {@code httpCodec}, extra rules are enforced: the request must pass
 * {@code sameConnection(request.url())}, and {@code proceed()} must be called exactly once.
 *
 * @param request the request to pass to the next interceptor
 * @param streamAllocation stream allocation to hand to the next chain
 * @param httpCodec codec to hand to the next chain
 * @param connection connection to hand to the next chain
 * @return the non-null response returned by the interceptor
 * @throws AssertionError if {@code index} is past the end of the interceptor list
 * @throws IllegalStateException if one of the network-interceptor rules above is violated
 * @throws NullPointerException if the interceptor returns {@code null}
 * @throws IOException if the interceptor throws it
 */
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
Connection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
// Count how many times proceed() was invoked on this chain instance.
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !sameConnection(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
// (Checked against the httpCodec parameter, and skipped for the last interceptor in the list.)
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
return response;
}

三、拦截器

1、分层结构

此处输入图片的描述

RealInterceptorChain的proceed(),每次重新创建一个RealInterceptorChain对象,然后调用下一层的拦截器的interceptor.intercept()方法。
每一个拦截器的intercept()方法都是这样的结构:

1
2
3
4
5
6
7
8
9
10
11
12
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
// 1、该拦截器在Request阶段负责的事情
// 2、调用RealInterceptorChain.proceed(),其实是递归调用下一层拦截器的intercept方法
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
//3、该拦截器在Response阶段负责的事情,然后返回到上一层拦截器的 response阶段
return response;
}
}

OkHttp中最底层为CallServerInterceptor,OkHttp还支持自定义拦截器。

2、几个拦截器

  • BridgeInterceptor
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    /**
     * Turns the user's request into a network request by filling in headers the user did not
     * set, forwards it down the chain, and post-processes the response.
     *
     * <p>Request phase: sets {@code Content-Type} and either {@code Content-Length} or
     * {@code Transfer-Encoding: chunked} from the body, and adds {@code Host},
     * {@code Connection}, {@code Accept-Encoding: gzip}, {@code Cookie} and {@code User-Agent}
     * where applicable. Response phase: passes the response headers to
     * {@code HttpHeaders.receiveHeaders} together with the cookie jar, and transparently
     * gunzips the body if this interceptor was the one that asked for gzip.
     *
     * @param chain the chain positioned at the next interceptor
     * @return the response, re-associated with the user's original request
     * @throws IOException if the rest of the chain fails
     */
    @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();
    // Request phase
    RequestBody body = userRequest.body();
    if (body != null) {
    MediaType contentType = body.contentType();
    if (contentType != null) {
    requestBuilder.header("Content-Type", contentType.toString());
    }
    // A known length is sent as Content-Length; a length of -1 switches to chunked encoding.
    long contentLength = body.contentLength();
    if (contentLength != -1) {
    requestBuilder.header("Content-Length", Long.toString(contentLength));
    requestBuilder.removeHeader("Transfer-Encoding");
    } else {
    requestBuilder.header("Transfer-Encoding", "chunked");
    requestBuilder.removeHeader("Content-Length");
    }
    }
    if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", hostHeader(userRequest.url(), false));
    }
    if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive");
    }
    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
    }
    List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
    if (!cookies.isEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies));
    }
    if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
    }
    Response networkResponse = chain.proceed(requestBuilder.build());
    // Response phase
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    Response.Builder responseBuilder = networkResponse.newBuilder()
    .request(userRequest);
    // Only decompress when gzip was requested by this interceptor rather than by the user.
    if (transparentGzip
    && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
    && HttpHeaders.hasBody(networkResponse)) {
    GzipSource responseBody = new GzipSource(networkResponse.body().source());
    // Content-Encoding and Content-Length are dropped from the headers given to the caller.
    Headers strippedHeaders = networkResponse.headers().newBuilder()
    .removeAll("Content-Encoding")
    .removeAll("Content-Length")
    .build();
    responseBuilder.headers(strippedHeaders);
    responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }
    return responseBuilder.build();
    }

BridgeInterceptor拦截器在Request阶段,就是配置相关信息,重新build Request对象,添加请求头。在Response阶段做gzip解压。

  • CacheInterceptor
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    /**
     * Serves the request from the cache when the cache strategy allows it, otherwise forwards
     * it down the chain and offers the network response to the cache.
     *
     * <p>Outcomes, in the order they are checked:
     * <ul>
     *   <li>neither network nor cache may be used: a synthetic 504 response is returned;
     *   <li>no network request is needed: the cached response is returned;
     *   <li>a cached response exists and the network answers {@code HTTP_NOT_MODIFIED}: the
     *       cached response with combined headers is returned and the cache entry is updated;
     *   <li>otherwise: the network response is returned and, if it has a body, passed to
     *       {@code maybeCache} / {@code cacheWritingResponse}.
     * </ul>
     *
     * @param chain the chain positioned at the next interceptor
     * @return the cached, merged or network response as described above
     * @throws IOException if the rest of the chain fails
     */
    @Override public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
    ? cache.get(chain.request())
    : null;
    long now = System.currentTimeMillis();
    // The strategy yields the request to send (null = do not use the network) and the cached
    // response to use (null = do not use the cache).
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;
    if (cache != null) {
    cache.trackResponse(strategy);
    }
    if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }
    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
    return new Response.Builder()
    .request(chain.request())
    .protocol(Protocol.HTTP_1_1)
    .code(504)
    .message("Unsatisfiable Request (only-if-cached)")
    .body(Util.EMPTY_RESPONSE)
    .sentRequestAtMillis(-1L)
    .receivedResponseAtMillis(System.currentTimeMillis())
    .build();
    }
    // If we don't need the network, we're done.
    if (networkRequest == null) {
    return cacheResponse.newBuilder()
    .cacheResponse(stripBody(cacheResponse))
    .build();
    }
    Response networkResponse = null;
    try {
    networkResponse = chain.proceed(networkRequest);
    } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
    closeQuietly(cacheCandidate.body());
    }
    }
    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
    if (networkResponse.code() == HTTP_NOT_MODIFIED) {
    Response response = cacheResponse.newBuilder()
    .headers(combine(cacheResponse.headers(), networkResponse.headers()))
    .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
    .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build();
    networkResponse.body().close();
    // Update the cache after combining headers but before stripping the
    // Content-Encoding header (as performed by initContentStream()).
    cache.trackConditionalCacheHit();
    cache.update(cacheResponse, response);
    return response;
    } else {
    // The cached copy is not going to be used; release its body.
    closeQuietly(cacheResponse.body());
    }
    }
    Response response = networkResponse.newBuilder()
    .cacheResponse(stripBody(cacheResponse))
    .networkResponse(stripBody(networkResponse))
    .build();
    if (HttpHeaders.hasBody(response)) {
    CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
    response = cacheWritingResponse(cacheRequest, response);
    }
    return response;
    }

CacheInterceptor拦截器在Request中检查是否该请求有缓存,是否要重新请求,如果不需要,则使用缓存,不调用下一层。Response阶段则对下一层的Response做缓存。

  • ConnectInterceptor

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
     * Opens a stream on the chain's {@code StreamAllocation} and forwards the request together
     * with the resulting codec and connection to the next interceptor.
     *
     * @param chain the chain positioned at the next interceptor; must be a
     *     {@code RealInterceptorChain}
     * @return the response of the rest of the chain, unchanged
     * @throws IOException if the rest of the chain fails
     */
    @Override public Response intercept(Chain chain) throws IOException {
      RealInterceptorChain realChain = (RealInterceptorChain) chain;
      Request request = realChain.request();
      StreamAllocation streamAllocation = realChain.streamAllocation();

      // We need the network to satisfy this request. Possibly for validating a conditional GET.
      // Extensive health checks are requested for every method except GET.
      boolean isGet = request.method().equals("GET");
      boolean doExtensiveHealthChecks = !isGet;

      HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
      RealConnection connection = streamAllocation.connection();
      return realChain.proceed(request, streamAllocation, httpCodec, connection);
    }

    ConnectInterceptor拦截器只在Request阶段建立连接,Response阶段直接把下一层的Response返回给上一层。再看下建立连接的过程。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    //StreamAllocation.java
    /**
     * Finds a healthy connection using the client's timeout and retry settings, creates a codec
     * on it and records that codec on this allocation.
     *
     * @param client supplies the connect/read/write timeouts and the retry-on-failure flag
     * @param doExtensiveHealthChecks forwarded to the health check of reused connections
     * @return the newly created codec
     * @throws RouteException wrapping any {@code IOException} raised while connecting
     */
    public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
      int connectTimeout = client.connectTimeoutMillis();
      int readTimeout = client.readTimeoutMillis();
      int writeTimeout = client.writeTimeoutMillis();
      boolean connectionRetryEnabled = client.retryOnConnectionFailure();

      try {
        // Look up a healthy network connection.
        RealConnection healthy = findHealthyConnection(
            connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled,
            doExtensiveHealthChecks);
        HttpCodec newCodec = healthy.newCodec(client, this);

        // The codec field is written while holding the connection pool's lock.
        synchronized (connectionPool) {
          codec = newCodec;
        }
        return newCodec;
      } catch (IOException e) {
        throw new RouteException(e);
      }
    }
    /**
     * Keeps asking {@code findConnection} for a connection until one is usable.
     *
     * <p>A connection whose {@code successCount} is 0 is returned without a health check; any
     * other connection must pass {@code isHealthy}, otherwise {@code noNewStreams()} is called
     * and the search starts over.
     *
     * @return a brand-new or healthy connection
     * @throws IOException if {@code findConnection} fails
     */
    private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
        int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
        throws IOException {
      while (true) {
        RealConnection candidate =
            findConnection(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);

        // If this is a brand new connection, we can skip the extensive health checks.
        boolean brandNew;
        synchronized (connectionPool) {
          brandNew = candidate.successCount == 0;
        }
        if (brandNew) {
          return candidate;
        }

        // Do a (potentially slow) check to confirm that the pooled connection is still good.
        if (candidate.isHealthy(doExtensiveHealthChecks)) {
          return candidate;
        }

        // Unhealthy: stop new streams on it, then loop and look for another connection.
        noNewStreams();
      }
    }
    /**
     * Returns a connection for this allocation, trying in order: the connection already
     * assigned to this allocation (unless it is marked {@code noNewStreams}), a connection
     * obtained from the pool, and finally a newly created and connected one, which is then put
     * into the pool.
     *
     * @param connectTimeout passed to {@code RealConnection.connect}
     * @param readTimeout passed to {@code RealConnection.connect}
     * @param writeTimeout passed to {@code RealConnection.connect}
     * @param connectionRetryEnabled passed to {@code RealConnection.connect}
     * @return the connection to use
     * @throws IllegalStateException if this allocation is released or already has a codec
     * @throws IOException with message "Canceled" if the allocation was canceled, or whatever
     *     route selection or connecting throws
     */
    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
    boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    synchronized (connectionPool) {
    if (released) throw new IllegalStateException("released");
    if (codec != null) throw new IllegalStateException("codec != null");
    if (canceled) throw new IOException("Canceled");
    // Attempt to use an already-allocated connection.
    RealConnection allocatedConnection = this.connection;
    if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
    return allocatedConnection;
    }
    // Attempt to get a connection from the pool.
    // NOTE(review): the call's return value is unused; a pool hit presumably shows up as a
    // non-null `connection` field set as a side effect - confirm against ConnectionPool.get.
    Internal.instance.get(connectionPool, address, this);
    if (connection != null) {
    return connection;
    }
    selectedRoute = route;
    }
    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
    selectedRoute = routeSelector.next();
    }
    // Create a connection and assign it to this allocation immediately. This makes it possible for
    // an asynchronous cancel() to interrupt the handshake we're about to do.
    RealConnection result;
    synchronized (connectionPool) {
    route = selectedRoute;
    refusedStreamCount = 0;
    result = new RealConnection(connectionPool, selectedRoute);
    acquire(result);
    if (canceled) throw new IOException("Canceled");
    }
    // Do TCP + TLS handshakes. This is a blocking operation.
    // (Performed outside the pool lock.)
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    routeDatabase().connected(result.route());
    Socket socket = null;
    synchronized (connectionPool) {
    // Pool the connection.
    Internal.instance.put(connectionPool, result);
    // If another multiplexed connection to the same address was created concurrently, then
    // release this connection and acquire that one.
    if (result.isMultiplexed()) {
    socket = Internal.instance.deduplicate(connectionPool, address, this);
    result = connection;
    }
    }
    // Close the socket handed back by deduplicate(), if any, after leaving the lock.
    closeQuietly(socket);
    return result;
    }
    1
    2
    3
    4
    //ConnectionPool.java
    /**
     * Creates a pool with default settings by delegating to the three-argument constructor
     * with {@code (5, 5, TimeUnit.MINUTES)}.
     *
     * <p>NOTE(review): the arguments are presumably the maximum number of idle connections and
     * the keep-alive duration with its unit - confirm against the three-argument constructor,
     * which is not shown here.
     */
    public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
    }

    这里基本就是从连接池里去找已有的网络连接,如果有,则复用,减少三次握手;没有的话,则创建一个RealConnection对象,三次握手,建立连接,然后将连接放到连接池。具体的内部connect过程,就不深入了。ConnectionPool默认最多保持5个空闲的keep-alive连接,每个空闲连接最多保留5分钟,并有异步线程循环清理无效的连接。