Kotlin协程原理
协程原理
suspend 原理
suspend 方法
示例代码:
1
2
3
4
/**
 * Minimal suspend function used as the decompilation sample:
 * it calls `delay(100)` and then prints a fixed message.
 */
suspend fun susFun() {
    delay(100) // the only suspending call in this function
    println("hello suspend function.")
}
反编译后:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
 * jadx output for the Kotlin {@code suspend fun susFun()}. The body is only available as a
 * register-level dump inside a block comment. What the dump shows:
 * - r7 is the caller's Continuation; if it is already a SuspendFunTestKt$susFun$1 whose label has
 *   the Integer.MIN_VALUE bit set, that bit is cleared and the object is reused, otherwise a new
 *   SuspendFunTestKt$susFun$1 wrapping r7 is created (L_0x0026).
 * - switch(label): 0 runs up to delay(100, continuation) and returns COROUTINE_SUSPENDED if delay
 *   suspended; 1 is the resumed path, which falls through to the println; any other value throws
 *   IllegalStateException.
 */
public static final Object susFun(Continuation<? super kotlin.Unit> r7) {
/*
r4 = -2147483648(0xffffffff80000000, float:-0.0)
boolean r2 = r7 instanceof me.hacket.coroutine.SuspendFunTestKt.susFun.1 // inner class SuspendFunTestKt$susFun$1
if (r2 == 0) goto L_0x0026
r2 = r7
me.hacket.coroutine.SuspendFunTestKt$susFun$1 r2 = (me.hacket.coroutine.SuspendFunTestKt.susFun.1) r2
int r3 = r2.label
r3 = r3 & r4
if (r3 == 0) goto L_0x0026
int r3 = r2.label
int r3 = r3 - r4
r2.label = r3
L_0x0013:
java.lang.Object r1 = r2.result
java.lang.Object r3 = kotlin.coroutines.intrinsics.IntrinsicsKt.getCOROUTINE_SUSPENDED()
int r4 = r2.label
switch(r4) {
case 0: goto L_0x002d;
case 1: goto L_0x003d;
default: goto L_0x001e;
}
L_0x001e:
java.lang.IllegalStateException r2 = new java.lang.IllegalStateException
java.lang.String r3 = "call to 'resume' before 'invoke' with coroutine"
r2.<init>(r3)
throw r2
L_0x0026:
me.hacket.coroutine.SuspendFunTestKt$susFun$1 r0 = new me.hacket.coroutine.SuspendFunTestKt$susFun$1
r0.<init>(r7)
r2 = r0
goto L_0x0013
L_0x002d:
kotlin.ResultKt.throwOnFailure(r1)
r4 = 100
r6 = 1
r2.label = r6
java.lang.Object r2 = kotlinx.coroutines.DelayKt.delay(r4, r2)
if (r2 != r3) goto L_0x0040
r2 = r3
L_0x003c:
return r2
L_0x003d:
kotlin.ResultKt.throwOnFailure(r1)
L_0x0040:
java.lang.String r2 = "hello suspend function."
java.io.PrintStream r3 = java.lang.System.out
r3.println(r2)
kotlin.Unit r2 = kotlin.Unit.INSTANCE
goto L_0x003c
switch-data {0->0x002d, 1->0x003d, }
*/
}
- 反编译结果就是 CPS(Continuation-Passing Style)风格的代码:通过对 label 的 switch-case 处理不同的状态
- susFun 方法对应的内部类 SuspendFunTestKt$susFun$1 是一个 ContinuationImpl:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * Compiler-generated continuation for susFun(): it stores the state-machine label and the
 * last result, and re-enters susFun() when resumed.
 */
final class SuspendFunTestKt$susFun$1 extends ContinuationImpl {
    // Current state of the susFun() state machine (switch target inside susFun).
    int label;
    // Result delivered by the last resume; read back by susFun() as `r2.result`.
    /* synthetic */ Object result;

    SuspendFunTestKt$susFun$1(Continuation continuation) {
        super(continuation);
    }

    /**
     * Resume entry point: saves the incoming result, sets the Integer.MIN_VALUE bit in label
     * (the bit susFun() tests and clears to recognise its own continuation) and calls susFun()
     * again with this object as the continuation.
     */
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return SuspendFunTestKt.susFun(this);
    }
}
suspend lambda 是个什么东西?
示例代码:
1
2
3
4
5
6
7
8
9
/**
 * Creates a suspend lambda and prints the simple names of its runtime class and of that
 * class's superclass (expected output is shown in the trailing comments).
 */
fun main() {
    val mySuspend1: suspend () -> String = {
        delay(1000)
        "hehe"
    }
    val javaClass = mySuspend1.javaClass
    println("javaClass=${javaClass.simpleName}") // main$mySuspend1$1
    println("javaClass.superclass=${javaClass.superclass.simpleName}") // SuspendLambda
}
我们用 jadx
反编译看看:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/** jadx output for the file-level class that holds the Kotlin {@code main()} above. */
public final class SuspendTestKt {
    public static final void main() {
        // The suspend lambda is instantiated as the generated class, with a null completion.
        Class javaClass = new main.mySuspend1.1((Continuation) null).getClass();
        System.out.println((Object) ("javaClass=" + javaClass.getSimpleName()));
        StringBuilder append = new StringBuilder().append("javaClass.superclass=");
        Class<? super Object> superclass = javaClass.getSuperclass();
        Intrinsics.checkExpressionValueIsNotNull(superclass, "javaClass.superclass");
        System.out.println((Object) append.append(superclass.getSimpleName()).toString());
    }
}
// Inner class SuspendTestKt$main$mySuspend1$1
/**
 * Class generated for the suspend lambda {@code mySuspend1}: a SuspendLambda that also
 * implements Function1 (the single argument is the Continuation).
 */
final class SuspendTestKt$main$mySuspend1$1 extends SuspendLambda implements Function1<Continuation<? super String>, Object> {
    // State-machine label: 0 = not started, 1 = resumed after delay().
    int label;

    SuspendTestKt$main$mySuspend1$1(Continuation continuation) {
        super(1, continuation);
    }

    /** Returns a fresh instance of this class bound to the given completion. */
    @NotNull
    public final Continuation<Unit> create(@NotNull Continuation<?> continuation) {
        Intrinsics.checkParameterIsNotNull(continuation, "completion");
        return new SuspendTestKt$main$mySuspend1$1(continuation);
    }

    /** Function1 entry point: creates a new instance and starts it with Unit. */
    public final Object invoke(Object obj) {
        return create((Continuation) obj).invokeSuspend(Unit.INSTANCE);
    }

