OkHttp 4源码(1)— OkHttp初始化和请求构造分析

OkHttp 4源码(1)— OkHttp初始化和请求构造分析概括本篇主要从OkHttp的两个请求示例开始,对Okhttp的初始化工作,和请求从构造、分发到执行的流程进行源码分析介绍OkHttp整体流程(本文覆盖红色部分)本文覆盖代码流程图示例使用OkHttp一

本文基于OkHttp 4.3.1源码分析 OkHttp – 官方地址 OkHttp – GitHub代码地址

概括

本篇主要从OkHttp的两个请求示例开始,对Okhttp的初始化工作,和请求从构造、分发到执行的流程进行源码分析介绍

OkHttp整体流程(本文覆盖红色部分)

OkHttp 4源码(1)— OkHttp初始化和请求构造分析

本文覆盖代码流程图

OkHttp 4源码(1)— OkHttp初始化和请求构造分析

示例

使用OkHttp一般流程,初始化一个共享OkHttpClient,构建Request,然后OkHttpClient根据Request构建Call,接着执行call,最后进行Response处理

同步请求

public class GetExample {
  OkHttpClient client = new OkHttpClient(); // 构建共享的Client

  String run(String url) throws IOException {
    Request request = new Request.Builder() // 构建request
        .url(url)
        .build();
    // 构建Call,执行
    try (Response response = client.newCall(request).execute()) {
      return response.body().string();
    }
  }

  public static void main(String[] args) throws IOException {
    GetExample example = new GetExample();
    String response = example.run("https://raw.github.com/square/okhttp/master/README.md");
    System.out.println(response);
  }
}

异步请求

public final class AsynchronousGet {
  private final OkHttpClient client = new OkHttpClient();// 构建共享Client

  public void run() throws Exception {
    // 构建Request
    Request request = new Request.Builder()
        .url("http://publicobject.com/helloworld.txt")
        .build();
    // 构建Call,执行,回调接受处理
    client.newCall(request).enqueue(new Callback() {
      @Override public void onFailure(Call call, IOException e) {
        e.printStackTrace();
      }

      @Override public void onResponse(Call call, Response response) throws IOException {
        try (ResponseBody responseBody = response.body()) {
          if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);

          Headers responseHeaders = response.headers();
          for (int i = 0, size = responseHeaders.size(); i < size; i++) {
            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
          }

          System.out.println(responseBody.string());
        }
      }
    });
  }

  public static void main(String... args) throws Exception {
    new AsynchronousGet().run();
  }
}

源码分析

构建OkHttpClient

  • OkHttpClient是Call的一个工厂类,OkHttpClient应该是共享的,或者说是单例
  • 可以通过newBuilder来自定义Client
  • 没有必要关心 关闭和资源释放
