yunshuipiao/Potato

OKhttp(2): Main Flow Analysis

yunshuipiao opened this issue · 0 comments

OKhttp(2): Source code analysis(1)

[TOC]

上一篇文章介绍了 okhttp 的基本使用,以及官方 wiki 对于okhttp 的部分解释。

这一篇文章主要从源码来分析 okhttp,理解过程,不求每个细节。

使用 okhttp 最新代码,已从 java 转为 kotlin, 为okhttp4 的第一个版本

请求过程

这里还有从官方的 get 和 post 请求作为例子:

// 
private final OkHttpClient client = new OkHttpClient();

// 同步 get 请求 
  public void run() throws Exception {
    Request request = new Request.Builder()
        .url("https://publicobject.com/helloworld.txt")
        .build();

    try (Response response = client.newCall(request).execute()) {
      if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);

      Headers responseHeaders = response.headers();
      for (int i = 0; i < responseHeaders.size(); i++) {
        System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
      }

      System.out.println(response.body().string());
    }
  }

// 异步 get 请求
public void run() throws Exception {
    Request request = new Request.Builder()
        .url("http://publicobject.com/helloworld.txt")
        .build();

    client.newCall(request).enqueue(new Callback() {
      @Override public void onFailure(Call call, IOException e) {
        e.printStackTrace();
      }

      @Override public void onResponse(Call call, Response response) throws IOException {
        try (ResponseBody responseBody = response.body()) {
          if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);

          Headers responseHeaders = response.headers();
          for (int i = 0, size = responseHeaders.size(); i < size; i++) {
            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
          }

          System.out.println(responseBody.string());
        }
      }
    });
  }
private final OkHttpClient client = new OkHttpClient();
  private final Moshi moshi = new Moshi.Builder().build();
  private final JsonAdapter<Gist> gistJsonAdapter = moshi.adapter(Gist.class);

  public void run() throws Exception {
    Request request = new Request.Builder()
        .url("https://api.github.com/gists/c2a7c39532239ff261be")
        .build();
    try (Response response = client.newCall(request).execute()) {
      if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);

      Gist gist = gistJsonAdapter.fromJson(response.body().source());

      for (Map.Entry<String, GistFile> entry : gist.files.entrySet()) {
        System.out.println(entry.getKey());
        System.out.println(entry.getValue().content);
      }
    }
  }

  static class Gist {
    Map<String, GistFile> files;
  }

  static class GistFile {
    String content;
  }

在上面的例子中,无论是何种请求,大概过程都是如下:

创建 Request —> 重用的 client.newCall(request) 同步或者异步请求 —> 得到 Response

下面分别来讲解每一部分。

OkHttpClient

okhttp 用来创建 call, 进行请求和读取响应结果

val okhttpclient = Okhttpclient()

在程序中应该共享该 client, 所有的 http 请求都应该重用它, 因为可以复用连接池和线程池,减少延迟和节约内存。

如果需要自定义配置,则可以使用建造者模式创建 client。当目前存在的 client 需要修改是,可以调用 newBuilder() 重新创建 client ,其属性和之前一样,针对特定属性作出修改即可。