    /** The lambda body compiled to a state machine over {@code label}. */
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                this.label = 1; // set before the call so a later resume lands in case 1
                if (DelayKt.delay(1000, this) == coroutine_suspended) {
                    return coroutine_suspended;
                }
                break;
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        return "hehe";
    }
}
由反编译结果可知,suspend () -> String 是一个 SuspendLambda,并实现了 Function1 接口。
协程原理
以下面代码为例,分析协程执行原理:
1
2
3
4
5
6
7
/**
 * Sample used for the walkthrough: launches one coroutine on Dispatchers.IO that prints the
 * current thread name, then sleeps 2 seconds on the calling thread.
 */
fun main() {
    val scope = MyContextScope(EmptyCoroutineContext)
    scope.launch(Dispatchers.IO) {
        println("hello world. ${Thread.currentThread().name}")
    }
    Thread.sleep(2000) // presumably keeps the process alive until the coroutine has printed
}
输出:
1
hello world. DefaultDispatcher-worker-1
反编译生成代码
反编译生成的代码,launch 的第三个参数 block 会生成一个 SuspendLambda 内部类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/** Decompiled file-level class holding the sample {@code main()}. */
public final class HahaKt {
    public static final void main() {
        // The launch block is passed as an instance of the generated class HahaKt$main$1.
        // NOTE(review): the trailing `2, null` looks like the $default bitmask/marker with
        // `start` left at its default - confirm.
        BuildersKt.launch$default((CoroutineScope) new MyContextScope(EmptyCoroutineContext.INSTANCE), Dispatchers.getIO(), (CoroutineStart) null, new main.1((Continuation) null), 2, (Object) null);
        Thread.sleep(2000);
    }
}
// The third argument of launch (block) is compiled to the inner class HahaKt$main$1
/**
 * Generated class for the launch block: a SuspendLambda implementing Function2
 * (receiver CoroutineScope, Continuation).
 */
final class HahaKt$main$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
    int label;
    // Receiver scope of the block, assigned in create().
    private CoroutineScope p$;

    HahaKt$main$1(Continuation continuation) {
        super(2, continuation);
    }

    /** Creates a new instance bound to {@code continuation} and stores {@code value} as the receiver scope. */
    @NotNull
    public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> continuation) {
        Intrinsics.checkParameterIsNotNull(continuation, "completion");
        HahaKt$main$1 hahaKt$main$1 = new HahaKt$main$1(continuation);
        CoroutineScope coroutineScope = (CoroutineScope) value;
        hahaKt$main$1.p$ = (CoroutineScope) value;
        return hahaKt$main$1;
    }

    /** Function2 entry point: creates a new instance and starts it with Unit. */
    public final Object invoke(Object obj, Object obj2) {
        return create(obj, (Continuation) obj2).invokeSuspend(Unit.INSTANCE);
    }

    /** Body of the block; it has no suspension point, so only state 0 exists. */
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                CoroutineScope coroutineScope = this.p$;
                StringBuilder append = new StringBuilder().append("hello world. ");
                Thread currentThread = Thread.currentThread();
                Intrinsics.checkExpressionValueIsNotNull(currentThread, "Thread.currentThread()");
                System.out.println((Object) append.append(currentThread.getName()).toString());
                return Unit.INSTANCE;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
    }
}
协程的创建
CoroutineScope.launch
首先看 launch:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// CoroutineScope
/**
 * Builds the coroutine object for [block], starts it according to [start] and returns it as a [Job].
 */
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // Build the new context from the scope's context plus [context]; it becomes the parent context of the coroutine.
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        // The coroutine object is constructed with newContext as its parent context.
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    // The coroutine is passed both as the receiver of block and (inside start) as the completion.
    coroutine.start(start, coroutine, block)
    return coroutine
}
// Extension function on CoroutineScope: `coroutineContext` below is the scope's own member.
/**
 * Combines the scope's context with [context]; adds a CoroutineId when DEBUG is on, and adds
 * Dispatchers.Default when the combined context has no ContinuationInterceptor.
 */
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    // Fall back to Dispatchers.Default only when no interceptor (dispatcher) was supplied.
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}
- newCoroutineContext 创建一个新的协程上下文,作为生成 XXXCoroutine 的 parentContext
- 根据 start.isLazy 决定创建 LazyStandaloneCoroutine(延迟启动)还是 StandaloneCoroutine(立即启动)
- coroutine.start 执行协程
AbstractCoroutine.start
现在看看 AbstractCoroutine.start:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// AbstractCoroutine.kt
/**
 * Base class of coroutine objects (excerpt): it is a Job, a Continuation<T> and a CoroutineScope.
 */
public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    init {
        // Link to the Job found in the parent context, when requested.
        if (initParentJob) initParentJob(parentContext[Job])
    }

    // receiver: StandaloneCoroutine (in the launch example)
    // block: suspend StandaloneCoroutine.() -> Unit
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        // initParentJob()
        start(block, receiver, this) // same as start.invoke(...); note the 3rd argument, completion = this
    }
}
调用了 CoroutineStart.invoke() 方法,看看它的参数:
- 参数 1:block,要执行的协程体 suspend StandaloneCoroutine.() -> Unit,其实就是一个 SuspendLambda
- 参数 2:receiver,为一个 AbstractCoroutine,这里为 StandaloneCoroutine
- 参数 3:completion,就是 this,即 StandaloneCoroutine,这个参数很重要,后面要用到
看看 StandaloneCoroutine:
1
2
3
4
5
6
7
8
9
/**
 * Coroutine object created by `launch`: an AbstractCoroutine<Unit> that always links to the
 * parent job (initParentJob = true).
 */
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    // Forwards the exception to handleCoroutineException and reports it as handled.
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
CoroutineStart.invoke
调用到了 CoroutineStart,CoroutineStart 是一个枚举类,接着看 CoroutineStart#invoke()
方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// CoroutineStart
/** Start modes for coroutine builders; [invoke] starts [block] according to the mode. */
public enum class CoroutineStart {
    DEFAULT, LAZY, ATOMIC, UNDISPATCHED;

    public val isLazy: Boolean get() = this === LAZY

