💪
AndroidCollect
  • 写在前面
  • 计算机基础
    • 计算机组成原理
    • 算法
      • 查找
        • 二分查找
      • 排序
        • 简单排序
        • 高级排序
        • 特殊排序
      • 海量数据
      • 思想
        • 贪心
        • 分治
        • 动态规划
        • 回溯
      • 哈希算法
    • 数据结构
      • 队列
        • 知识点
        • 相关题目
          • 用两个栈实现队列
          • 实现循环队列
          • 用链表实现队列
          • 用数组实现队列
      • 栈
        • 相关算法题目
          • 用链表实现栈
          • 用数组实现栈
      • 链表
        • 知识点梳理
        • 相关算法题目
          • 删除倒数第n个结点
          • 合并两个有序链表
          • 检测单链表是否有环
          • 获取中间结点
          • 反转链表
      • 跳表
      • 哈希表
      • 树
        • 二叉树
        • 二叉查找树
        • AVL 树
        • Trie 树
        • 红黑树
      • 堆
        • 存储
        • 堆的应用
      • 图
    • 网络
      • 应用层协议
        • DNS
        • HTTP
        • HTTPS
      • 传输层协议
        • TCP
        • UDP
      • 输入网址后发生了什么
    • 操作系统
      • 内存
    • 数据库
  • 软件工程
    • 编程思想
    • 设计模式
      • 状态模式
      • 装饰器模式
      • 代理模式
      • 责任链模式
      • 建造者模式
      • 单例模式
      • 观察者模式
  • Java
    • 基础
    • 异常
    • 并发编程
      • ThreadLocal
      • 线程池
      • 理解 volatile
      • AbstractQueuedSynchronizer
    • 集合
      • LinkedHashMap 源码
      • HashMap 源码
    • 注解
    • 反射
      • JDK 动态代理
    • JVM
      • 自动内存管理机制
      • Class 文件格式
      • 类加载机制
      • Java 内存模型(JMM)
      • 字节码指令
      • HotSpot 虚拟机实现细节
    • 源码与原理
    • 各版本主要特性
  • Android
    • 基础组件
      • Context
      • Activity
        • 生命周期
        • 启动模式与任务栈
        • 启动流程
      • Service
      • ContentProvider
      • BroadcastReceiver
      • Fragment
      • View
        • 常用控件问题总结
          • RecyclerView
          • ViewPager2
        • CoordinatorLayout
        • SurfaceView
        • 事件分发
        • 绘制流程
        • 自定义 View
        • Window
    • 数据存储
      • 存储结构
      • Sqlite
      • 序列化
      • SharedPreferences
    • 资源
      • 图片加载
    • 动画
      • 属性动画
    • 线程和进程
      • Binder 机制
      • 跨进程通信
        • AIDL
    • 内部原理
      • 消息循环机制
      • Binder
      • Window
      • SparseArray
      • ArrayMap
      • RecyclerView
      • App 启动流程
    • 性能优化
      • 内存
        • 内存使用优化
        • 内存泄漏
      • 启动优化
      • 缩减包大小
      • 布局优化
      • ANR
    • 打包构建
      • dex 文件
      • APK 打包流程
      • APK 签名流程
    • 架构
      • 运行时
      • Android 系统架构
      • 应用项目架构
    • 开源框架源码或原理
      • RxJava
        • 使用笔记
        • 源码解析
      • Retrofit
      • ButterKnife
      • BlockCanary
      • LeakCanary
      • OkHttp
      • 图片加载
        • Glide
        • Picasso
    • 碎片化处理
      • 屏幕适配
    • 黑科技
      • 热修复
    • Jetpack
      • Lifecycle
      • Room
      • WorkManager
    • 新动态
      • AndroidX
      • 各系统版本特性
  • 开发工具
    • 正则表达式
    • ADB
    • Git
  • Kotlin
  • Flutter
  • 关于作者
  • 致谢
由 GitBook 提供支持
在本页
  • OkHttpClient 创建
  • 请求创建
  • 异步请求
  • AsyncCall#run
  • RealCall#getResponseWithInterceptorChain
  • RealInterceptorChain#proceed
  • 同步请求
  • Dispatcher
  • 线程池创建
  • 请求发送
  • BridgeInterceptor
  • CallServerInterceptor
  • RealInterceptorChain#initExchange
  • ExchangeFinder#find
  • RealConnection#newCodec
  • 缓存处理
  • 相关链接

这有帮助吗?

  1. Android
  2. 开源框架源码或原理

OkHttp

Version 4.4.0

OkHttpClient 创建

可以通过建造者模式进行参数设定,会初始化调度器Dispatcher。

请求创建

请求的创建等都使用了建造者模式。

异步请求

请求真正的执行时RealCall类,同步请求使用execute,异步请求使用enqueue。

Realcall#enqueue 调用了 client.dispatcher#enqueue(AsyncCall)

// RealCall#enqueue
override fun enqueue(responseCallback: Callback) {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }
//Dispatcher#enqueue
internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      //添加到等待队列
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }
  
  
  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      //将等待中的请求放入可执行队列
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }
   //将可执行请求放入线程池
    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