/*
 * Factory for [calls][Call], which can be used to send HTTP requests and read their responses.
 * ## OkHttpClients Should Be Shared
 * ## Customize Your Client With newBuilder()
 * ## Shutdown Isn't Necessary
 * / open class OkHttpClient internal constructor( builder: Builder ) : Cloneable, Call.Factory, WebSocket.Factory { // 3. 成员变量初始化 ... // 1. 内部无参数构造函数 constructor() : this(Builder()) // 4. OkHttpClient函数初始化 init { // 初始化证书和拦截器等判断逻辑 ... } // 2. Builder构造函数
    class Builder constructor() {
        ...
    } 
}

OkHttpClient.Builder

Builder模式,提供自定义配置化能力,同时有一份无需关心的默认配置

  class Builder constructor() {
        internal var dispatcher: Dispatcher = Dispatcher()
        internal var connectionPool: ConnectionPool = ConnectionPool()
        internal val interceptors: MutableList<Interceptor> = mutableListOf()
        internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
        internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
        internal var retryOnConnectionFailure = true
        internal var authenticator: Authenticator = Authenticator.NONE
        internal var followRedirects = true
        internal var followSslRedirects = true
        internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
        internal var cache: Cache? = null
        internal var dns: Dns = Dns.SYSTEM
        internal var proxy: Proxy? = null
        internal var proxySelector: ProxySelector? = null
        internal var proxyAuthenticator: Authenticator = Authenticator.NONE
        internal var socketFactory: SocketFactory = SocketFactory.getDefault()
        internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
        internal var x509TrustManagerOrNull: X509TrustManager? = null
        internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
        internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
        internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
        internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
        internal var certificateChainCleaner: CertificateChainCleaner? = null
        internal var callTimeout = 0
        internal var connectTimeout = 10_000
        internal var readTimeout = 10_000
        internal var writeTimeout = 10_000
        internal var pingInterval = 0
    }   

OkHttpClient成员变量初始化

  • 初始化成员变量
  • JvmName是为了支持兼容3.x
  @get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher

  @get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool

  @get:JvmName("interceptors") val interceptors: List<Interceptor> =
      builder.interceptors.toImmutableList()

  @get:JvmName("networkInterceptors") val networkInterceptors: List<Interceptor> =
      builder.networkInterceptors.toImmutableList()

  @get:JvmName("eventListenerFactory") val eventListenerFactory: EventListener.Factory =
      builder.eventListenerFactory

  @get:JvmName("retryOnConnectionFailure") val retryOnConnectionFailure: Boolean =
      builder.retryOnConnectionFailure

  @get:JvmName("cookieJar") val cookieJar: CookieJar = builder.cookieJar

  @get:JvmName("cache") val cache: Cache? = builder.cache

  ....

构建Request

Request 对应HTTP请求中的Request,OkHttp依旧是Builder构建模式构建Request

Request.Builder

支持url、method、headers、body的配置

 open class Builder {
    internal var url: HttpUrl? = null 
    internal var method: String
    internal var headers: Headers.Builder
    internal var body: RequestBody? = null
    // 构造Request
    open fun build(): Request {
      return Request(
          checkNotNull(url) { "url == null" },
          method,
          headers.build(),
          body,
          tags.toImmutableMap()
      )
    }
}

Request

通过Request.Builder构造Request实例

class Request internal constructor(
  @get:JvmName("url") val url: HttpUrl,
  @get:JvmName("method") val method: String,
  @get:JvmName("headers") val headers: Headers,
  @get:JvmName("body") val body: RequestBody?,
  internal val tags: Map<Class<*>, Any>
) {
    ...
}

构建Call

OkHttpClient.newCall

OkHttpClient 实现了Call.Factory,作为Call的构造工厂类

  override fun newCall(request: Request): Call {
    // 执行 RealCall的构造call方法
    return RealCall.newRealCall(this, request, forWebSocket = false)
  }

RealCall.newRealCall

构造Call真正方法,另外创建了一个 发射器,接下来先了解下Call

companion object {
    fun newRealCall( client: OkHttpClient, originalRequest: Request, forWebSocket: Boolean ): RealCall {
      
      return RealCall(client, originalRequest, forWebSocket).apply {
       // 构造了一个 发射器,它是应用层和网络层交互的桥梁,后面会着重介绍
        transmitter = Transmitter(client, this) 
      }
    }
  }

RealCall

Call定义为一个准备好执行的请求,它是能被取消的,且它只能被执行一次(http请求也是一次执行) 包括一个核心成员变量 Transmitter ,两个重要方法 execute(同步) 和 enqueue(异步)


internal class RealCall private constructor(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
 // 发射机
 private lateinit var transmitter: Transmitter
 
 // 同步请求执行方法
 override fun execute(): Response {
  ...
 }
  // 异步请求执行方法 
  override fun enqueue(responseCallback: Callback) {
    ...
  }}
  
  // 取消请求
  override fun cancel() {
    transmitter.cancel()
  }

同步请求

RealCall.execute

  • 请求前校验逻辑,仅能执行一次,和过期时间逻辑判断逻辑
  • 通知请求start事件,便于metrics 指标数据收集
  • 将call加入分发器的同步请求队列中
  • 通过拦截器责任链模式进行请求和返回一系列逻辑处理
  override fun execute(): Response {
    // 检查是否已经执行
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    // 过期时间逻辑,如果配置了会有WatchDog线程进行Watch然后执行退出逻辑
    transmitter.timeoutEnter()
    // 通知start,最后会通过 EventListener 发出时间,主要目的是收集 metrics events
    transmitter.callStart()
    try {
      // 调用client的dispatcher分发器执行call(将call加入同步call队列)
      client.dispatcher.executed(this)
      // 通过拦截器责任链模式进行请求和返回处理等一系类逻辑
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

RealCall.getResponseWithInterceptorChain

  • 配置拦截器,所有请求和响应处理逻辑解耦到各个拦截器负责模块
  • 构造拦截器链式调用处理类RealInterceptorChain实例
  • chain.proceed 进行拦截器的链式调用
  fun getResponseWithInterceptorChain(): Response {
    // 构建所有的拦截器
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors // client配置的拦截器
    interceptors += RetryAndFollowUpInterceptor(client) // 重试机制拦截器
    interceptors += BridgeInterceptor(client.cookieJar) // 请求和返回桥(http信息配置和解析)拦截器
    interceptors += CacheInterceptor(client.cache) // 缓存拦截
    interceptors += ConnectInterceptor // 连接拦截器
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)  // 执行拦截器
    
    // 拦截器核心处理类 
    val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
        client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)

    var calledNoMoreExchanges = false
    try {
      // 链式调用
      val response = chain.proceed(originalRequest)
      if (transmitter.isCanceled) { // 处理取消
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response //返回请求结果
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw transmitter.noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        transmitter.noMoreExchanges(null)
      }
    }
  }

异步请求

RealCall.enqueue

  • 构造一个异步call
  • 调用dispatcher 入队AsyncCall
  override fun enqueue(responseCallback: Callback) {
    synchronized(this) { 
      check(!executed) { "Already Executed" }
      executed = true
    }
    transmitter.callStart()
    // 构造 AsyncCall ,接着分发器入队操作
    client.dispatcher.enqueue(AsyncCall(responseCallback)) 
  }

Dispatcher 分发器队Call进行分发执行

参考:线程池理解

  • 初始化时,构造分发器的线程池,及对应执行参数
  • 主要管理异步请求的处理(异步Call入队、并发执行)
class Dispatcher constructor() {
  // 默认最大请求数 64
  @get:Synchronized var maxRequests = 64
  // 默认最大并发Host 5
  @get:Synchronized var maxRequestsPerHost = 5
  // 线程池执行器 默认创建可缓存线程池 
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("OkHttp Dispatcher", false))
      }
      return executorServiceOrNull!!
    }
    
  /** 准备好的异步Call对列 */
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  /** 执行中异步call对列 */
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  /** 同步call对列 */
  private val runningSyncCalls = ArrayDeque<RealCall>()
  
  // 异步Call 入队操作 
  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)
      // same host情况下的复用逻辑处理
      if (!call.get().forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host())
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute() // 准备线程池执行器,及执行
  }