    // block      - suspend StandaloneCoroutine.() -> Unit, i.e. the generated SuspendLambda
    // receiver   - the StandaloneCoroutine
    // completion - the same StandaloneCoroutine (a Continuation<Unit>)
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
        // Dispatch to a different start function depending on the mode.
        when (this) {
            CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
            CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            CoroutineStart.LAZY -> Unit // will start lazily
        }
}
然后调用 startCoroutineCancellable()
(suspend (R) -> T).startCoroutineCancellable 创建 Continuation
这里我们看 CoroutineStart.DEFAULT
,然后调用了 block.startCoroutineCancellable(receiver, completion)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Cancellable.kt
// receiver   - the StandaloneCoroutine
// completion - the same StandaloneCoroutine (a Continuation<Unit>)
/**
 * Starts the suspend lambda in a cancellable way: creates its Continuation, lets the context's
 * interceptor wrap it, then resumes it with `Result.success(Unit)`.
 *
 * @param receiver receiver passed to the suspend lambda
 * @param completion continuation that receives the final result; any exception thrown while
 *   starting is also delivered to it (see runSafely)
 * @param onCancellation optional callback forwarded to resumeCancellableWith; defaults to null
 *   so existing two-argument call sites keep working
 */
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R,
    completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) { // on exception: completion.resumeWith(Result.failure(e))
        createCoroutineUnintercepted(receiver, completion)
            .intercepted()
            .resumeCancellableWith(Result.success(Unit), onCancellation)
    }
/**
 * Runs [block]; if it throws, the throwable is delivered to [completion] as a failed Result
 * instead of propagating to the caller.
 */
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
    try {
        block()
    } catch (e: Throwable) {
        completion.resumeWith(Result.failure(e))
    }
}
(suspend (R) -> T).createCoroutineUnintercepted
(suspend (R) -> T).createCoroutineUnintercepted 创建一个 Continuation
现在看看 createCoroutineUnintercepted():
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// IntrinsicsJvm.kt (kotlin-stdlib)
// NOTE(review): the call site in startCoroutineCancellable passes a receiver; presumably it uses
// the receiver overload, which would call create(receiver, completion) - confirm.
/**
 * Creates the (not yet intercepted) Continuation for this suspend lambda. Compiler-generated
 * lambdas are BaseContinuationImpl instances and go through create(); anything else is wrapped
 * by createCoroutineFromSuspendFunction.
 */
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>):Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(probeCompletion)
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}
// https://github.com/JetBrains/kotlin/blob/master/libraries/stdlib/jvm/src/kotlin/coroutines/jvm/internal/DebugProbes.kt
/** Debugger hook; as written here it returns [completion] unchanged. */
internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> {
    /** implementation of this function is replaced by debugger */
    return completion
}
- 通过前面的分析可知 suspend () -> T 是一个 SuspendLambda,SuspendLambda 间接继承了 BaseContinuationImpl,所以上面会走 create() 分支;由前面的反编译结果可知,create 方法会创建一个新的 SuspendLambda 实例,参数为 completion
  其继承关系为:SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation
- createCoroutineFromSuspendFunction 用于处理 suspending lambda 没有继承 BaseContinuationImpl 的情况,具体源码看 IntrinsicsJvm
接着看 create(completion),create 方法创建的 Continuation 是一个 SuspendLambda 对象。
看看 create 反编译后的代码:
1
2
3
4
5
6
// Decompiled create(): returns a new instance of the generated lambda class bound to `completion`.
// `new <anonymous constructor>` is how the decompiler renders the anonymous class constructor.
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
    Intrinsics.checkNotNullParameter(completion, "completion");
    Function2 var3 = new <anonymous constructor>(completion);
    return var3;
}
接着看 intercepted()
Continuation.intercepted() 返回 DispatchedContinuation
接着回到 startCoroutineCancellable 看 intercepted()
,通过 ContinuationInterceptor
拦截当前 Continuation
1
2
3
4
5
// IntrinsicsJvm.kt (kotlin-stdlib)
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    // A ContinuationImpl delegates to its own intercepted(); any other Continuation is returned as is.
    // In the launch example `this` is the HahaKt$main$1 instance, a subclass of ContinuationImpl.
    (this as? ContinuationImpl)?.intercepted() ?: this
接着看 ContinuationImpl.intercepted()
:
1
2
3
4
5
6
7
8
9
10
11
// ContinuationImpl (excerpt)
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    // Cached result of the first intercepted() call.
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        // context[ContinuationInterceptor] is the CoroutineDispatcher instance in the launch example;
        // when the context has no interceptor, this continuation itself is used.
        intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }
}
如果 intercepted 不为 null,直接返回;如果 intercepted 为 null,则取 CoroutineContext 中的 ContinuationInterceptor,调用其 interceptContinuation(),并把结果缓存到 intercepted 中。
而 CoroutineDispatcher 实现了 ContinuationInterceptor:
1
2
3
// CoroutineDispatcher
// Wraps the given continuation in a DispatchedContinuation bound to this dispatcher.
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
    DispatchedContinuation(this, continuation)