先加入到等待队列中,然后通过 promoteAndExecute 方法,将等待队列中可以执行的请求添加到可执行列表中(需要满足限制条件:并发请求数量、相同host并发请求数量、线程数等等),同时添加到执行中列表中,然后通过线程池执行。

AsyncCall#run

//AsyncCall#run
override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }

执行请求的方法 RealCall#getResponseWithInterceptorChain(),这个方法中会将用户自定义的拦截器加上已经按职责分离的所有必须拦截器构造一个RealInterceptorChain,然后调用它的proceed 方法处理请求。

RealCall#getResponseWithInterceptorChain

//RealCall#getResponseWithInterceptorChain()
internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    //添加用户自定义的拦截器
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    //添加用于发送请求的拦截器
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

请求处理使用了连接器链完成-责任链模式。

RealInterceptorChain#proceed

override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    calls++

    if (exchange != null) {
      check(exchange.connection.supportsUrl(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    // Call the next interceptor in the chain.
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }

intercept 方法参数为 RealInterceptorChain ,该方法中需要执行 chain.proceed 进行传递。

如果要在响应前执行操作,那么可以放在 chain.proceed 之前;如果要在收到响应后再处理,可以先调用 chain.proceed 获取响应,然后再操作.

请求完成后通过 dispatcher.finish方法,将执行完的call移除队列,并在符合条件的情况下执行等待队列中的请求并添加到运行中队列。

同步请求

// RealCall#execute()
override fun execute(): Response {
    synchronized(this) {
      check(!executed) { "Already Executed" }
      executed = true
    }
    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

dispatcher 只是将 call 添加到了运行中队列:

@Synchronized internal fun executed(call: RealCall) {
    runningSyncCalls.add(call)
 }

Dispatcher

Dispatcher 有三个队列

//等待中的异步请求
private val readyAsyncCalls = ArrayDeque<AsyncCall>()

//处理中的异步请求
private val runningAsyncCalls = ArrayDeque<AsyncCall>()

//处理中的同步请求
private val runningSyncCalls = ArrayDeque<RealCall>()

线程池创建

val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

默认情况下创建的线程池,核心线程数是0(没有常驻线程)、存活时间60秒。默认拒绝策略是抛异常。

请求发送

BridgeInterceptor

主要是将用户创建的请求调整为HTTP 协议格式,并将原始响应组成成Response。

override fun intercept(chain: Interceptor.Chain): Response {
    val userRequest = chain.request()
    val requestBuilder = userRequest.newBuilder()

    val body = userRequest.body
    //------------------处理请求--------------
    if (body != null) {
      val contentType = body.contentType()
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString())
      }

      val contentLength = body.contentLength()
      if (contentLength != -1L) {
        requestBuilder.header("Content-Length", contentLength.toString())
        requestBuilder.removeHeader("Transfer-Encoding")
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked")
        requestBuilder.removeHeader("Content-Length")
      }
    }

    if (userRequest.header("Host") == null) {
      requestBuilder.header("Host", userRequest.url.toHostHeader())
    }

    if (userRequest.header("Connection") == null) {
      requestBuilder.header("Connection", "Keep-Alive")
    }

    // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
    // the transfer stream.
    var transparentGzip = false
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true
      requestBuilder.header("Accept-Encoding", "gzip")
    }

    val cookies = cookieJar.loadForRequest(userRequest.url)
    if (cookies.isNotEmpty()) {
      requestBuilder.header("Cookie", cookieHeader(cookies))
    }

    if (userRequest.header("User-Agent") == null) {
      requestBuilder.header("User-Agent", userAgent)
    }
    //-----------------处理请求 Done--------------------

    val networkResponse = chain.proceed(requestBuilder.build())
    
    //-----------------开始处理响应----------------------

    cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

    val responseBuilder = networkResponse.newBuilder()
        .request(userRequest)

    if (transparentGzip &&
        "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
        networkResponse.promisesBody()) {
      val responseBody = networkResponse.body
      if (responseBody != null) {
        val gzipSource = GzipSource(responseBody.source())
        val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
        responseBuilder.headers(strippedHeaders)
        val contentType = networkResponse.header("Content-Type")
        responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
      }
    }

    return responseBuilder.build()
  }

CallServerInterceptor

真正用于发送请求的拦截器,这也是拦截器列表里最后一个拦截器。

