文章

OkHttp原理

OkHttp原理

OkHttp3 原理

OkHttp 中的 API

1、OkHttpClient

设置 OkHttp 中通用的参数,是一个 Builder 模式,外观模式(门面模式,Facade)

2、Request

一个 HTTP 请求,包含 HttpUrl、method、headers、body 以及一个请求 tag。

3、Call

Call 是一个准备要执行的 request,可以被取消。代表了一对 request/response,不能被执行两次。

4、RealCall

实际的 Call 对象,通过 execute() 和 enqueue() 方法来实现同步异步请求处理,并通过 getResponseWithInterceptorChain() 方法来执行 Interceptor 链条实现各种功能

5、Dispatcher

封装了线程池,异步执行请求;同步的 Call 也会队列保存

6、Interceptor

拦截器,分层实现缓存、透明压缩、网络 IO 等功能

Chain

拦截器链条

7、Response

响应

OkHttp 原理

1、创建 OkHttpClient 对象

简单的创建 OKHttpClient 方式:

1
OkHttpClient client = new OkHttpClient();

其实是封装了一个默认的 Builder:

1
2
3
4
// OkHttpClient
public OkHttpClient() {
    this(new Builder());
}

当然也可以自己通过 Builder 来设置参数:

1
2
3
OkHttpClient okhttpClient = new OkHttpClient.Builder()
        .addInterceptor(new CommonParamsInterceptor(false, null))
        .build();

2、发起 HTTP 请求

1
2
3
4
5
6
7
8
9
10
11
public void synchronous_get() throws Exception {
    OkHttpClient client = new OkHttpClient();

    Request request = new Request.Builder()
            .url("http://publicobject.com/helloworld.txt")
            .build();

    Response response = client.newCall(request).execute();
    if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
    String result = response.body().string();
}

OkHttpClient 实现了 Call.Factory 接口,负责根据 request 创建新的 Call。

1
2
3
interface Factory {
    Call newCall(Request request);
}

OkHttpClient 创建 Call,实际上是 new 了一个 RealCall

1
2
3
4
5
// OkHttpClient
@Override
public Call newCall(Request request) {
    return new RealCall(this, request, false /* for web socket */);
}

3.1、同步网络请求

就是 RealCall#execute(),同步网络请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// RealCall
@Override
public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
}
  1. 检查这个 call 是否已经被执行了,每个 call 只能被执行一次,如果想要一个完全一样的 call,可以利用 call#clone() 方法进行克隆
  2. 利用 client.dispatcher().executed(this) 来进行实际执行,dispatcher 就是 Dispatcher
  3. 调用 getResponseWithInterceptorChain() 获取 HTTP 返回结果,这个会进行一系列拦截操作
  4. 最后通知 dispatcher 自己已经执行完毕

真正发出网络请求,解析返回结果的,是 getResponseWithInterceptorChain()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// RealCall
Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
}

可以发现 Interceptor 是 OkHttp 最核心的一个东西,不只是负责拦截请求进行一些额外的处理(如 cookie),实际上它把网络请求、缓存、透明压缩等功能都统一了起来,每一个功能都是一个 Interceptor,它们连接成一个 Interceptor.Chain,环环相扣,最终完成一次网络请求。
Interceptor 分布:

  1. interceptors
    配置 OkHttpClient 配置的 interceptors
  2. RetryAndFollowUpInterceptor
    负责失败重试以及重定向的
  3. BridgeInterceptor
    负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应
  4. CacheInterceptor
    负责读取缓存直接返回、更新缓存
  5. ConnectInterceptor
    负责和服务器建立连接
  6. networkInterceptors
    配置 OkHttpClient 设置的 networkInterceptors
  7. CallServerInterceptor
    负责向服务器发送请求数据、从服务读取响应数据

责任链模式 在这个 Interceptor 链中得到了很好的实践。遵循链条每个 Interceptor 自行决定能否完成任务以及怎么完成任务(交给下一个 Interceptor)。这样一来,完成网络请求这件事就彻底从 RealCall 类中剥离出来了。

下面看 Chain 的具体实现 RealInterceptorChain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // ...

    // 调用在这条链上的下一个Interceptor
    RealInterceptorChain next = new RealInterceptorChain(
        interceptors, streamAllocation, httpCodec, connection, index + 1, request);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next); // 调用Interceptor的intercept()方法

   // ...

    return response;
}

