文章

Kotlin协程相关总结

Kotlin协程相关总结

Kotlin 协程

协程是什么?kotlinx.coroutines 是什么?

协程 (Coroutines) 是一个新的概念,但是协程这个术语早在 1958 年就被提出并用于构建汇编程序,协程是一种编程思想,并不局限于特定的语言,就像 Rx 也是一种思想,并不局限于使用 Java 实现的 RxJava。不同语言实现的协程库可能名称或者使用上有所不同,但它们的设计思想是有相似之处的。
kotlinx.coroutines 是由 JetBrains 开发的 kotlin 协程库,可以把它简单的理解为一个线程框架 。
协程并不是从操作系统层面创立的新的运行方式,代码是运行在线程中的,线程又是运行在进程中的,协程也是运行在线程中的,所以才说它是基于线程封装的库。
Kotlin 协程解决了异步编程时过多回调的问题,用写同步代码的方式来写异步代码,简化了异步编程。

进程、线程和协程之间的关系

  1. 协程和线程提出的时间

协程是 1963 年正式提出,1966 年才有了线程的概念。

  1. 进程、线程和协程包含关系

同一时刻,同一个 CPU 的某个核心上,只有一个进程的一个线程的一个协程 (如果有) 在运行。
一个进程包含至少一个线程(主线程),一个线程里有 0 或多个协程,一个协程是以线程为宿主进行的计算活动。协程一旦确定宿主线程,一般不会再更改。
zv5e7

  1. 资源分配

进程是资源分配的基本单位,进程间的内存空间是隔离的,线程是 CPU 调度的基本单位,协程对于 OS 来说是透明的;协程被认为是用户态的线程,协程的调度由用户完成。进程向自己所属线程开放内存空间,线程有自己的堆栈、程序计数器和寄存器。

  1. 资源消耗

一个线程消耗的内存一般在 MB 级别,而协程占用内存一般在几十到几百 KB,Goroutine 经过层层优化后占用 2KB 内存。Java 为了解决多线程内存分配锁竞争的性能问题,每个线程还会在自己的内存空间中额外申请默认 64MB 内存作为堆内存(TLAB),使得操作系统的内存无法支撑几万个线程的并发,但是对协程来说却不是个问题。

  1. 上下文切换成本

线程切换需要到内核态,线程上下文切换的成本在几十纳秒到几微秒间,当线程繁忙且数量众多时,这些切换会消耗绝大部分的 CPU 运算能力。
7jdux

Kotlin 协程原理

什么是 suspend?suspend 原理?

什么是 suspend?

  • suspend 修饰函数时,表明这个函数可能会被挂起,挂起函数不一定会挂起协程,如果相关调用的结果已经可用,库可以决定继续进行而不挂起。
  • 挂起函数使用 CPS style 的代码来挂起协程,保证挂起点后面的代码只能在挂起函数执行完后才能执行,所以挂起函数保证了协程内的顺序执行顺序
  • 每个挂起函数的最后一个参数是一个 Continuation 类型

suspend 原理

suspend 函数反编译后都会由编译器增加一个 Continuation 类型参数在最后(Retrofit 就是根据这个参数来判断是普通函数还是 suspend 函数的)
启动一个协程一般需要传递一个 suspend ()->Tsuspend lambda,launch 最后一个参数 block
suspend lambda 又是个什么东西?
反编译后它其实就是个 SuspendLambda 并实现了 Function1 接口的类,而 SuspendLambda→ContinuationImpl→BaseContinuationImpl→Continuation
里面的 create() 方法和 invokeSuspend() 都是实现了 Continuation 接口的方法。

协程的原理

协程的创建

以 launch 为例,调用链:

  • CoroutineScope.launch
  • AbstractCoroutine.start
  • CoroutineStart.invoke
  • (suspend (R) -> T).startCoroutineCancellable 创建 Continuation
    • (suspend (R) -> T).createCoroutineUnintercepted

协程的启动

  • SuspendLambda→ContinuationImpl→BaseContinuationImpl→Continuation
  • 入口 resumeWith