override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.exchange!!
    val request = realChain.request
    val requestBody = request.body
    val sentRequestMillis = System.currentTimeMillis()

    exchange.writeRequestHeaders(request)

    var invokeStartEvent = true
    var responseBuilder: Response.Builder? = null
    if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
        exchange.flushRequest()
        responseBuilder = exchange.readResponseHeaders(expectContinue = true)
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
      if (responseBuilder == null) {
        if (requestBody.isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest()
          val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
          requestBody.writeTo(bufferedRequestBody)
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
          requestBody.writeTo(bufferedRequestBody)
          bufferedRequestBody.close()
        }
      } else {
        exchange.noRequestBody()
        if (!exchange.connection.isMultiplexed) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection()
        }
      }
    } else {
      exchange.noRequestBody()
    }

    if (requestBody == null || !requestBody.isDuplex()) {
      exchange.finishRequest()
    }
    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
        invokeStartEvent = false
      }
    }
    var response = responseBuilder
        .request(request)
        .handshake(exchange.connection.handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build()
    var code = response.code
    if (code == 100) {
      // Server sent a 100-continue even though we did not request one. Try again to read the actual
      // response status.
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
      if (invokeStartEvent) {
        exchange.responseHeadersStart()
      }
      response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      code = response.code
    }

    exchange.responseHeadersEnd(response)

    response = if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response.newBuilder()
          .body(EMPTY_RESPONSE)
          .build()
    } else {
      response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build()
    }
    if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
        "close".equals(response.header("Connection"), ignoreCase = true)) {
      exchange.noNewExchangesOnConnection()
    }
    if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
      throw ProtocolException(
          "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
    }
    return response
  }

主要借助Exchange来发送请求和读取响应,而 Exchange 又借助 ExchangeCodec 编解码器来进行处理。ExchangeCodec 是一个接口,实现类有 Http1ExchangeCodec 和Http2ExchangeCodec,分别对应HTTP 1.1 和 HTTP 2。

连接的初始化在 ConnectInterceptor 中进行,调用了call.initExchange 方法

@Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }

RealInterceptorChain#initExchange

internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(connectionPool) {
      check(!noMoreExchanges) { "released" }
      check(exchange == null)
    }

    val codec = exchangeFinder!!.find(client, chain)
    val result = Exchange(this, eventListener, exchangeFinder!!, codec)
    this.interceptorScopedExchange = result

    synchronized(connectionPool) {
      this.exchange = result
      this.exchangeRequestDone = false
      this.exchangeResponseDone = false
      return result
    }
  }

ExchangeFinder#find

fun find(
    client: OkHttpClient,
    chain: RealInterceptorChain
  ): ExchangeCodec {
    try {
      val resultConnection = findHealthyConnection(
          connectTimeout = chain.connectTimeoutMillis,
          readTimeout = chain.readTimeoutMillis,
          writeTimeout = chain.writeTimeoutMillis,
          pingIntervalMillis = client.pingIntervalMillis,
          connectionRetryEnabled = client.retryOnConnectionFailure,
          doExtensiveHealthChecks = chain.request.method != "GET"
      )
      return resultConnection.newCodec(client, chain)
    } catch (e: RouteException) {
      trackFailure(e.lastConnectException)
      throw e
    } catch (e: IOException) {
      trackFailure(e)
      throw RouteException(e)
    }
  }

RealConnection#newCodec

internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
    val socket = this.socket!!
    val source = this.source!!
    val sink = this.sink!!
    val http2Connection = this.http2Connection

    return if (http2Connection != null) {
      Http2ExchangeCodec(client, this, chain, http2Connection)
    } else {
      socket.soTimeout = chain.readTimeoutMillis()
      source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)
      sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)
      Http1ExchangeCodec(client, this, source, sink)
    }
  }

缓存处理

默认的缓存逻辑在 CacheInterceptor 中,使用DiskLruCache。

override fun intercept(chain: Interceptor.Chain): Response {
    val cacheCandidate = cache?.get(chain.request())

    val now = System.currentTimeMillis()

    val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    val networkRequest = strategy.networkRequest
    val cacheResponse = strategy.cacheResponse

    cache?.trackResponse(strategy)

    if (cacheCandidate != null && cacheResponse == null) {
      // The cache candidate wasn't applicable. Close it.
      cacheCandidate.body?.closeQuietly()
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(HTTP_GATEWAY_TIMEOUT)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
    }

    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse!!.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build()
    }

    var networkResponse: Response? = null
    try {
      networkResponse = chain.proceed(networkRequest)
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        cacheCandidate.body?.closeQuietly()
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (networkResponse?.code == HTTP_NOT_MODIFIED) {
        val response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers, networkResponse.headers))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        networkResponse.body!!.close()

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache!!.trackConditionalCacheHit()
        cache.update(cacheResponse, response)
        return response
      } else {
        cacheResponse.body?.closeQuietly()
      }
    }

    val response = networkResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build()

    if (cache != null) {
      if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        val cacheRequest = cache.put(response)
        return cacheWritingResponse(cacheRequest, response)
      }

      if (HttpMethod.invalidatesCache(networkRequest.method)) {
        try {
          cache.remove(networkRequest)
        } catch (_: IOException) {
          // The cache cannot be written.
        }
      }
    }

    return response
  }

主要逻辑:

  1. 校验缓存是否可用

  2. 网络不可用、缓存不可用,返回空响应

  3. 网络不可用、缓存可用,返回缓存的响应

  4. 网络可用、发送请求,如果304,返回缓存可用,

  5. 根据最新的响应更新或添加缓存

相关链接

上一页LeakCanary下一页图片加载

最后更新于5年前

这有帮助吗?

Github
彻底理解OkHttp - OkHttp 源码解析及OkHttp的设计思想
OkHttp必须弄清楚的几个原理性问题