所以 intercepted() 分情况:
- 需要线程调度 - 返回 DispatchedContinuation,其 continuation 参数值为 SuspendLambda
- 不需要线程调度 - 返回 SuspendLambda
协程的启动
接下来看看 resumeCancellableWith
是怎么启动协程的,这里还涉及到 Dispatchers 线程调度的逻辑:
DispatchedContinuation
前面 startCoroutineCancellable()
里,如果有线程调度,那么返回的是 DispatchedContinuation
;没有的话返回 SuspendLambda
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// DispatchedContinuation
/**
 * Continuation wrapper that pairs a [continuation] with a [dispatcher]: a resume is either
 * handed to the dispatcher or executed in place when no dispatch is needed.
 */
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    override val delegate: Continuation<T>
        get() = this

    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) { // does the dispatcher require a dispatch?
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.dispatch(context, this) // hand this task to the dispatcher
        } else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }

    inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) { // does the dispatcher require a dispatch?
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this) // hand this task to the dispatcher
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) { // no dispatch needed: resume the wrapped continuation directly
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

    // Resumes the wrapped continuation directly, inside withContinuationContext.
    inline fun resumeUndispatchedWith(result: Result<T>) {
        withContinuationContext(continuation, countOrElement) {
            continuation.resumeWith(result)
        }
    }
}
/**
 * Resumes this continuation: a DispatchedContinuation uses its own resumeCancellableWith,
 * any other Continuation is resumed with plain resumeWith.
 */
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    // Goes through the dispatcher; continuation.resumeWith is eventually called as well.
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    // Calls continuation.resumeWith directly.
    else -> resumeWith(result)
}
- 当需要线程调度时,则在调度后会调用 DispatchedContinuation.continuation.resumeWith 来启动协程,其中 continuation 是 SuspendLambda 实例
- 当不需要线程调度时,则直接调用 SuspendLambda.resumeWith 来启动协程
DispatchedContinuation 继承自 DispatchedTask,又继承自 Task,最终实现了 Runnable,那我们看下其 run 方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Task form of a dispatched continuation (excerpt). [run] takes the stored state and resumes
 * the wrapped continuation with a value, an exception or a cancellation cause.
 */
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    public final override fun run() {
        assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            // The continuation wrapped by the DispatchedContinuation.
            val continuation = delegate.continuation
            withContinuationContext(continuation, delegate.countOrElement) {
                val context = continuation.context
                val state = takeState() // NOTE: Must take state in any case, even if cancelled
                val exception = getExceptionalResult(state)
                /*
                 * Check whether continuation was originally resumed with an exception.
                 * If so, it dominates cancellation, otherwise the original exception
                 * will be silently lost.
                 */
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                if (job != null && !job.isActive) {
                    // The job is no longer active: resume with its cancellation exception instead.
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                        continuation.resumeWithException(exception)
                    } else {
                        continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
        } catch (e: Throwable) {
            // This instead of runCatching to have nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val result = runCatching { taskContext.afterTask() }
            handleFatalException(fatalException, result.exceptionOrNull())
        }
    }
}
- continuation 就是 DispatchedContinuation 构造器中的 continuation,就是 SuspendLambda
- 封装了 Continuation 的 resumeWithException 和 resume 操作逻辑,最终调用的是 SuspendLambda 的 resumeWith 方法
下面看看 SuspendLambda
的类关系:SuspendLambda→ContinuationImpl→BaseContinuationImpl→Continuation
resumeWith 方法调用的是父类 BaseContinuationImpl 中的 resumeWith 方法:
1
2
3
4
5
6
7
// Abridged view: resumeWith ends up calling invokeSuspend (the full listing appears later in this article).
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        // ...
        val outcome = invokeSuspend(param)
        // ...
    }
}
SuspendLambda→ContinuationImpl→BaseContinuationImpl→Continuation
SuspendLambda
由前面可知 suspend () -> T
是一个 SuspendLambda,现在看看 SuspendLambda
:
1
2
3
4
5
6
7
8
9
10
11
12
13
// Suspension lambdas inherit from this class
internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    // Variant without a completion (completion = null).
    constructor(arity: Int) : this(arity, null)

    // A null completion means the object is used as a lambda, otherwise as a continuation.
    public override fun toString(): String =
        if (completion == null)
            Reflection.renderLambdaToString(this) // this is lambda
        else
            super.toString() // this is continuation
}
ContinuationImpl
SuspendLambda 继承 ContinuationImpl,接着看看 ContinuationImpl
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * BaseContinuationImpl with a context and a cached intercepted continuation.
 */
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    // By default the context is taken from the completion.
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> = // intercept this continuation (result is cached)
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

    // Hands the intercepted continuation back to the interceptor, unless it is this object itself.
    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation // just in case
    }
}
BaseContinuationImpl
ContinuationImpl 又继承 BaseContinuationImpl,接着看 BaseContinuationImpl:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/**
 * Root of the generated continuation classes: [resumeWith] drives [invokeSuspend] and passes
 * the outcome on to [completion].
 */
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) :
    // In the launch example this completion is the AbstractCoroutine (StandaloneCoroutine).
    Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        var current = this
        var param = result
        while (true) { // loops until the coroutine suspends or the top-level completion is reached
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        // Suspended: leave resumeWith; a later resumeWith call continues the state machine.
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // e.g. a SuspendLambda, which is a BaseContinuationImpl
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else { // e.g. AbstractCoroutine, which is not a BaseContinuationImpl
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    // The compiled body of the suspend lambda / function.
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?

    protected open fun releaseIntercepted() {
        // does nothing here, overridden in ContinuationImpl
    }

    // Overridden by generated subclasses; returns a Continuation.
    public open fun create(completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Continuation) has not been overridden")
    }

    // Overridden by generated subclasses; returns a Continuation.
    public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
    }

    public override fun toString(): String =
        "Continuation at ${getStackTraceElement() ?: this::class.java.name}"

    // --- CoroutineStackFrame implementation
    public override val callerFrame: CoroutineStackFrame?
        get() = completion as? CoroutineStackFrame

    public override fun getStackTraceElement(): StackTraceElement? =
        getStackTraceElementImpl()
}
- create 方法需由子类实现,返回一个 Continuation(类似 SuspendTestKt$main$mySuspend1$1 继承自 SuspendLambda)
- resumeWith 执行的入口
- invokeSuspend 真正的代码逻辑,即你自己写的协程体的代码
最后协程是调用了 AbstractCoroutine
的 resumeWith
1
2
3
4
5
6
// AbstractCoroutine
// Completes the coroutine with the given result; returns early while it still waits for children.
public final override fun resumeWith(result: Result<T>) {
    val state = makeCompletingOnce(result.toState())
    if (state === COMPLETING_WAITING_CHILDREN) return
    afterResume(state)
}
协程启动小结
- 协程的启动是通过 BaseContinuationImpl.resumeWith 方法调用到了子类 SuspendLambda.invokeSuspend 方法,然后通过状态机来控制顺序运行
- 在 BaseContinuationImpl.resumeWith 有个死循环,调用 invokeSuspend 来执行具体的协程代码,碰到 COROUTINE_SUSPENDED 时直接 return(协程挂起),等下一次 resumeWith 被调用时再继续执行
- Kotlin 中的协程存在着三层包装
1
2
3
第一层包装: launch & async 返回的 Job, Deferred 继承自 AbstractCoroutine, 里面封装了协程的状态,提供了 cancel 等接口;
第二层包装: 编译器生成的 SuspendLambda 子类,封装了协程的真正执行逻辑,其继承关系为 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, 它的 completion 参数就是第一层包装实例;
第三层包装: DispatchedContinuation, 封装了线程调度逻辑,它的 continuation 参数就是第二层包装实例。
协程状态机
以下面的代码为例解析一下协程启动的状态机流程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/** Sample suspend function: starts an async coroutine on Dispatchers.IO that delays 1s, then awaits "hearing". */
private suspend fun getId(): String {
    return GlobalScope.async(Dispatchers.IO) {
        delay(1000)
        "hearing"
    }.await()
}
/** Sample suspend function: same shape as getId, returns "avatar-" followed by [id]. */
private suspend fun getAvatar(id: String): String {
    return GlobalScope.async(Dispatchers.IO) {
        delay(1000)
        "avatar-$id"
    }.await()
}
/**
 * Sample with two suspension points (getId, getAvatar) used to walk through the state machine.
 */
