Kotlin协程相关总结
Kotlin 协程
协程是什么?kotlinx.coroutines 是什么?
协程 (Coroutines) 是一个新的概念,但是协程这个术语早在 1958 年就被提出并用于构建汇编程序,协程是一种编程思想,并不局限于特定的语言,就像 Rx 也是一种思想,并不局限于使用 Java 实现的 RxJava。不同语言实现的协程库可能名称或者使用上有所不同,但它们的设计思想是有相似之处的。
kotlinx.coroutines 是由 JetBrains 开发的 kotlin 协程库,可以把它简单的理解为一个线程框架 。
协程并不是从操作系统层面创立的新的运行方式,代码是运行在线程中的,线程又是运行在进程中的,协程也是运行在线程中的,所以才说它是基于线程封装的库。
Kotlin 协程解决了异步编程时过多回调的问题,用写同步代码的方式来写异步代码,简化了异步编程。
进程、线程和协程之间的关系
- 协程和线程提出的时间
协程是 1963 年正式提出,1966 年才有了线程的概念。
- 进程、线程和协程包含关系
同一时刻,同一个 CPU 的某个核心上,只有一个进程的一个线程的一个协程 (如果有) 在运行。
一个进程包含至少一个线程(主线程),一个线程里有 0 或多个协程,一个协程是以线程为宿主进行的计算活动。协程一旦确定宿主线程,一般不会再更改。
- 资源分配
进程是资源分配的基本单位,进程间的内存空间是隔离的,线程是 CPU 调度的基本单位,协程对于 OS 来说是透明的;协程被认为是用户态的线程,协程的调度由用户完成。进程向自己所属线程开放内存空间,线程有自己的堆栈、程序计数器和寄存器。
- 资源消耗
一个线程消耗的内存一般在 MB 级别,而协程占用内存一般在几十到几百 KB,Goroutine 经过层层优化后占用 2KB 内存。Java 为了解决多线程内存分配锁竞争的性能问题,每个线程还会在自己的内存空间中额外申请默认 64MB 内存作为堆内存(TLAB),使得操作系统的内存无法支撑几万个线程的并发,但是对协程来说却不是个问题。
- 上下文切换成本
线程切换需要到内核态,线程上下文切换的成本在几十纳秒到几微秒间,当线程繁忙且数量众多时,这些切换会消耗绝大部分的 CPU 运算能力。
Kotlin 协程原理
什么是 suspend?suspend 原理?
什么是 suspend?
- suspend 修饰函数时,表明这个函数可能会被挂起,挂起函数不一定会挂起协程,如果相关调用的结果已经可用,库可以决定继续进行而不挂起。
- 挂起函数使用 CPS style 的代码来挂起协程,保证挂起点后面的代码只能在挂起函数执行完后才能执行,所以挂起函数保证了协程内的顺序执行顺序
- 每个挂起函数的最后一个参数是一个 Continuation 类型
suspend 原理
suspend 函数反编译后都会由编译器增加一个 Continuation 类型参数在最后(Retrofit 就是根据这个参数来判断是普通函数还是 suspend 函数的)
启动一个协程一般需要传递一个 suspend ()->T
suspend lambda,launch 最后一个参数 block
suspend lambda 又是个什么东西?
反编译后它其实就是个 SuspendLambda
并实现了 Function1
接口的类,而 SuspendLambda→ContinuationImpl→BaseContinuationImpl→Continuation
里面的 create()
方法和 invokeSuspend()
都是实现了 Continuation 接口的方法。
协程的原理
协程的创建
以 launch 为例,调用链:
- CoroutineScope.launch
- AbstractCoroutine.start
- CoroutineStart.invoke
- (suspend (R) -> T).startCoroutineCancellable 创建 Continuation
- (suspend (R) -> T).createCoroutineUnintercepted
协程的启动
- SuspendLambda→ContinuationImpl→BaseContinuationImpl→Continuation
- 入口 resumeWith
协程的的启动入口是在
BaseContinuationImpl#resumeWith
里面有个死循环,里面调用了 invokeSuspend(),最终会调用到 suspend lambda 的 invokeSuspend,里面是个状态机,各种 case。
- invokeSuspend(xxx) 最终调用的是你写的
suspend()->T
的代码,判断结果是否是COROUTINE_SUSPEND
,如果是那么就 return 了,while 循环退出,也就是挂起了;每次挂起都是执行一次 invokeSuspend 函数 - 如果不是
COROUTINE_SUSPEND
,最终会调用 resumeWith 进行协程的恢复
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?): Continuation<Any?> {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
while (true) {
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
}
协程的结构化并发
协程如何切换线程的?
launch 如何切换线程的?
- 线程上下文 ContinuationInterceptor 就是一个 CoroutineContext,在 launch 时可以传递进去
- 有个 intercepted() 进行分发
- CoroutineScope.launch()
- AbstractCoroutine.start(start, coroutine, block)
- (suspend (R) -> T).startCoroutineCancellable SuspendLambda
- SuspendLambda.createCoroutineUnintercepted()
- Continuation.intercepted() 这里会传递 CoroutineDispatcher(通过 CoroutineContext 获取) 和 Continuation 给 DispatchedContinuation 构造
- DispatchedContinuation.interceptContinuation()
- DispatchedContinuation.resumeCancellableWith() 如果需要就调用 Dispatcher.dispatch() 到指定线程运行任务
- main 线程是通过 HandlerContext,最终通过 Handler 来实现的
withContext 是怎样切换线程的?
withContext 其实就是一层 Api 封装,最后调用到了 startCoroutineCancellable
,这就跟 launch 后面的流程一样了
- suspendCoroutineUninterceptedOrReturn(CoroutineContext, SuspendLambda)
- DispatchedCoroutine.afterResume
- Continuation.intercepted() 后面流程同 launch 了
协程异常如何传递的?
协程异常
- 协程中抛出 CancellationException 异常会被忽略掉
- 抛出未捕获的非 CancellationException 异常会取消子协程和自己,也会取消父协程,一直取消 root 协程,异常也会由 root 协程处理
- 如果使用了 SupervisorJob 或 supervisorScope,子协程抛出未捕获的非 CancellationException 异常不会取消父协程,异常也会由子协程自己处理
如何实现子协程异常了,父协程 cancel 的?
通过 Job 链,
SupervisorJob 作用和原理
作用:默认情况下,子协程发生异常后,会取消父协程、兄弟协程的执行;SupervisorJob 中子协程发生异常,不会取消父协程和兄弟协程。
原理:当需要取消父 Job 时,势必会调用到:job.childCancelled(cause) 而 SupervisorJob 重写了该函数:直接返回 false
协程并发访问?
Flow
热流和冷流
Flow 是冷流,ChannelFlow 是热流
- 热流:无论有没有 Collector(RxJava 中是 Subscriber) 订阅,事件始终都会发生;多个订阅者是一对多的关系,多个订阅者共享信息
- 冷流:只有 Collector 订阅时,才开始执行发射数据流的代码;Flow 和 Collector 是一对一关系,多个不同的订阅者,消息是重新完整发送的
协程面试题
Retrofit 使用协程,需要切换线程吗?
Retrofit 使用协程时,不需要 withContext 来切换线程了,因为用的是 OkHttp 的 enqueue 异步方法,Retrofit 只是包装成一个 suspend 方法,具体看源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
suspend fun <T : Any> Call<T>.awaitResponse(): Response<T> {
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
continuation.resume(response)
}
override fun onFailure(call: Call<T>, t: Throwable) {
continuation.resumeWithException(t)
}
})
}
}
将 Call#enqueue 封装成 suspend 方法,调用的是异步的 enqueue,也就是说在协程中用 Retrofit+suspend 是不需要切换线程的。
聊聊 Job 和 SupervisorJob 的区别?
Job 和 SupervisorJob 的区别
- Job 的子协程发生异常被取消会同时取消 Job 的其它子协程,而 SupervisorJob 不会。
Job 启动了 3 个子协程 job1、job2、job3。job1 delay 100 毫秒后发生异常,协程被取消了,job2 和 job3 也同样被取消了;SupervisorJob 启动了 3 个子协程 job1、job2、job3。job1 delay 100 毫秒后发生异常,协程被取消了,job2 和 job3 并不受影响。
Job 和 SupervisorJob 原理
1
2
3
4
5
6
7
8
9
10
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)
public fun SupervisorJob(parent: Job? = null) : CompletableJob = SupervisorJobImpl(parent)
internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
// ...
}
private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
override fun childCancelled(cause: Throwable): Boolean = false
}
Job() 返回的是 JobImpl 对象,SupervisorJob() 返回的 SupervisorJobImpl 对象。而 SupervisorJobImpl 是 JobImpl 的子类,并且重写了 childCancelled 方法,返回值为 false。JobImpl 继承自 JobSupport,它的 childCancelled 方法源码如下:
1
2
3
4
5
6
public open class JobSupport constructor(active: Boolean) {
public open fun childCancelled(cause: Throwable): Boolean {
if (cause is CancellationException) return true
return cancelImpl(cause) && handlesException
}
}