OKhttp(2): Main Flow Analysis
yunshuipiao opened this issue · 0 comments
OKhttp(2): Source code analysis(1)
[TOC]
上一篇文章介绍了 okhttp 的基本使用,以及官方 wiki 对于okhttp 的部分解释。
这一篇文章主要从源码来分析 okhttp,理解过程,不求每个细节。
使用 okhttp 最新代码,已从 java 转为 kotlin, 为okhttp4 的第一个版本
请求过程
这里还有从官方的 get 和 post 请求作为例子:
//
private final OkHttpClient client = new OkHttpClient();
// 同步 get 请求
public void run() throws Exception {
Request request = new Request.Builder()
.url("https://publicobject.com/helloworld.txt")
.build();
try (Response response = client.newCall(request).execute()) {
if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
Headers responseHeaders = response.headers();
for (int i = 0; i < responseHeaders.size(); i++) {
System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
}
System.out.println(response.body().string());
}
}
// 异步 get 请求
public void run() throws Exception {
Request request = new Request.Builder()
.url("http://publicobject.com/helloworld.txt")
.build();
client.newCall(request).enqueue(new Callback() {
@Override public void onFailure(Call call, IOException e) {
e.printStackTrace();
}
@Override public void onResponse(Call call, Response response) throws IOException {
try (ResponseBody responseBody = response.body()) {
if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
Headers responseHeaders = response.headers();
for (int i = 0, size = responseHeaders.size(); i < size; i++) {
System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
}
System.out.println(responseBody.string());
}
}
});
}
private final OkHttpClient client = new OkHttpClient();
private final Moshi moshi = new Moshi.Builder().build();
private final JsonAdapter<Gist> gistJsonAdapter = moshi.adapter(Gist.class);
public void run() throws Exception {
Request request = new Request.Builder()
.url("https://api.github.com/gists/c2a7c39532239ff261be")
.build();
try (Response response = client.newCall(request).execute()) {
if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
Gist gist = gistJsonAdapter.fromJson(response.body().source());
for (Map.Entry<String, GistFile> entry : gist.files.entrySet()) {
System.out.println(entry.getKey());
System.out.println(entry.getValue().content);
}
}
}
static class Gist {
Map<String, GistFile> files;
}
static class GistFile {
String content;
}
在上面的例子中,无论是何种请求,大概过程都是如下:
创建 Request —> 重用的 client.newCall(request) 同步或者异步请求 —> 得到 Response
下面分别来讲解每一部分。
OkHttpClient
okhttp 用来创建 call, 进行请求和读取响应结果
val okhttpclient = Okhttpclient()
在程序中应该共享该 client, 所有的 http 请求都应该重用它, 因为可以复用连接池和线程池,减少延迟和节约内存。
如果需要自定义配置,则可以使用建造者模式创建 client。当目前存在的 client 需要修改是,可以调用 newBuilder() 重新创建 client ,其属性和之前一样,针对特定属性作出修改即可。
下面看一下其有哪些属性。
class Builder constructor() {
internal var dispatcher: Dispatcher = Dispatcher() //分发器:异步请求时的策略
internal var proxy: Proxy? = null //代理
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS //协议
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
internal val interceptors: MutableList<Interceptor> = mutableListOf() //拦截器
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf() //网络拦截器
internal var eventListenerFactory: EventListener.Factory = Util.eventListenerFactory(EventListener.NONE) //请求过程中的发生事件
internal var proxySelector: ProxySelector = ProxySelector.getDefault() ?: NullProxySelector() //代理选择
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES // cookie管理
internal var cache: Cache? = null //缓存
internal var internalCache: InternalCache? = null
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
internal var sslSocketFactory: SSLSocketFactory? = null
internal var certificateChainCleaner: CertificateChainCleaner? = null
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
internal var authenticator: Authenticator = Authenticator.NONE
internal var connectionPool: ConnectionPool = ConnectionPool() //连接池
internal var dns: Dns = Dns.SYSTEM
internal var followSslRedirects: Boolean = true
internal var followRedirects: Boolean = true
internal var retryOnConnectionFailure: Boolean = true
internal var callTimeout: Int = 0
internal var connectTimeout: Int = 10000
internal var readTimeout: Int = 10000
internal var writeTimeout: Int = 10000
internal var pingInterval: Int = 0
上面的主要内容是 dispatcher, interceptors, networkinterceptors, 几个 timeout。
至此完成 client 的创建。
Request
Request 也是使用建造者模式创建对象。
open class Builder {
internal var url: HttpUrl? = null // 请求地址
internal var method: String // 请求方法
internal var headers: Headers.Builder // 请求头
internal var body: RequestBody? = null // 请求体
这里的 HttpUrl 相比起 URL 和 URI 来说更为安全现代,可以解析 一直地址为具体的部分,scheme, host, path, 不能解析时返回 null ,不会抛异常。
class Builder {
internal var scheme: String? = null
internal var encodedUsername = ""
internal var encodedPassword = ""
internal var host: String? = null
internal var port = -1
internal val encodedPathSegments = mutableListOf<String>()
internal var encodedQueryNamesAndValues: MutableList<String?>? = null
internal var encodedFragment: String? = null
所以传入的 字符串 url 会被HttpUrl.get(finalUrl))
, 提供更准确的结果。
RealCall
创建好了 Request, 接着 client.newCall()
创建 call 就发送请求,看一下代码:
/** Prepares the [request] to be executed at some point in the future. */
override fun newCall(request: Request): Call {
return RealCall.newRealCall(this, request, false /* for web socket */)
}
companion object {
fun newRealCall(
client: OkHttpClient,
originalRequest: Request,
forWebSocket: Boolean
): RealCall {
// Safely publish the Call instance to the EventListener.
return RealCall(client, originalRequest, forWebSocket).apply {
transmitter = Transmitter(client, this)
}
}
}
将 request,client 作为参数创建了 RealCall。并对 tranmitter 进行了赋值。
transmitter:在 okhttp 程序和网络层之前的桥接部分,对连接,请求,响应和流进行高层封装,支持异步取消请求。
Response
构建出了 RealCall 在调用 exxcute 同步请求之后,先不管请求过程,得到响应结果。
open class Builder {
internal var request: Request? = null // 请求
internal var protocol: Protocol? = null // 协议
internal var code = -1 //状态码
internal var message: String? = null
internal var handshake: Handshake? = null //http握手
internal var headers: Headers.Builder // 响应头
internal var body: ResponseBody? = null //响应消息
internal var networkResponse: Response? = null //网络响应
internal var cacheResponse: Response? = null //缓存响应
internal var priorResponse: Response? = null // 重定向或者验证的响应,已被消费
internal var sentRequestAtMillis: Long = 0
internal var receivedResponseAtMillis: Long = 0
internal var exchange: Exchange? = null
同样是建造者模式。
目前已经知道一个 http 请求的过程及其各个部分的作用,下面看一下 okhttp 的同步和异步请求。
同步和异步请求
同步
在 okhttp 中支持同步和异步请求,还是在 RealCall 类中,
override fun execute(): Response {
// 对 该 call 加锁
synchronized(this) {
// 如果正在执行就抛异常
check(!executed) { "Already Executed" }
executed = true
}
// timeout 计时
transmitter.timeoutEnter()
// 打印网络连接事件
transmitter.callStart()
try {
client.dispatcher().executed(this)
return getResponseWithInterceptorChain()
} finally {
client.dispatcher().finished(this)
}
}
/** Used by `Call#execute` to signal it is in-flight. */
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
/** Used by `Call#execute` to signal completion. */
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
执行请求和请求结束,使用 runnsingSyncCalls 来管理 getResponseWithInterceptorChain() 后续讲解,同步请求结束。
异步
override fun enqueue(responseCallback: Callback) {
synchronized(this) {
check(!executed) { "Already Executed" }
executed = true
}
transmitter.callStart()
client.dispatcher().enqueue(AsyncCall(responseCallback))
}
异步请求中,使用 Callback 来对 response 进行回调返回。
interface Callback {
fun onFailure(call: Call, e: IOException)
@Throws(IOException::class)
fun onResponse(call: Call, response: Response)
}
在程序中一般会对 CallBack 进行封装,满足自己的业务需求。
AsyncCall(responseCallback)
是 RealCall 的一个内部类,集成了Runnable, 异步执行任务。
继续往下走,进入 dispatch,
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
// 共享使用相同 host 的值, 表示每个 host 有几个 call
val existingCall = findExistingCallWithHost(call.host())
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
private fun promoteAndExecute(): Boolean {
// 检查是否有锁
assert(!Thread.holdsLock(this))
val executableCalls = ArrayList<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
// 执行等待请求
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity. 最大请求 64
if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity. 每个 host 的最大执行请求 5
i.remove()
asyncCall.callsPerHost().incrementAndGet()
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService())
}
// 返回结果,是否正在请求,runningCall.size > 0
return isRunning
}
// 异步执行 AsyncCall
fun executeOn(executorService: ExecutorService) {
assert(!Thread.holdsLock(client.dispatcher()))
var success = false
try {
// 执行run方法
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
transmitter.noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher().finished(this) // This call is no longer running!
}
}
}
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
transmitter.timeoutEnter()
try {
val response = getResponseWithInterceptorChain()
signalledCallback = true
// 响应成功的回调
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
} else {
// 响应失败的回掉
responseCallback.onFailure(this@RealCall, e)
}
} finally {
// 结束该请求的执行
client.dispatcher().finished(this)
}
}
}
到此为止,同步和异步的执行基本分析完毕。目前还有一个方法, getResponseWithInterceptorChain() 获得 响应结果,涉及到 okhttp 最强大的拦截器。
Intercepor 拦截器
都知道 okhttp 的拦截器使用了责任链模式,下面就从这个方法开始讲起:
@Throws(IOException::class)
fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = ArrayList<Interceptor>()
interceptors.addAll(client.interceptors()) //自定义应用拦截器,注意networkinterceptor的顺序
interceptors.add(RetryAndFollowUpInterceptor(client)) // 重试和重定向拦截器
interceptors.add(BridgeInterceptor(client.cookieJar())) // 转化请求拦截器
interceptors.add(CacheInterceptor(client.internalCache())) // 缓存拦截器
interceptors.add(ConnectInterceptor(client)) //连接拦截器
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors()) //自定义网络拦截器
}
interceptors.add(CallServerInterceptor(forWebSocket)) //发请求和接收响应拦截器
// 创建拦截器Chain
val chain = RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis())
var calledNoMoreExchanges = false
try {
// 拦截器处理请求,返回响应结果
val response = chain.proceed(originalRequest)
if (transmitter.isCanceled) {
closeQuietly(response)
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw transmitter.noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null)
}
}
}
上面最重要的方法是 val response = chain.proceed(originalRequest)
, 包括在 chain 的构造过程中,index = 0,表示使用 第一个拦截器。
@Throws(IOException::class)
fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
if (index >= interceptors.size) throw AssertionError()
// 每个拦截器只能处理一次
calls++
// If we already have a stream, confirm that the incoming request will use it.
if (this.exchange != null && !this.exchange.connection()!!.supportsUrl(request.url())) {
throw IllegalStateException("network interceptor " + interceptors[index - 1] +
" must retain the same host and port")
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.exchange != null && calls > 1) {
throw IllegalStateException("network interceptor " + interceptors[index - 1] +
" must call proceed() exactly once")
}
// Call the next interceptor in the chain.
// 构造下一个 chain, index + 1
val next = RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
val interceptor = interceptors[index]
@Suppress("USELESS_ELVIS")
// 当前拦截器处理下一个 chain, 获得response 。
val response = interceptor.intercept(next) ?: throw NullPointerException(
"interceptor $interceptor returned null")
// Confirm that the next interceptor made its required call to chain.proceed().
if (exchange != null && index + 1 < interceptors.size && next.calls != 1) {
throw IllegalStateException("network interceptor " + interceptor +
" must call proceed() exactly once")
}
if (response.body() == null) {
throw IllegalStateException(
"interceptor $interceptor returned a response with no body")
}
return response
}
这里对拦截器的处理过程做个分析:
interface Interceptor {
@Throws(IOException::class)
fun intercept(chain: Chain): Response
}
interface Chain {
fun request(): Request
@Throws(IOException::class)
fun proceed(request: Request): Response
...
}
首先是第0个 RealInterceptorChain 处理 process 请求, 内部执行 interceptor.intercept(chain) 返回当前 response;执行内部构造第1个 RealInterceptorChain 又处理请求。
有类似递归的方式,直到最后一个拦截器 没有chain.proceed(), 直接返回了 Response。
那么这里就可以体现拦截器使用责任链模式的好处:
每个拦截器只对自己的 resquest 和 response 负责,从上往下传递,互不依赖,彻底解耦。
总结
本文章大概从源码角度分析了 okhttp 的使用过程,当然还有 具体每个拦截器的作用,缓存,分发器, 连接池等后续也会分析。
- okhttp 的 resques, response处理和具体内容
- 拦截器和责任链模式