// NOTE(review): main returns right after launch; presumably the process has to be kept alive
// (e.g. Thread.sleep as in the earlier sample) for the println to be observed - confirm.
fun main() {
    GlobalScope.launch {
        val id = getId()
        val avatar = getAvatar(id)
        println("${Thread.currentThread().name} - $id - $avatar")
    }
}
上面 main 方法中,GlobalScope.launch 启动的协程体在执行到 getId 后,协程体会挂起,直到 getId 返回可用结果,才会 resume launch 协程,执行到 getAvatar 也是同样的过程。
协程内部实现使用状态机来处理不同的挂起点,将 GlobalScope.launch 协程体字节码反编译成 Java 代码,大致如下 (有所删减):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Decompiled getId(): starts the async block (an anonymous SuspendLambda-style class) and calls
// await() with the caller's continuation.
private static final Object getId(Continuation $completion) {
    return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
        int label;

        // State machine of the async block: 0 = before delay, 1 = resumed after delay.
        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                this.label = 1;
                if (DelayKt.delay(1000L, this) == var2) {
                    return var2;
                }
                break;
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            return "hearing";
        }

        @NotNull
        public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
        }

        public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
        }
    }), 2, (Object)null).await($completion);
}
// Decompiled getAvatar(): same structure as getId(); the captured `id` is used to build the result.
private static final Object getAvatar(final String id, Continuation $completion) {
    return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
        int label;

        // State machine of the async block: 0 = before delay, 1 = resumed after delay.
        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                this.label = 1;
                if (DelayKt.delay(1000L, this) == var2) {
                    return var2;
                }
                break;
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            return "avatar-" + id;
        }

        @NotNull
        public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
        }

        public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
        }
    }), 2, (Object)null).await($completion);
}
// Decompiled main() (abridged): the launch block becomes an anonymous state-machine class.
// The markers a-e are referenced by the walkthrough that follows this listing.
public static final void main() {
    BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null,
            (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
        // Receiver scope of the block, assigned in create().
        private CoroutineScope p$;
        // Spilled local: keeps `id` alive across the getAvatar suspension point.
        Object L$1;
        // Current state: 0 = start, 1 = resumed after getId, 2 = resumed after getAvatar.
        int label;

        public final Object invokeSuspend(@NotNull Object $result) {
            Object COROUTINE_SUSPENDED = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            Object var10000;
            String id;
            label17: {
                CoroutineScope $this$launch;
                switch(this.label) {
                case 0: // a
                    ResultKt.throwOnFailure($result);
                    $this$launch = this.p$;
                    this.label = 1; // set label to 1 before calling getId
                    var10000 = getId(this);
                    if (var10000 == COROUTINE_SUSPENDED) {
                        return COROUTINE_SUSPENDED;
                    }
                    // A result is already available: no suspension, just break
                    break;
                case 1: // b
                    ResultKt.throwOnFailure($result);
                    var10000 = $result;
                    break;
                case 2: // d
                    id = (String)this.L$1;
                    ResultKt.throwOnFailure($result);
                    var10000 = $result;
                    break label17; // leave label17
                default:
                    throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                }

                // c
                id = (String)var10000;
                this.L$1 = id; // save id in L$1
                this.label = 2; // set label to 2 before calling getAvatar
                var10000 = getAvatar(id, this);
                if (var10000 == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                }
            }

            // e
            String avatar = (String)var10000;
            StringBuilder var9 = new StringBuilder();
            Thread var10001 = Thread.currentThread();
            String var5 = var9.append(var10001.getName()).append(" - ").append(id).append(" - ").append(avatar).toString();
            System.out.println(var5);
            return Unit.INSTANCE;
        }

        // Creates a new instance bound to `completion` and stores the receiver scope.
        @NotNull
        public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkParameterIsNotNull(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            var3.p$ = (CoroutineScope)value;
            return var3;
        }

        public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
        }
    // NOTE(review): mask 3 presumably marks both `context` and `start` as defaulted,
    // following the pattern of the `2` used when only `start` is defaulted - confirm.
    }), 3, (Object)null);
}
invokeSuspend 方法会在协程体中的 suspend 函数得到结果后被调用,具体的调用位置在 BaseContinuationImpl.resumeWith 中。
执行流程:
- a: launch 协程体刚执行到 getId 方法时,先将 label 置为 1。若 getId 的返回值是 COROUTINE_SUSPENDED,则直接 return,launch 协程体中 getId 后面的代码暂时不会执行,即 launch 协程体被挂起 (非阻塞, 该线程依旧会做其它工作)。而若此时 getId 已经有结果 (内部没有调用 delay 之类的 suspend 函数等),则不挂起,而是直接 break。
- b: 若上面 a 中 getId 返回 COROUTINE_SUSPENDED, 则当 getId 有可用结果返回后,会重新执行 launch 协程体的 invokeSuspend 方法,根据上面的 label==1 会执行到这里,检查一下 result,没问题的话就 break,此时 $result(即 id)赋值给了 var10000。
- c: 在 a 中若直接 break 或 在 b 中得到 getId 的结果然后 break 后,都会执行到这里,得到 id 的值并把 label 置为 2。然后调用 getAvatar 方法,跟 getId 类似,若其返回 COROUTINE_SUSPENDED 则 return,协程被挂起,等到下次 invokeSuspend 被执行,否则离开 label17 接着执行后续逻辑。
- d: 若上面 c 中 getAvatar 返回 COROUTINE_SUSPENDED, 则当 getAvatar 有可用结果返回后会重新调用 launch 协程体的 invokeSuspend 方法,此时根据 label==2 来到这里并取得之前的 id 值,检验 result(即 avatar),然后 break label17。
- e: c 中直接返回了可用结果 或 d 中 break label17 后,launch 协程体中的 suspend 函数都执行完毕了,这里会执行剩下的逻辑。
协程的挂起和恢复
Kotlin 编译器会为 协程体 生成继承自 SuspendLambda 的子类,协程的真正运算逻辑都在其 invokeSuspend 方法中。
Kotlin 协程的内部实现使用了 Kotlin 编译器的一些编译技术,当 suspend 函数被调用时,都有一个隐式的参数额外传入,这个参数是 Continuation
类型,封装了协程 resume 后执行的代码逻辑。
1
2
3
4
5
6
7
8
9
10
11
/** Sample suspend function: starts an async coroutine on Dispatchers.IO that delays 1s, then awaits "hearing". */
private suspend fun getId(): String {
    return GlobalScope.async(Dispatchers.IO) {
        delay(1000)
        "hearing"
    }.await()
}
// Decompiled to Java: the suspend function gains an extra Continuation parameter.
final Object getId(@NotNull Continuation $completion) {
    // ...
}
其中传入的 $completion 参数,可以看到是调用 getId 方法所在的协程体对象,也就是一个 SuspendLambda 对象。Continuation 的定义如下:
1
2
3
4
5
/** A continuation: carries a coroutine context and accepts the result the computation is resumed with. */
public interface Continuation<in T> {
    // Context associated with this continuation.
    public val context: CoroutineContext