// 执行Call
private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      // 遍历所有的ready 异步 call
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost().incrementAndGet()
        executableCalls.add(asyncCall) // 加入此次执行队列缓存
        runningAsyncCalls.add(asyncCall) // 加入正在执行队列
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService) // 将线程执行器传入call,在Call中进行执行
    }

    return isRunning
  }
  
  /** 同步call,添加到队列 */
  @Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
  }

}

AsyncCall执行

  • executeOn ,线程池执行器执行Runnable
  • run,通过拦截器进行请求和获取响应结果
fun executeOn(executorService: ExecutorService) {
    client.dispatcher.assertThreadDoesntHoldLock()
    
    var success = false
    try {
        // 线程池 执行器执行
        executorService.execute(this)
        success = true
    }
}
// 执行方法
override fun run() {
    threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        transmitter.timeoutEnter()
        try {
            // 同“同步请求” 最终执行拦截器链式调用
            val response = getResponseWithInterceptorChain()
            signalledCallback = true
            // 响应 回调
            responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
            ...
        } finally {
            client.dispatcher.finished(this)
        }
    }
}

OkHttp 4源码(2)— 拦截器机制分析

今天的文章OkHttp 4源码(1)— OkHttp初始化和请求构造分析分享到此就结束了,感谢您的阅读。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/23307.html

(0)
编程小号编程小号

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注