首先会取第一个 Interceptor 执行其 intercept() 方法,并把该链上的下一个 Interceptor 封装成 Chain 带给了 intercept(chain) 的参数,第一个 Interceptor 对 request 进行处理,看是否处理,处理了的话就会返回一个 response;如果没有处理的话,会通过 chain 参数,调用该链上的下一个 Interceptor 来执行,依次类推,直到有一个 Interceptor 对其进行了处理。

我们看看 ConnectInterceptorCallServerInterceptor 怎么和服务器进行实际通信的。

ConnectInterceptor 建立连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ConnectInterceptor
@Override
public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
}

实际上建立连接就是创建了一个 HttpCodec 对象,它是对 HTTP 协议操作的抽象,有两个实现:Http1CodecHttp2Codec,顾名思义,它们分别对应 HTTP/1.1 和 HTTP2 版本的实现。

在 Http1Codec 中,它利用 Okio 对 Socket 的读写操作进行封装。

而创建 HttpCodec 对象的过程涉及到 StreamAllocationRealConnection,概括说就是找到一个可用的 RealConnection,再利用 RealConnection 的输入输出(BufferedSource 和 BufferedSink)创建 HttpCodec 对象。

CallServerInterceptor 发送和接收数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// CallServerInterceptor
@Override 
public Response intercept(Chain chain) throws IOException {
    // 一堆获取操作
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return what
      // we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
        // being reused. Otherwise we're still obligated to transmit the request body to leave the
        // connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();

    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
}
  1. 向服务器发送 request header
  2. 如果有 request body,向服务器发送
  3. 读取 response header,构造一个 Response 对象
  4. 如果有 response body,就在 3 基础上加上 body 构造一个新的 response

可以看到,核心工作都是由 HttpCodec 对象完成,而 HttpCodec 实际上利用的是 Okio,而 Okio 实际上还是用的 Socket。

3.2 异步网络请求

1
2
3
4
5
6
7
8
9
10
client.newCall(request).enqueue(new Callback() {
        @Override
        public void onFailure(Call call, IOException e) {
        }
        @Override
        public void onResponse(Call call, Response response) throws IOException {
            if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
            String result = response.body().string();
        }
    });

异步网络请求调用的是 RealCall#enqueue(callback)

1
2
3
4
5
6
7
8
9
10
// RealCall
Override
public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

调用的是 Dispatcher 的 enqueue() 方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Dispatcher
private int maxRequests = 64;
private int maxRequestsPerHost = 5;

/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
}

如果异步调用请求总数超过了 maxRequests(默认为 64),并且同一个 host 的请求总数超过了 maxRequestsPerHost(默认为 5),就会添加到即将被执行的队列中去;
如果符合条件,就会通过线程池 ExecutorService 来 execute(),

1
2
// Dispatcher
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));

可以发现 AsyncCall 是一个 Runnable。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// AsyncCall
final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;

    // ...
    
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        if (retryAndFollowUpInterceptor.isCanceled()) {
          signalledCallback = true;
          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
        } else {
          signalledCallback = true;
          responseCallback.onResponse(RealCall.this, response);
        }
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } finally {
        client.dispatcher().finished(this);
      }
    }
}

// NamedRunnable
public abstract class NamedRunnable implements Runnable {
  // ...
  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

这里的 AsyncCall 是一个 Runnable,其里面了也调用了 getResponseWithInterceptorChain() 方法,并把结果通过 responseCallback 传给给了上层。

所以同步和异步的请求原理一样,都是在 getResponseWithInterceptorChain() 方法中通过 Interceptor 的链条来实现的网络请求逻辑,而异步通过 ExecutorService 实现。

4、返回数据的获取

同步在 Call#execute() 执行后,异步在 Callback#onResponse() 回调中的 Response 对象中获取到响应数据了。
响应体被封装到了 ResponseBody 类中,注意:

  1. 每个 body 只能被消费一次,多次消费会抛出异常
  2. body 必须被关闭,否则会发生资源泄露

Response 中的 body 特殊,服务器返回的数据可能非常大,所以必须通过数据流的方式来进行访问。

CallServerInterceptor 看到 body 生成代码:

1
2
3
4
5
6
7
8
9
10
11
12
// CallServerInterceptor
int code = response.code();
if (forWebSocket && code == 101) {
  // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
  response = response.newBuilder()
      .body(Util.EMPTY_RESPONSE)
      .build();
} else {
  response = response.newBuilder()
      .body(httpCodec.openResponseBody(response))
      .build();
}

HttpCodec#openResponseBody(reponse) 提供具体的 HTTP 协议版本的响应 body,而 HttpCodec 则是利用 Okio 实现具体的数据 IO 操作。

本文由作者按照 CC BY 4.0 进行授权