    // Resumes with [result], which holds either a value or a failure.
    public fun resumeWith(result: Result<T>)
}
将 getId 方法编译后的字节码反编译成 Java 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Decompiled getId() (abridged): COROUTINE_SUSPENDED is returned while delay() has not completed.
final Object getId(@NotNull Continuation $completion) {
    // Create and start the async coroutine
    return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
        int label;

        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
            switch(this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                this.label = 1;
                if (DelayKt.delay(1000L, this) == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                }
                break;
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            return "hearing";
        }
        // ...
    }), 2, (Object)null).await($completion); // call the suspend function await()
}
- 在 getId,delay 未返回值时,返回 COROUTINE_SUSPENDED,即代表还没有值,此时协程挂起,但不阻塞线程;
- 当 suspend 函数有返回值时,会继续调用 invokeSuspend,恢复协程运行
父子协程
launch:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Builds the coroutine object for [block], starts it according to [start] and returns it as a [Job].
 */
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // newContext is handed to the coroutine constructor as its parent context.
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
// AbstractCoroutine (class members only; the class header is omitted here)
init {
    // Attach to the Job element of the parent context, when requested.
    if (initParentJob) initParentJob(parentContext[Job])
}

// Invokes the CoroutineStart with this coroutine as the completion.
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    start(block, receiver, this)
}
接着看 initParentJob()
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// JobSupport
/**
 * Links this job to [parent]: starts the parent, attaches this job as a child and stores the
 * returned handle; with no parent a NonDisposableHandle is stored instead.
 */
protected fun initParentJob(parent: Job?) { // parent is the Job element of the parent context (parentContext[Job] at the call site)
    assert { parentHandle == null }
    if (parent == null) {
        parentHandle = NonDisposableHandle
        return
    }
    parent.start() // make sure the parent is started
    val handle = parent.attachChild(this)
    parentHandle = handle
    // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
    if (isCompleted) {
        handle.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
}
接下来重点在于 parent.attachChild
方法:
1
2
3
// Registers a ChildHandleNode for [child] through invokeOnCompletion (onCancelling = true)
// and returns it as the ChildHandle.
public final override fun attachChild(child: ChildJob): ChildHandle {
    return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
}
invokeOnCompletion 方法主要是将 handler 节点添加到父协程的一个队列 (state.list) 中。
GlobalScope.launch 没有父协程
协程完成
协程的完成通过 AbstractCoroutine.resumeWith
实现
1
2
3
4
5
6
// AbstractCoroutine
// Completes the coroutine with the given result; returns early while it still waits for children.
public final override fun resumeWith(result: Result<T>) {
    val state = makeCompletingOnce(result.toState())
    if (state === COMPLETING_WAITING_CHILDREN) return
    afterResume(state)
}
调用路径:makeCompletingOnce -> tryMakeCompleting -> tryMakeCompletingSlowPath -> tryWaitForChild
:
1
2
3
4
5
6
7
8
9
/**
 * Tries to start waiting for [child] or one of the children after it.
 *
 * Starting at [child], a `ChildCompletion` handler is registered on each child job in turn
 * (with `invokeImmediately = false`). The first registration that returns something other
 * than NonDisposableHandle means that child is not complete yet and is now being awaited.
 *
 * @return `true` when waiting for a child has started, `false` when `nextChild()` runs out
 *         of children without any registration succeeding.
 */
private fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
    var candidate = child
    while (true) {
        val registration = candidate.childJob.invokeOnCompletion(
            invokeImmediately = false,
            handler = ChildCompletion(this, state, candidate, proposedUpdate).asHandler
        )
        // child is not complete and we've started waiting for it
        if (registration !== NonDisposableHandle) return true
        candidate = candidate.nextChild() ?: return false
    }
}
可知 tryWaitForChild 方法将 ChildCompletion 节点添加到了子协程的 state.list 队列中,当子协程完成或者取消时调用 ChildCompletion.invoke:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ChildCompletion
/** Invoked when the awaited child finishes: lets the parent continue its own completion. */
override fun invoke(cause: Throwable?): Unit =
    parent.continueCompleting(state, child, proposedUpdate)
/**
 * Continues this job's completion after [lastChild] has finished.
 *
 * If there is a child after [lastChild] and `tryWaitForChild` starts waiting for it, the
 * function returns and completion stays pending. Otherwise the final state is computed with
 * `finalizeFinishingState` and passed to `afterCompletion`.
 *
 * @param state the Finishing state this job is expected to still be in.
 * @param lastChild the child whose completion triggered this call.
 * @param proposedUpdate the completion value passed on to `tryWaitForChild` and `finalizeFinishingState`.
 */
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
// figure out if we need to wait for next child
val waitChild = lastChild.nextChild()
// try wait for next child
if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
// no more children to wait -- try update state
val finalState = finalizeFinishingState(state, proposedUpdate)
afterCompletion(finalState)
}
父协程需要等待所有子协程处于完成或者取消状态才能完成自身。
协程取消
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// JobSupport.kt
/**
 * External cancellation entry point; never invoked implicitly from internal machinery.
 *
 * @param cause the cancellation cause; when null, `defaultCancellationException()` supplies one.
 */
public override fun cancel(cause: CancellationException?) {
    val effectiveCause = cause ?: defaultCancellationException()
    cancelInternal(effectiveCause)
}
/** Open hook between `cancel` and `cancelImpl`: this implementation forwards [cause] to `cancelImpl` and discards its result. */
public open fun cancelInternal(cause: Throwable) {
cancelImpl(cause)
}
/**
 * Core cancellation routine.
 *
 * When onCancelComplete is set, `cancelMakeCompleting` is tried first. If the state is still
 * COMPLETING_ALREADY afterwards (or onCancelComplete is not set), `makeCancelling` is used.
 *
 * @param cause the cancellation cause. It is typed `Any?`: the callers quoted in this document
 *        pass either a Throwable (`cancelInternal`, `childCancelled`) or a ParentJob (`parentCancelled`).
 * @return `false` only when the resulting state is TOO_LATE_TO_CANCEL; `true` otherwise.
 */