下面看一下其有哪些属性。

  class Builder constructor() {
    internal var dispatcher: Dispatcher = Dispatcher()  //分发器:异步请求时的策略
    internal var proxy: Proxy? = null  //代理
    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS  //协议
    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
    internal val interceptors: MutableList<Interceptor> = mutableListOf()  //拦截器
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()  //网络拦截器
    internal var eventListenerFactory: EventListener.Factory = Util.eventListenerFactory(EventListener.NONE)  //请求过程中的发生事件
    internal var proxySelector: ProxySelector = ProxySelector.getDefault() ?: NullProxySelector()  //代理选择
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES  // cookie管理
    internal var cache: Cache? = null  //缓存
    internal var internalCache: InternalCache? = null
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()
    internal var sslSocketFactory: SSLSocketFactory? = null
    internal var certificateChainCleaner: CertificateChainCleaner? = null
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE
    internal var authenticator: Authenticator = Authenticator.NONE
    internal var connectionPool: ConnectionPool = ConnectionPool() //连接池
    internal var dns: Dns = Dns.SYSTEM
    internal var followSslRedirects: Boolean = true
    internal var followRedirects: Boolean = true
    internal var retryOnConnectionFailure: Boolean = true
    internal var callTimeout: Int = 0  
    internal var connectTimeout: Int = 10000
    internal var readTimeout: Int = 10000
    internal var writeTimeout: Int = 10000
    internal var pingInterval: Int = 0

上面的主要内容是 dispatcher, interceptors, networkinterceptors, 几个 timeout。

至此完成 client 的创建。

Request

Request 也是使用建造者模式创建对象。

  open class Builder {
    internal var url: HttpUrl? = null  // 请求地址
    internal var method: String  // 请求方法
    internal var headers: Headers.Builder  // 请求头
    internal var body: RequestBody? = null  // 请求体

这里的 HttpUrl 相比起 URL 和 URI 来说更为安全现代,可以解析 一直地址为具体的部分,scheme, host, path, 不能解析时返回 null ,不会抛异常。

  class Builder {
    internal var scheme: String? = null
    internal var encodedUsername = ""
    internal var encodedPassword = ""
    internal var host: String? = null
    internal var port = -1
    internal val encodedPathSegments = mutableListOf<String>()
    internal var encodedQueryNamesAndValues: MutableList<String?>? = null
    internal var encodedFragment: String? = null

所以传入的 字符串 url 会被HttpUrl.get(finalUrl)) , 提供更准确的结果。

RealCall

创建好了 Request, 接着 client.newCall() 创建 call 就发送请求,看一下代码:

  /** Prepares the [request] to be executed at some point in the future. */
  override fun newCall(request: Request): Call {
    return RealCall.newRealCall(this, request, false /* for web socket */)
  }
  companion object {
    fun newRealCall(
      client: OkHttpClient,
      originalRequest: Request,
      forWebSocket: Boolean
    ): RealCall {
      // Safely publish the Call instance to the EventListener.
      return RealCall(client, originalRequest, forWebSocket).apply {
        transmitter = Transmitter(client, this)
      }
    }
  }

将 request,client 作为参数创建了 RealCall。并对 tranmitter 进行了赋值。

transmitter:在 okhttp 程序和网络层之前的桥接部分,对连接,请求,响应和流进行高层封装,支持异步取消请求。

Response

构建出了 RealCall 在调用 exxcute 同步请求之后,先不管请求过程,得到响应结果。

open class Builder {
  internal var request: Request? = null  // 请求
  internal var protocol: Protocol? = null // 协议
  internal var code = -1 //状态码
  internal var message: String? = null 
  internal var handshake: Handshake? = null  //http握手
  internal var headers: Headers.Builder // 响应头
  internal var body: ResponseBody? = null //响应消息
  internal var networkResponse: Response? = null //网络响应
  internal var cacheResponse: Response? = null //缓存响应
  internal var priorResponse: Response? = null // 重定向或者验证的响应,已被消费
  internal var sentRequestAtMillis: Long = 0
  internal var receivedResponseAtMillis: Long = 0
  internal var exchange: Exchange? = null

同样是建造者模式。


目前已经知道一个 http 请求的过程及其各个部分的作用,下面看一下 okhttp 的同步和异步请求。

同步和异步请求

同步

在 okhttp 中支持同步和异步请求,还是在 RealCall 类中,

override fun execute(): Response {
  // 对 该 call 加锁
  synchronized(this) {
    // 如果正在执行就抛异常
    check(!executed) { "Already Executed" }
    executed = true
  }
  // timeout 计时
  transmitter.timeoutEnter()
  // 打印网络连接事件
  transmitter.callStart()
  try {
    client.dispatcher().executed(this)
    return getResponseWithInterceptorChain()
  } finally {
    client.dispatcher().finished(this)
  }
}
  /** Used by `Call#execute` to signal it is in-flight.  */
  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }
  
    /** Used by `Call#execute` to signal completion.  */
  internal fun finished(call: RealCall) {
    finished(runningSyncCalls, call)
  }

执行请求和请求结束,使用 runnsingSyncCalls 来管理 getResponseWithInterceptorChain() 后续讲解,同步请求结束。

异步

override fun enqueue(responseCallback: Callback) {
  synchronized(this) {
    check(!executed) { "Already Executed" }
    executed = true
  }
  transmitter.callStart()
  client.dispatcher().enqueue(AsyncCall(responseCallback))
}

异步请求中,使用 Callback 来对 response 进行回调返回。

interface Callback {
  fun onFailure(call: Call, e: IOException)

  @Throws(IOException::class)
  fun onResponse(call: Call, response: Response)
}

在程序中一般会对 CallBack 进行封装,满足自己的业务需求。

AsyncCall(responseCallback) 是 RealCall 的一个内部类,集成了Runnable, 异步执行任务。

继续往下走,进入 dispatch,

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.get().forWebSocket) {
        // 共享使用相同 host 的值, 表示每个 host 有几个 call
        val existingCall = findExistingCallWithHost(call.host())
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }
private fun promoteAndExecute(): Boolean {
  // 检查是否有锁
  assert(!Thread.holdsLock(this))

  val executableCalls = ArrayList<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    // 执行等待请求
    val i = readyAsyncCalls.iterator()
    while (i.hasNext()) {
      val asyncCall = i.next()

      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.  最大请求 64
      if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity. 每个 host 的最大执行请求 5

      i.remove()
      asyncCall.callsPerHost().incrementAndGet()
      executableCalls.add(asyncCall)
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }

  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    asyncCall.executeOn(executorService())
  }
	// 返回结果,是否正在请求,runningCall.size > 0
  return isRunning
}
// 异步执行 AsyncCall    
fun executeOn(executorService: ExecutorService) {
      assert(!Thread.holdsLock(client.dispatcher()))
      var success = false
      try {
        // 执行run方法
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        transmitter.noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher().finished(this) // This call is no longer running!
        }
      }
    }
    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        transmitter.timeoutEnter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          // 响应成功的回调
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
          } else {
            // 响应失败的回掉
            responseCallback.onFailure(this@RealCall, e)
          }
        } finally {
          // 结束该请求的执行
          client.dispatcher().finished(this)
        }
      }
    }