协程的的启动入口是在 BaseContinuationImpl#resumeWith 里面有个死循环,里面调用了 invokeSuspend(),最终会调用到 suspend lambda 的 invokeSuspend,里面是个状态机,各种 case。

  • invokeSuspend(xxx) 最终调用的是你写的 suspend()->T 的代码,判断结果是否是 COROUTINE_SUSPEND,如果是那么就 return 了,while 循环退出,也就是挂起了;每次挂起都是执行一次 invokeSuspend 函数
  • 如果不是 COROUTINE_SUSPEND,最终会调用 resumeWith 进行协程的恢复
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?): Continuation<Any?> {
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        while (true) {
            with(current) {
				val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

协程的结构化并发

协程如何切换线程的?

launch 如何切换线程的?

  1. 线程上下文 ContinuationInterceptor 就是一个 CoroutineContext,在 launch 时可以传递进去
  2. 有个 intercepted() 进行分发
  • CoroutineScope.launch()
  • AbstractCoroutine.start(start, coroutine, block)
  • (suspend (R) -> T).startCoroutineCancellable SuspendLambda
    • SuspendLambda.createCoroutineUnintercepted()
    • Continuation.intercepted() 这里会传递 CoroutineDispatcher(通过 CoroutineContext 获取) 和 Continuation 给 DispatchedContinuation 构造
    • DispatchedContinuation.interceptContinuation()
    • DispatchedContinuation.resumeCancellableWith() 如果需要就调用 Dispatcher.dispatch() 到指定线程运行任务
  • main 线程是通过 HandlerContext,最终通过 Handler 来实现的

withContext 是怎样切换线程的?

withContext 其实就是一层 Api 封装,最后调用到了 startCoroutineCancellable,这就跟 launch 后面的流程一样了

  • suspendCoroutineUninterceptedOrReturn(CoroutineContext, SuspendLambda)
  • DispatchedCoroutine.afterResume
  • Continuation.intercepted() 后面流程同 launch 了

协程异常如何传递的?

协程异常

  • 协程中抛出 CancellationException 异常会被忽略掉
  • 抛出未捕获的非 CancellationException 异常会取消子协程和自己,也会取消父协程,一直取消 root 协程,异常也会由 root 协程处理
  • 如果使用了 SupervisorJob 或 supervisorScope,子协程抛出未捕获的非 CancellationException 异常不会取消父协程,异常也会由子协程自己处理

如何实现子协程异常了,父协程 cancel 的?
通过 Job 链,
SupervisorJob 作用和原理
作用:默认情况下,子协程发生异常后,会取消父协程、兄弟协程的执行;SupervisorJob 中子协程发生异常,不会取消父协程和兄弟协程。
原理:当需要取消父 Job 时,势必会调用到:job.childCancelled(cause) 而 SupervisorJob 重写了该函数:直接返回 false

协程并发访问?

Flow

热流和冷流

Flow 是冷流,ChannelFlow 是热流

  • 热流:无论有没有 Collector(RxJava 中是 Subscriber) 订阅,事件始终都会发生;多个订阅者是一对多的关系,多个订阅者共享信息
  • 冷流:只有 Collector 订阅时,才开始执行发射数据流的代码;Flow 和 Collector 是一对一关系,多个不同的订阅者,消息是重新完整发送的

协程面试题

Retrofit 使用协程,需要切换线程吗?

Retrofit 使用协程时,不需要 withContext 来切换线程了,因为用的是 OkHttp 的 enqueue 异步方法,Retrofit 只是包装成一个 suspend 方法,具体看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
suspend fun <T : Any> Call<T>.awaitResponse(): Response<T> {
    return suspendCancellableCoroutine { continuation ->
        continuation.invokeOnCancellation {
            cancel()
        }
        enqueue(object : Callback<T> {
            override fun onResponse(call: Call<T>, response: Response<T>) {
                continuation.resume(response)
            }
            
            override fun onFailure(call: Call<T>, t: Throwable) {
                continuation.resumeWithException(t)
            }
        })
    }
}

将 Call#enqueue 封装成 suspend 方法,调用的是异步的 enqueue,也就是说在协程中用 Retrofit+suspend 是不需要切换线程的。

聊聊 Job 和 SupervisorJob 的区别?

Job 和 SupervisorJob 的区别

  • Job 的子协程发生异常被取消会同时取消 Job 的其它子协程,而 SupervisorJob 不会。

Job 启动了 3 个子协程 job1、job2、job3。job1 delay 100 毫秒后发生异常,协程被取消了,job2 和 job3 也同样被取消了;SupervisorJob 启动了 3 个子协程 job1、job2、job3。job1 delay 100 毫秒后发生异常,协程被取消了,job2 和 job3 并不受影响。

Job 和 SupervisorJob 原理

1
2
3
4
5
6
7
8
9
10
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)

public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)

internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
    // ...
}
private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
    override fun childCancelled(cause: Throwable): Boolean = false
}

Job() 返回的是 JobImpl 对象,SupervisorJob() 返回的 SupervisorJobImpl 对象。而 SupervisorJobImpl 是 JobImpl 的子类,并且重写了 childCancelled 方法,返回值为 false。JobImpl 继承自 JobSupport,它的 childCancelled 方法源码如下:

1
2
3
4
5
6
public open class JobSupport constructor(active: Boolean) {
    public open fun childCancelled(cause: Throwable): Boolean {
        if (cause is CancellationException) return true
        return cancelImpl(cause) && handlesException
    }
}

Ref

协程简史,一文讲清楚协程的起源、发展和实现

本文由作者按照 CC BY 4.0 进行授权