internal fun cancelImpl(cause: Any?): Boolean {
var finalState: Any? = COMPLETING_ALREADY
if (onCancelComplete) {
// make sure it is completing, if cancelMakeCompleting returns state it means it had make it
// completing and had recorded exception
finalState = cancelMakeCompleting(cause)
if (finalState === COMPLETING_WAITING_CHILDREN) return true
}
// Still the initial marker: either onCancelComplete is false or cancelMakeCompleting returned COMPLETING_ALREADY.
if (finalState === COMPLETING_ALREADY) {
finalState = makeCancelling(cause)
}
return when {
finalState === COMPLETING_ALREADY -> true
finalState === COMPLETING_WAITING_CHILDREN -> true
finalState === TOO_LATE_TO_CANCEL -> false
else -> {
// Any other value is treated as a final state and passed to afterCompletion.
afterCompletion(finalState)
true
}
}
}
在 makeCancelling()
调用了 notifyCancelling()
1
2
3
4
5
6
7
8
9
10
// JobSupport.kt
// `list` is this job's node list.
// NOTE(review): per the surrounding article, a child started via initParentJob() is attached to
// its parent's list wrapped in a ChildHandleNode — confirm against invokeOnCompletion.
/**
 * Sends the cancelling notification for [cause]: first `onCancelling`, then the
 * JobCancellingNode handlers in [list], and finally `cancelParent`.
 */
private fun notifyCancelling(list: NodeList, cause: Throwable) {
// first cancel our own children
onCancelling(cause)
// Presumably invokes every JobCancellingNode in the list; for ChildHandleNode entries, invoke
// calls childJob.parentCancelled, i.e. the children get cancelled one by one.
notifyHandlers<JobCancellingNode>(list, cause)
// then cancel parent (this may or may not actually cancel the parent)
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}
下面看看父 parent 如何取消 child,notifyHandlers<JobCancellingNode>(list, cause)
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Base class for job nodes of the kind that notifyCancelling targets via notifyHandlers<JobCancellingNode>.
internal abstract class JobCancellingNode : JobNode()
/**
 * Node that connects a parent job with one of its children ([childJob]).
 *
 * NOTE(review): `job` is not declared in this excerpt; presumably it is inherited from JobNode
 * and refers to the job this node is attached to (the parent) — confirm.
 */
internal class ChildHandleNode(
@JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
override val parent: Job get() = job
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) // the parent cancels the child
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) // the child's cancellation is forwarded to job (the parent)
}
// The child coroutine cancels itself through this method: the parent job is passed to
// cancelImpl as the cancellation cause.
public final override fun parentCancelled(parentJob: ParentJob) {
cancelImpl(parentJob)
}
/**
 * Called when a child reports its cancellation [cause] to this job.
 *
 * A CancellationException is accepted without cancelling this job. Any other cause is passed
 * to `cancelImpl`, and the result is additionally gated by `handlesException`.
 *
 * @return `true` for a CancellationException; otherwise `cancelImpl(cause) && handlesException`.
 */
public open fun childCancelled(cause: Throwable): Boolean =
    cause is CancellationException || (cancelImpl(cause) && handlesException)
下面看看 child 如何取消 parent,cancelParent()
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Reports this job's cancellation [cause] to its parent.
 *
 * CancellationException is considered "normal": a parent is usually not cancelled when a child
 * produces it, which lets a parent cancel its children without being cancelled itself, unless
 * a child crashes and produces some other exception during its completion.
 *
 * @return `true` for a scoped coroutine; without a parent, whether [cause] is a
 *         CancellationException; otherwise `parent.childCancelled(cause) || isCancellation`.
 */
private fun cancelParent(cause: Throwable): Boolean {
    // Scoped coroutine: don't propagate and don't cancel the parent -- the exception will be rethrown.
    // NOTE(review): the original note says isScopedCoroutine defaults to false and may be overridden — confirm.
    if (isScopedCoroutine) return true

    val isCancellation = cause is CancellationException
    val handle = parentHandle
    return if (handle === null || handle === NonDisposableHandle) {
        // No parent -- ignore CE, report other exceptions.
        isCancellation
    } else {
        // Notify parent but don't forget to check cancellation.
        handle.childCancelled(cause) || isCancellation
    }
}
1
2
3
4
5
6
7
8
/**
 * Scope coroutine whose `childCancelled` always answers `false`.
 *
 * Per the surrounding article, this is the coroutine behind `supervisorScope`: cancellation and
 * exceptions only travel from the parent down to its children, and a child never cancels
 * this coroutine.
 */