到此为止,同步和异步的执行基本分析完毕。目前还有一个方法, getResponseWithInterceptorChain() 获得 响应结果,涉及到 okhttp 最强大的拦截器。

Intercepor 拦截器

都知道 okhttp 的拦截器使用了责任链模式,下面就从这个方法开始讲起:

 @Throws(IOException::class)
  fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = ArrayList<Interceptor>()
    interceptors.addAll(client.interceptors()) //自定义应用拦截器,注意networkinterceptor的顺序
    interceptors.add(RetryAndFollowUpInterceptor(client)) // 重试和重定向拦截器
    interceptors.add(BridgeInterceptor(client.cookieJar())) // 转化请求拦截器
    interceptors.add(CacheInterceptor(client.internalCache())) // 缓存拦截器
    interceptors.add(ConnectInterceptor(client))  //连接拦截器
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors())  //自定义网络拦截器
    }
    interceptors.add(CallServerInterceptor(forWebSocket))  //发请求和接收响应拦截器

    // 创建拦截器Chain
    val chain = RealInterceptorChain(interceptors, transmitter, null, 0,
        originalRequest, this, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis())

    var calledNoMoreExchanges = false
    try {
      // 拦截器处理请求,返回响应结果
      val response = chain.proceed(originalRequest)
      if (transmitter.isCanceled) {
        closeQuietly(response)
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw transmitter.noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null)
      }
    }
  }

上面最重要的方法是 val response = chain.proceed(originalRequest), 包括在 chain 的构造过程中,index = 0,表示使用 第一个拦截器。

@Throws(IOException::class)
  fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
    if (index >= interceptors.size) throw AssertionError()
		
    // 每个拦截器只能处理一次
    calls++
		
    // If we already have a stream, confirm that the incoming request will use it.
    if (this.exchange != null && !this.exchange.connection()!!.supportsUrl(request.url())) {
      throw IllegalStateException("network interceptor " + interceptors[index - 1] +
          " must retain the same host and port")
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.exchange != null && calls > 1) {
      throw IllegalStateException("network interceptor " + interceptors[index - 1] +
          " must call proceed() exactly once")
    }

    // Call the next interceptor in the chain.
    // 构造下一个 chain, index + 1
    val next = RealInterceptorChain(interceptors, transmitter, exchange,
        index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    // 当前拦截器处理下一个 chain, 获得response 。
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (exchange != null && index + 1 < interceptors.size && next.calls != 1) {
      throw IllegalStateException("network interceptor " + interceptor +
          " must call proceed() exactly once")
    }

    if (response.body() == null) {
      throw IllegalStateException(
          "interceptor $interceptor returned a response with no body")
    }

    return response
  }

这里对拦截器的处理过程做个分析:

interface Interceptor {
  @Throws(IOException::class)
  fun intercept(chain: Chain): Response
}
  
interface Chain {
    fun request(): Request

    @Throws(IOException::class)
    fun proceed(request: Request): Response
    ...
}

首先是第0个 RealInterceptorChain 处理 process 请求, 内部执行 interceptor.intercept(chain) 返回当前 response;执行内部构造第1个 RealInterceptorChain 又处理请求。

有类似递归的方式,直到最后一个拦截器 没有chain.proceed(), 直接返回了 Response。

那么这里就可以体现拦截器使用责任链模式的好处:

每个拦截器只对自己的 resquest 和 response 负责,从上往下传递,互不依赖,彻底解耦。

总结

本文章大概从源码角度分析了 okhttp 的使用过程,当然还有 具体每个拦截器的作用,缓存,分发器, 连接池等后续也会分析。

  1. okhttp 的 resques, response处理和具体内容
  2. 拦截器和责任链模式