private class SupervisorCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    /** Never accepts a child's cancellation, whatever the [cause]. */
    override fun childCancelled(cause: Throwable): Boolean {
        return false
    }
}
- 协程调用 cancel 时会取消它的所有子协程,默认不会取消它的父协程
- 协程的取消只是在第一层包装 AbstractCoroutine 中修改协程(Job)的状态,不会直接影响第二层包装 BaseContinuationImpl 中的执行逻辑;也就是说,取消只是修改状态,并不会强行中断正在执行的代码,需要协程体在挂起点或通过检查 isActive 等方式主动配合才能真正停止(协作式取消)
协程异常处理
异常处理入口:BaseContinuationImpl.resumeWith:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Abridged excerpt: `current`, `param` and `completion` are not declared in the visible code,
// and the loop mentioned by the "unrolling recursion via loop" comment is elided as well.
class BaseContinuationImpl {
// Runs one step of the coroutine body through invokeSuspend and forwards the outcome to `completion`.
fun resumeWith(result: Result<Any?>) {
// ...
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
// The body suspended again: nothing to forward yet.
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception) // an exception thrown by the coroutine body is caught here and becomes the (failed) outcome
}
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
在捕获了异常后,调用 AbstractCoroutine.resumeWith
来处理,其流程为:AbstractCoroutine.resumeWith -> JobSupport.makeCompletingOnce -> JobSupport.tryMakeCompleting -> JobSupport.tryMakeCompletingSlowPath
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Slow path of making this job complete with [proposedUpdate].
 *
 * Under a lock on the Finishing state it marks the state as completing, installs the Finishing
 * state if it is new, and records the exception of a CompletedExceptionally update. Outside the
 * lock it sends the cancelling notification if the job has just become cancelling, and then
 * tries to wait for the first child.
 *
 * @return COMPLETING_RETRY when the list could not be obtained or the state CAS failed,
 *         COMPLETING_ALREADY when completion was already in progress,
 *         COMPLETING_WAITING_CHILDREN when waiting for a child has started,
 *         otherwise the result of `finalizeFinishingState`.
 */
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
// Reuse the state when it is already a Finishing one, otherwise create a fresh Finishing state.
val finishing = state as? Finishing ?: Finishing(list, false, null)
var notifyRootCause: Throwable? = null
synchronized(finishing) {
if (finishing.isCompleting) return COMPLETING_ALREADY
finishing.isCompleting = true
if (finishing !== state) {
if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
}
// Captured before the exception is added, so the transition into cancelling can be detected.
val wasCancelling = finishing.isCancelling
(proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
// If it just becomes cancelling --> must process cancelling notifications
notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
}
// process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)
notifyRootCause?.let { notifyCancelling(list, it) }
val child = firstChild(state) // now wait for children
if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN
// otherwise -- we have not children left (all were already cancelled?)
return finalizeFinishingState(finishing, proposedUpdate)
}
- 当协程发生异常时会取消它的所有子协程,默认会取消它的父协程
接下来看看 finalizeFinishingState
方法:
1
2
3
4
5
6
7
8
9
// JobSupport.kt
// Abridged excerpt: `finalException` and `finalState` are computed in the elided parts.
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
// ...
if (finalException != null) {
// The parent is offered the exception first; handleJobException runs only when
// cancelParent returns false (short-circuit ||).
val handled = cancelParent(finalException) || handleJobException(finalException)
if (handled) (finalState as CompletedExceptionally).makeHandled()
}
// ...
}
- cancelParent 在异常是 CancellationException 时会返回 true(视为已处理):子协程因 CancellationException 结束属于正常取消,父协程会忽略它,不会因此取消自己
- 如果协程抛出未捕获的非取消异常,则会一步步取消上层的协程,最后根协程调用 handleJobException 处理异常
1
2
3
// JobSupport.kt
// Handles an exception that was not handled by the parent coroutine; returning true means the
// exception has been handled. This base implementation returns false.
protected open fun handleJobException(exception: Throwable): Boolean = false
实现类有 StandaloneCoroutine
和 ActorCoroutine
1
2
3
4
5
6
7
8
9
/**
 * Coroutine type created by `launch`. It is constructed with `initParentJob = true`, and it
 * overrides `handleJobException` to route the exception to `handleCoroutineException`.
 */
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
// Passes the exception to handleCoroutineException with this coroutine's context and always reports it as handled.
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}
调用 handleCoroutineException 来处理异常:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// CoroutineExceptionHandler.kt
/**
 * Routes an uncaught coroutine [exception] to a handler.
 *
 * A `CoroutineExceptionHandler` found in [context] gets the exception first. If that handler
 * is absent, or anything inside the lookup/handling throws, `handleCoroutineExceptionImpl`
 * takes over; in the failure case it receives the result of `handlerException(exception, t)`.
 */
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
    try {
        // The lookup stays inside the try block, exactly like the handler call itself.
        val installedHandler = context[CoroutineExceptionHandler]
        if (installedHandler != null) {
            // A CoroutineExceptionHandler is defined in the context, so it handles the exception.
            installedHandler.handleException(context, exception)
            return
        }
    } catch (handlerFailure: Throwable) {
        handleCoroutineExceptionImpl(context, handlerException(exception, handlerFailure))
        return
    }
    // No handler present in the context: fall back to the global handler.
    handleCoroutineExceptionImpl(context, exception)
}
// CoroutineExceptionHandlerImpl.kt
// Extension handlers discovered through ServiceLoader, using the class loader of
// CoroutineExceptionHandler, and materialized once into a list.
// NOTE(review): the article states that on Android this also picks up AndroidExceptionPreHandler — confirm.
private val handlers: List<CoroutineExceptionHandler> = ServiceLoader.load(
CoroutineExceptionHandler::class.java,
CoroutineExceptionHandler::class.java.classLoader
).iterator().asSequence().toList()
/**
 * Global fallback for uncaught coroutine exceptions.
 *
 * Every extension handler in `handlers` receives the [exception]. If one of them throws, the
 * current thread's uncaught-exception handler is given `handlerException(exception, t)`.
 * Afterwards the current thread's uncaught-exception handler always receives the
 * original [exception].
 */
internal actual fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable) {
    val thread = Thread.currentThread()
    // use additional extension handlers
    handlers.forEach { extension ->
        try {
            extension.handleException(context, exception)
        } catch (failure: Throwable) {
            // Use thread's handler if custom handler failed to handle exception
            thread.uncaughtExceptionHandler.uncaughtException(thread, handlerException(exception, failure))
        }
    }
    // use thread's handler
    thread.uncaughtExceptionHandler.uncaughtException(thread, exception)
}
- 在 CoroutineExceptionHandler 的实现中不要再抛出异常:这里已经是最后一道能处理异常的地方
- 从 CoroutineContext 取出 CoroutineExceptionHandler 来处理异常
- 如果没有 CoroutineExceptionHandler,或者 CoroutineExceptionHandler 在处理时又抛出了异常,则交给 handleCoroutineExceptionImpl 处理
- 没有处理的异常交给 UncaughtExceptionHandler 来处理
- AndroidExceptionPreHandler 是 CoroutineExceptionHandler 的一个实现,通过 SPI(ServiceLoader)机制被加载
结构化并发 (Structured Concurrency) 原理
疑问
协程何时需要线程切换?context[ContinuationInterceptor] 什么时候有值
当 CoroutineContext 中指定了调度器(ContinuationInterceptor)时,才可能需要切换线程;
在 newCoroutineContext 中,如果上下文里没有 ContinuationInterceptor,默认会添加 Dispatchers.Default,这个时候 context[ContinuationInterceptor] 就会有值
协程如何切线程?
Continuation.intercepted():ContinuationInterceptor 会拦截 Continuation,而 CoroutineDispatcher 实现了 ContinuationInterceptor,所以协程的线程切换是以拦截器的方式实现的。
- 【带着问题学】协程到底是怎么切换线程的?
https://juejin.cn/post/6981056016897015838
协程如何处理异常?
入口:
在 BaseContinuationImpl.resumeWith,Result.failure(exception)
CancellationException 异常,会被忽略掉,不会取消父协程,只会取消其下所有子协程
/**
 * Reports this job's cancellation [cause] to its parent.
 *
 * @return `true` for a scoped coroutine; without a parent, whether [cause] is a
 *         CancellationException; otherwise `parent.childCancelled(cause) || isCancellation`.
 */
private fun cancelParent(cause: Throwable): Boolean {
    // Is scoped coroutine -- don't propagate, will be rethrown
    if (isScopedCoroutine) return true

    /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
     * This allow parent to cancel its children (normally) without being cancelled itself, unless
     * child crashes and produce some other exception during its completion.
     */
    val isCancellation = cause is CancellationException
    val parent = parentHandle
    // No parent -- ignore CE, report other exceptions.
    if (parent === null || parent === NonDisposableHandle) {
        return isCancellation
    }
    // Notify parent but don't forget to check cancellation
    return parent.childCancelled(cause) || isCancellation
}