文章

Kotlin协程原理

Kotlin协程原理

协程原理

suspend 原理

suspend 方法

示例代码:

1
2
3
4
suspend fun susFun() {
    delay(100)
    println("hello suspend function.")
}

反编译后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public static final Object susFun(Continuation<? super kotlin.Unit> r7) {
    /*
        r4 = -2147483648(0xffffffff80000000, float:-0.0)
        boolean r2 = r7 instanceof me.hacket.coroutine.SuspendFunTestKt.susFun.1 // 内部类SuspendFunTestKt$susFun$1
        if (r2 == 0) goto L_0x0026
        r2 = r7
        me.hacket.coroutine.SuspendFunTestKt$susFun$1 r2 = (me.hacket.coroutine.SuspendFunTestKt.susFun.1) r2
        int r3 = r2.label
        r3 = r3 & r4
        if (r3 == 0) goto L_0x0026
        int r3 = r2.label
        int r3 = r3 - r4
        r2.label = r3
    L_0x0013:
        java.lang.Object r1 = r2.result
        java.lang.Object r3 = kotlin.coroutines.intrinsics.IntrinsicsKt.getCOROUTINE_SUSPENDED()
        int r4 = r2.label
        switch(r4) {
            case 0: goto L_0x002d;
            case 1: goto L_0x003d;
            default: goto L_0x001e;
        }
    L_0x001e:
        java.lang.IllegalStateException r2 = new java.lang.IllegalStateException
        java.lang.String r3 = "call to 'resume' before 'invoke' with coroutine"
        r2.<init>(r3)
        throw r2
    L_0x0026:
        me.hacket.coroutine.SuspendFunTestKt$susFun$1 r0 = new me.hacket.coroutine.SuspendFunTestKt$susFun$1
        r0.<init>(r7)
        r2 = r0
        goto L_0x0013
    L_0x002d:
        kotlin.ResultKt.throwOnFailure(r1)
        r4 = 100
        r6 = 1
        r2.label = r6
        java.lang.Object r2 = kotlinx.coroutines.DelayKt.delay(r4, r2)
        if (r2 != r3) goto L_0x0040
        r2 = r3
    L_0x003c:
        return r2
    L_0x003d:
        kotlin.ResultKt.throwOnFailure(r1)
    L_0x0040:
        java.lang.String r2 = "hello suspend function."
        java.io.PrintStream r3 = java.lang.System.out
        r3.println(r2)
        kotlin.Unit r2 = kotlin.Unit.INSTANCE
        goto L_0x003c
        switch-data {0->0x002d, 1->0x003d, }
    */
}
  • 就是 CPS 代码,一堆 switch-case 处理不同的状态
  • susFun 方法内部类 SuspendFunTestKt$susFun$1 是一个 ContinuationImpl:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final class SuspendFunTestKt$susFun$1 extends ContinuationImpl {
    int label;
    /* synthetic */ Object result;

    SuspendFunTestKt$susFun$1(Continuation continuation) {
        super(continuation);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        this.result = $result;
        this.label |= Integer.MIN_VALUE;
        return SuspendFunTestKt.susFun(this);
    }
}

suspend lambda 是个什么东西?

示例代码:

1
2
3
4
5
6
7
8
9
fun main() {
    val mySuspend1: suspend () -> String = {
        delay(1000)
        "hehe"
    }
    val javaClass = mySuspend1.javaClass
    println("javaClass=${javaClass.simpleName}") // main$mySuspend1$1
    println("javaClass.superclass=${javaClass.superclass.simpleName}") // SuspendLambda
}

我们用 jadx 反编译看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public final class SuspendTestKt {
    public static final void main() {
        Class javaClass = new main.mySuspend1.1((Continuation) null).getClass();
        System.out.println((Object) ("javaClass=" + javaClass.getSimpleName()));
        StringBuilder append = new StringBuilder().append("javaClass.superclass=");
        Class<? super Object> superclass = javaClass.getSuperclass();
        Intrinsics.checkExpressionValueIsNotNull(superclass, "javaClass.superclass");
        System.out.println((Object) append.append(superclass.getSimpleName()).toString());
    }
}
// 内部类SuspendTestKt$main$mySuspend1$1
final class SuspendTestKt$main$mySuspend1$1 extends SuspendLambda implements Function1<Continuation<? super String>, Object> {
    int label;

    SuspendTestKt$main$mySuspend1$1(Continuation continuation) {
        super(1, continuation);
    }

    @NotNull
    public final Continuation<Unit> create(@NotNull Continuation<?> continuation) {
        Intrinsics.checkParameterIsNotNull(continuation, "completion");
        return new SuspendTestKt$main$mySuspend1$1(continuation);
    }

    public final Object invoke(Object obj) {
        return create((Continuation) obj).invokeSuspend(Unit.INSTANCE);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                this.label = 1;
                if (DelayKt.delay(1000, this) == coroutine_suspended) {
                    return coroutine_suspended;
                }
                break;
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        return "hehe";
    }
}

由反编译结果可知,suspend () -> String 会被编译成一个继承自 SuspendLambda 的类,并实现了 Function1 接口。

协程原理

以下面代码为例,分析协程执行原理:

1
2
3
4
5
6
7
fun main() {
   val scope = MyContextScope(EmptyCoroutineContext)
    scope.launch(Dispatchers.IO) {
        println("hello world. ${Thread.currentThread().name}")
    }
    Thread.sleep(2000)
}

输出:

1
hello world. DefaultDispatcher-worker-1

反编译生成代码

反编译生成的代码,launch 的第三个参数 block 会生成一个 SuspendLambda 内部类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public final class HahaKt {
    public static final void main() {
        BuildersKt.launch$default((CoroutineScope) new MyContextScope(EmptyCoroutineContext.INSTANCE), Dispatchers.getIO(), (CoroutineStart) null, new main.1((Continuation) null), 2, (Object) null);
        Thread.sleep(2000);
    }
}
// launch的第三个参数block会生成内部类HahaKt$main$1
final class HahaKt$main$1 extends SuspendLambda implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
    int label;
    private CoroutineScope p$;

    HahaKt$main$1(Continuation continuation) {
        super(2, continuation);
    }

    @NotNull
    public final Continuation<Unit> create(@Nullable Object value, @NotNull Continuation<?> continuation) {
        Intrinsics.checkParameterIsNotNull(continuation, "completion");
        HahaKt$main$1 hahaKt$main$1 = new HahaKt$main$1(continuation);
        CoroutineScope coroutineScope = (CoroutineScope) value;
        hahaKt$main$1.p$ = (CoroutineScope) value;
        return hahaKt$main$1;
    }

    public final Object invoke(Object obj, Object obj2) {
        return create(obj, (Continuation) obj2).invokeSuspend(Unit.INSTANCE);
    }

    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
        IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                CoroutineScope coroutineScope = this.p$;
                StringBuilder append = new StringBuilder().append("hello world. ");
                Thread currentThread = Thread.currentThread();
                Intrinsics.checkExpressionValueIsNotNull(currentThread, "Thread.currentThread()");
                System.out.println((Object) append.append(currentThread.getName()).toString());
                return Unit.INSTANCE;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
    }
}

协程的创建

CoroutineScope.launch

首先看 launch:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// CoroutineScope
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // 根据父级创建新的上下文(协程的父级上下文)
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        // 协程真正的上下文生成是以newContext作为父级上下文生成的
        LazyStandaloneCoroutine(newContext, block) else 
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
// 这是一个CoroutineScope的扩展函数,coroutineContext其实就是拿到了scope对象的成员
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}
  • newCoroutineContext 创建一个新的协程上下文,作为生成 XXXCoroutine 的 parentContext
  • 根据 CoroutineStart 是否 isLazy,分别创建 LazyStandaloneCoroutine 或 StandaloneCoroutine,从而决定协程是否马上执行
  • coroutine.start 执行协程

AbstractCoroutine.start

现在看看 AbstractCoroutine.start:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// AbstractCoroutine.kt
public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    init {
        if (initParentJob) initParentJob(parentContext[Job])
    }
    
    // receiver: StandaloneCoroutine
    // block: suspend StandaloneCoroutine.() -> Unit
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        // initParentJob()
        start(block, receiver, this) // 等同于start.invoke() ,注意第3个参数completion:this
    }
}

调用了 CoroutineStart.invoke() 方法,看看它的参数:

  • 参数 1:block,要执行的协程体:suspend StandaloneCoroutine.() -> Unit,其实就是一个 SuspendLambda
  • 参数 2:receiver,为一个 AbstractCoroutine,这里为 StandaloneCoroutine
  • 参数 3:completion,就是 this,即 StandaloneCoroutine,这个参数很重要,后面要用到

看看 StandaloneCoroutine:

1
2
3
4
5
6
7
8
9
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

CoroutineStart.invoke

调用到了 CoroutineStart,CoroutineStart 是一个枚举类,接着看 CoroutineStart#invoke() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// CoroutineStart
public enum class CoroutineStart {
    DEFAULT, LAZY, ATOMIC, UNDISPATCHED;
    
    public val isLazy: Boolean get() = this === LAZY
    
    // block - suspend StandaloneCoroutine.() -> Unit,为SuspendLambda
    // receiver - StandaloneCoroutine
    // completion - StandaloneCoroutine<Unit>
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
    // 根据 start 参数的类型调用不同的方法
    when (this) {
        CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
        CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        CoroutineStart.LAZY -> Unit // will start lazily
    }
}

然后调用 startCoroutineCancellable()

(suspend (R) -> T).startCoroutineCancellable 创建 Continuation

这里我们看 CoroutineStart.DEFAULT,然后调用了 block.startCoroutineCancellable(receiver, completion)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Cancellable.kt
// receiver - StandaloneCoroutine
// completion - StandaloneCoroutine<Unit>
/**
 * Starts this suspending block as a cancellable coroutine.
 *
 * Creates the continuation with `createCoroutineUnintercepted(receiver, completion)`,
 * passes it through `intercepted()`, and resumes it with `Result.success(Unit)` via
 * `resumeCancellableWith`. The whole chain runs inside `runSafely(completion)`.
 *
 * @param receiver the receiver the block is invoked on (a StandaloneCoroutine in the launch example)
 * @param completion the continuation passed to `createCoroutineUnintercepted` and to `runSafely`
 * @param onCancellation optional callback forwarded to `resumeCancellableWith`; defaults to `null`
 */
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R,
    completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) { // if anything here throws, runSafely calls completion.resumeWith(Result.failure(e))
        createCoroutineUnintercepted(receiver, completion)
            .intercepted()
            .resumeCancellableWith(Result.success(Unit), onCancellation)
    }
private inline fun runSafely(completion: Continuation<*>, block: () -> Unit) {
    try {
        block()
    } catch (e: Throwable) {
        completion.resumeWith(Result.failure(e))
    }
}
(suspend (R) -> T).createCoroutineUnintercepted

(suspend (R) -> T).createCoroutineUnintercepted 创建一个 Continuation
现在看看 createCoroutineUnintercepted()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// IntrinsicsJvm.kt
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>):Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(probeCompletion)
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

// https://github.com/JetBrains/kotlin/blob/master/libraries/stdlib/jvm/src/kotlin/coroutines/jvm/internal/DebugProbes.kt
internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> {
    /** implementation of this function is replaced by debugger */
    return completion
}
  • 通过前面的分析可知道 suspend()->T 是一个 SuspendLambda,SuspendLambda 间接继承了 BaseContinuationImpl,上面会走 create() 方法,前面反编译可知 create 方法会创建一个 SuspendLambda,参数为 completion
    其继承关系为: SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation
  • createCoroutineFromSuspendFunction 用于处理 suspend lambda 没有继承 BaseContinuationImpl 的情况,具体源码见 IntrinsicsJvm.kt

接着看 create(completion),create 方法创建的 Continuation 是一个 SuspendLambda 对象。
看看 create 反编译后的代码:

1
2
3
4
5
6
@NotNull
 public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
    Intrinsics.checkNotNullParameter(completion, "completion");
    Function2 var3 = new <anonymous constructor>(completion);
    return var3;
 }

接着看 intercepted()

Continuation.intercepted() 返回 DispatchedContinuation

接着回到 startCoroutineCancellable 看 intercepted(),通过 ContinuationInterceptor 拦截当前 Continuation

1
2
3
4
5
// IntrinsicsJvm.kt
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
// 如果是ContinuationImpl类型,则调用intercepted方法,否则返回自身
// 这里的 this 是 HahaKt$main$1 实例 - ContinuationImpl的子类
    (this as? ContinuationImpl)?.intercepted() ?: this

接着看 ContinuationImpl.intercepted()

1
2
3
4
5
6
7
8
9
10
11
// ContinuationImpl
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    private var intercepted: Continuation<Any?>? = null
    public fun intercepted(): Continuation<Any?> =
        // context[ContinuationInterceptor]是 CoroutineDispatcher 实例
        intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
        .also { intercepted = it }
}

如果 intercepted 不为 null,则直接返回;如果为 null,则取 CoroutineContext 中的 ContinuationInterceptor,调用其 interceptContinuation() 并把结果缓存到 intercepted 中;若上下文中没有 ContinuationInterceptor,则返回自身(this)

CoroutineDispatcher 实现了 ContinuationInterceptor

1
2
3
// CoroutineDispatcher
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

所以 intercepted() 分情况:

  1. 需要线程调度 - 返回 DispatchedContinuation,其 continuation 参数值为 SuspendLambda
  2. 不需要线程调度 - 返回 SuspendLambda

协程的启动

接下来看看 resumeCancellableWith 是怎么启动协程的,这里还涉及到 Dispatchers 线程调度的逻辑:

DispatchedContinuation

前面 startCoroutineCancellable() 里,如果有线程调度,那么返回的是 DispatchedContinuation;没有的话返回 SuspendLambda

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// DispatchedContinuation
internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {

    override val delegate: Continuation<T>
        get() = this
        
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) { // 判断是否需要线程调度
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.dispatch(context, this) // 将协程的运算分发到另一个线程
        } else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }

    inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        if (dispatcher.isDispatchNeeded(context)) { // 判断是否需要线程调度
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this) // 将协程的运算分发到另一个线程
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) { // 不需要调度则直接在当前线程执行协程
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
    inline fun resumeUndispatchedWith(result: Result<T>) {
        withContinuationContext(continuation, countOrElement) {
            continuation.resumeWith(result)
        }
    }   
}


public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    // 进行线程调度,最后也会执行到continuation.resumeWith方法
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation) 
    // 直接执行continuation.resumeWith方法
    else -> resumeWith(result)
}
  1. 当需要线程调度时,则在调度后会调用 DispatchedContinuation.continuation.resumeWith 来启动协程,其中 continuation 是 SuspendLambda 实例
  2. 当不需要线程调度时,则直接调用 SuspendLambda.resumeWith 来启动协程

DispatchedContinuation 继承自 DispatchedTask,DispatchedTask 又继承自 SchedulerTask(即 Task),最终实现了 Runnable,那我们看下其 run 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    public final override fun run() {
        assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            val continuation = delegate.continuation
            withContinuationContext(continuation, delegate.countOrElement) {
                val context = continuation.context
                val state = takeState() // NOTE: Must take state in any case, even if cancelled
                val exception = getExceptionalResult(state)
                /*
                 * Check whether continuation was originally resumed with an exception.
                 * If so, it dominates cancellation, otherwise the original exception
                 * will be silently lost.
                 */
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                if (job != null && !job.isActive) {
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                        continuation.resumeWithException(exception)
                    } else {
                        continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
        } catch (e: Throwable) {
            // This instead of runCatching to have nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val result = runCatching { taskContext.afterTask() }
            handleFatalException(fatalException, result.exceptionOrNull())
        }
    }
}
  • continuation 就是 DispatchedContinuation 构造器中的 continuation,就是 SuspendLambda
  • 封装了 Continuation 的 resumeWithException、resume 操作逻辑,最终调用的是 SuspendLambda 的 resumeWith 方法

下面看看 SuspendLambda 的类关系:
SuspendLambda→ContinuationImpl→BaseContinuationImpl→Continuation

resumeWith 方法调用的是父类 BaseContinuationImpl 中的 resumeWith 方法:

1
2
3
4
5
6
7
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        // ...
        val outcome = invokeSuspend(param)
        // ...
    }
}

SuspendLambda→ContinuationImpl→BaseContinuationImpl→Continuation

SuspendLambda

由前面可知 suspend () -> T 是一个 SuspendLambda,现在看看 SuspendLambda

1
2
3
4
5
6
7
8
9
10
11
12
13
// Suspension lambdas inherit from this class
internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    constructor(arity: Int) : this(arity, null)

    public override fun toString(): String =
        if (completion == null)
            Reflection.renderLambdaToString(this) // this is lambda
        else
            super.toString() // this is continuation
}
ContinuationImpl

SuspendLambda 继承 ContinuationImpl,接着看看 ContinuationImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> = // 拦截Continuation
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation // just in case
    }
}
BaseContinuationImpl

ContinuationImpl 又继承 BaseContinuationImpl,接着看 BaseContinuationImpl:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>?) : 
    // 这个completion就是AbstractCoroutine
Continuation<Any?>, CoroutineStackFrame, Serializable {
    
    public final override fun resumeWith(result: Result<Any?>) {
        
        var current = this
        var param = result
        while (true) { // 死循环
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // SuspendLambda是BaseContinuationImpl
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else { // AbstractCoroutine不是BaseContinuationImpl
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    protected abstract fun invokeSuspend(result: Result<Any?>): Any?

    protected open fun releaseIntercepted() {
        // does nothing here, overridden in ContinuationImpl
    }

    // 子类实现,返回一个Continuation
    public open fun create(completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Continuation) has not been overridden")
    }
    // 子类实现,返回一个Continuation
    public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
    }

    public override fun toString(): String =
        "Continuation at ${getStackTraceElement() ?: this::class.java.name}"

    // --- CoroutineStackFrame implementation

    public override val callerFrame: CoroutineStackFrame?
        get() = completion as? CoroutineStackFrame

    public override fun getStackTraceElement(): StackTraceElement? =
        getStackTraceElementImpl()
}
  • create 方法需由子类实现,返回一个 Continuation(类似 SuspendTestKt$main$mySuspend1$1 继承自 SuspendLambda)
  • resumeWith 执行的入口
  • invokeSuspend 真正的代码逻辑,即你自己写的协程体的代码

最后协程是调用了 AbstractCoroutine 的 resumeWith

1
2
3
4
5
6
// AbstractCoroutine
public final override fun resumeWith(result: Result<T>) {
    val state = makeCompletingOnce(result.toState())
    if (state === COMPLETING_WAITING_CHILDREN) return
    afterResume(state)
}

协程启动小结

  1. 协程的启动是通过 BaseContinuationImpl.resumeWith 方法调用到了子类 SuspendLambda.invokeSuspend 方法,然后通过状态机来控制顺序运行
  2. 在 BaseContinuationImpl.resumeWith 有个死循环,调用 invokeSuspend 来执行具体的协程代码,碰到 COROUTINE_SUSPENDED 时直接 return(协程挂起);否则把结果交给 completion 继续处理,直到调用最顶层 completion 的 resumeWith 为止
  3. Kotlin 中的协程存在着三层包装
1
2
3
第一层包装: launch & async 返回的 Job, Deferred 继承自 AbstractCoroutine, 里面封装了协程的状态,提供了 cancel 等接口;
第二层包装: 编译器生成的 SuspendLambda 子类,封装了协程的真正执行逻辑,其继承关系为 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, 它的 completion 参数就是第一层包装实例;
第三层包装: DispatchedContinuation, 封装了线程调度逻辑,它的 continuation 参数就是第二层包装实例。

hufmb

协程状态机

以下面的代码为例解析一下协程启动的状态机流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private suspend fun getId(): String {
    return GlobalScope.async(Dispatchers.IO) {
        delay(1000)
        "hearing"
    }.await()
}

private suspend fun getAvatar(id: String): String {
    return GlobalScope.async(Dispatchers.IO) {
        delay(1000)
        "avatar-$id"
    }.await()
}

fun main() {
    GlobalScope.launch {
        val id = getId()
        val avatar = getAvatar(id)
        println("${Thread.currentThread().name} - $id - $avatar")
    }
}

上面 main 方法中,GlobalScope.launch 启动的协程体在执行到 getId 后,协程体会挂起,直到 getId 返回可用结果,才会 resume launch 协程,执行到 getAvatar 也是同样的过程。

协程内部实现使用状态机来处理不同的挂起点,将 GlobalScope.launch 协程体字节码反编译成 Java 代码,大致如下 (有所删减):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
private static final Object getId(Continuation $completion) {
  return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
     int label;
     @Nullable
     public final Object invokeSuspend(@NotNull Object $result) {
        Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch(this.label) {
        case 0:
           ResultKt.throwOnFailure($result);
           this.label = 1;
           if (DelayKt.delay(1000L, this) == var2) {
              return var2;
           }
           break;
        case 1:
           ResultKt.throwOnFailure($result);
           break;
        default:
           throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        return "hearing";
     }

     @NotNull
     public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
        Intrinsics.checkNotNullParameter(completion, "completion");
        Function2 var3 = new <anonymous constructor>(completion);
        return var3;
     }

     public final Object invoke(Object var1, Object var2) {
        return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
     }
  }), 2, (Object)null).await($completion);
}

private static final Object getAvatar(final String id, Continuation $completion) {
  return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
     int label;
     @Nullable
     public final Object invokeSuspend(@NotNull Object $result) {
        Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch(this.label) {
        case 0:
           ResultKt.throwOnFailure($result);
           this.label = 1;
           if (DelayKt.delay(1000L, this) == var2) {
              return var2;
           }
           break;
        case 1:
           ResultKt.throwOnFailure($result);
           break;
        default:
           throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        }
        return "avatar-" + id;
     }

     @NotNull
     public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
        Intrinsics.checkNotNullParameter(completion, "completion");
        Function2 var3 = new <anonymous constructor>(completion);
        return var3;
     }

     public final Object invoke(Object var1, Object var2) {
        return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
     }
  }), 2, (Object)null).await($completion);
}

public static final void main() {
    BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null,
        (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
        int label;
    
        public final Object invokeSuspend(@NotNull Object $result) {
            Object var10000;
            String id;
            label17: {
                CoroutineScope $this$launch;
                switch(this.label) {
                case 0: // a
                    ResultKt.throwOnFailure($result);
                    $this$launch = this.p$;
                    this.label = 1; // label置为1
                    var10000 = getId(this);
                    if (var10000 == COROUTINE_SUSPENDED) {
                        return COROUTINE_SUSPENDED;
                    }
                    // 若此时已经有结果,则不挂起,直接break
                    break;
                case 1: // b
                    ResultKt.throwOnFailure($result);
                    var10000 = $result;
                    break;
                case 2: // d
                    id = (String)this.L$1;
                    ResultKt.throwOnFailure($result);
                    var10000 = $result;
                    break label17; // 退出label17
                default:
                    throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                }
                // c
                id = (String)var10000;
                this.L$1 = id; // 将id赋给L$1
                this.label = 2; // label置为2
                var10000 = getAvatar(id, this);
                if (var10000 == COROUTINE_SUSPENDED) {
                    return COROUTINE_SUSPENDED;
                }
            }
            // e
            String avatar = (String)var10000;
            String var5 = var9.append(var10001.getName()).append(" - ").append(id).append(" - ").append(avatar).toString();
            System.out.println(var5);
            return Unit.INSTANCE;
        }
    
        @NotNull
        public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkParameterIsNotNull(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            var3.p$ = (CoroutineScope)value;
            return var3;
        }
    
        public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
        }
    }
}

invokeSuspend 方法会在协程体中的 suspend 函数得到结果后被调用;具体的调用发生在 BaseContinuationImpl.resumeWith 中

执行流程:

  1. a: launch 协程体刚执行到 getId 方法时,getId 方法的返回值将是 COROUTINE_SUSPENDED, 此时直接 return, 则 launch 协程体中 getId 后面的代码暂时不会执行,即 launch 协程体被挂起 (非阻塞, 该线程依旧会做其它工作)。这里将 label 置为了 1. 而若此时 getId 已经有结果 (内部没有调用 delay 之类的 suspend 函数等),则不挂起,而是直接 break。
  2. b: 若上面 a 中 getId 返回 COROUTINE_SUSPENDED, 则当 getId 有可用结果返回后,会重新执行 launch 协程体的 invokeSuspend 方法,根据上面的 label==1, 会执行到这里检查一下 result 没问题的话就 break, 此时 id 赋值给了 var10000。
  3. c: 在 a 中若直接 break 或 在 b 中得到 getId 的结果然后 break 后,都会执行到这里,得到 id 的值并把 label 置为 2。然后调用 getAvatar 方法,跟 getId 类似,若其返回 COROUTINE_SUSPENDED 则 return,协程被挂起,等到下次 invokeSuspend 被执行,否则离开 label17 接着执行后续逻辑。
  4. d: 若上面 c 中 getAvatar 返回 COROUTINE_SUSPENDED, 则当 getAvatar 有可用结果返回后会重新调用 launch 协程体的 invokeSuspend 方法,此时根据 label==2 来到这里并取得之前的 id 值,检验 result(即 avatar),然后 break label17。
  5. e: c 中直接返回了可用结果 或 d 中 break label17 后,launch 协程体中的 suspend 函数都执行完毕了,这里会执行剩下的逻辑。

协程的挂起和恢复

Kotlin 编译器会为 协程体 生成继承自 SuspendLambda 的子类,协程的真正运算逻辑都在其 invokeSuspend 方法中。

Kotlin 协程的内部实现使用了 Kotlin 编译器的一些编译技术,当 suspend 函数被调用时,都有一个隐式的参数额外传入,这个参数是 Continuation 类型,封装了协程 resume 后执行的代码逻辑。

1
2
3
4
5
6
7
8
9
10
11
/**
 * Sample suspend function used to study the generated state machine.
 *
 * Starts an async coroutine on GlobalScope with Dispatchers.IO that delays for 1000 ms and
 * yields "hearing", then suspends in await() until that value is available.
 */
private suspend fun getId(): String {
    return GlobalScope.async(Dispatchers.IO) {
        delay(1000)
        "hearing"
    }.await()
}

// Decompiled to Java: the suspend function takes an extra Continuation parameter ($completion)
// and its declared return type becomes Object.
final Object getId(@NotNull Continuation $completion) {
    // ...
}

其中传入的 $completion 参数,可以看到是调用 getId 方法所在的协程体对象,也就是一个 SuspendLambda 对象。Continuation 的定义如下:

1
2
3
4
5
/**
 * Continuation of a computation that produces a value of type [T].
 */
public interface Continuation<in T> {
    /** The coroutine context associated with this continuation. */
    public val context: CoroutineContext

    /** Resumes this continuation, handing it [result] as the outcome it was waiting for. */
    public fun resumeWith(result: Result<T>)
}

将 getId 方法编译后的字节码反编译成 Java 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Decompiled form of the suspend function getId. The caller's continuation arrives as
 * {@code $completion}; the method returns whatever {@code await($completion)} returns.
 */
final Object getId(@NotNull Continuation $completion) {
    // Create and start the async coroutine
    return BuildersKt.async$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getIO(), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
        // State-machine position: 0 on first entry, set to 1 just before delay() is called
        int label;

        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
            switch(this.label) {
            case 0:
                ResultKt.throwOnFailure($result);
                // Record the resume point before calling delay
                this.label = 1;
                if (DelayKt.delay(1000L, this) == COROUTINE_SUSPENDED) {
                    // delay suspended: leave invokeSuspend without producing the value yet
                    return COROUTINE_SUSPENDED;
                }
                break;
            case 1:
                // Re-entered after delay: rethrow a failure carried by $result, if any
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
            return "hearing";
        }

        // ...
    }), 2, (Object)null).await($completion); // call the await() suspend function
}
  1. 在 getId、delay 尚未返回结果时,会返回 COROUTINE_SUSPENDED,表示暂时还没有值,此时协程挂起,但不阻塞线程;
  2. 当 suspend 函数有返回值时,会继续调用 invokeSuspend,恢复协程运行

父子协程

launch:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Creates a coroutine for [block], hands it to `coroutine.start(...)` and returns it as a [Job].
 *
 * @param context passed to newCoroutineContext to build the new coroutine's context
 * @param start start option; when `start.isLazy` a LazyStandaloneCoroutine is created,
 *   otherwise a StandaloneCoroutine with `active = true`
 * @param block the coroutine body; the coroutine itself is passed as its receiver
 * @return the created coroutine
 */
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

// AbstractCoroutine
init {
    // When initParentJob is set, link this coroutine to the Job found in the parent context (null if absent)
    if (initParentJob) initParentJob(parentContext[Job])
}
// Invokes the given CoroutineStart value with (block, receiver, this)
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    start(block, receiver, this)
}

接着看 initParentJob():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// JobSupport
/**
 * Links this job to [parent].
 *
 * With no parent, parentHandle becomes NonDisposableHandle. Otherwise the parent is started,
 * this job is attached to it as a child, and the returned handle is kept in parentHandle.
 */
protected fun initParentJob(parent: Job?) { // parent is the parent coroutine's Job (not its CoroutineContext), or null
    assert { parentHandle == null }
    if (parent == null) {
        parentHandle = NonDisposableHandle
        return
    }
    parent.start() // make sure the parent is started
    val handle = parent.attachChild(this)
    parentHandle = handle
    // now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
    if (isCompleted) {
        handle.dispose()
        parentHandle = NonDisposableHandle // release it just in case, to aid GC
    }
}

接下来重点在于 parent.attachChild 方法:

1
2
3
/**
 * Registers [child] on this job by installing a ChildHandleNode through invokeOnCompletion
 * with `onCancelling = true`; the result is returned as the ChildHandle.
 */
public final override fun attachChild(child: ChildJob): ChildHandle {
    return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
}

invokeOnCompletion 方法主要是将 handler 节点添加到父协程的一个队列 (state.list) 中

GlobalScope.launch 没有父协程

协程完成

协程的完成通过 AbstractCoroutine.resumeWith 实现

1
2
3
4
5
6
// AbstractCoroutine
/**
 * Completes this coroutine with [result].
 *
 * The result is converted with toState() and passed to makeCompletingOnce. If that reports
 * COMPLETING_WAITING_CHILDREN the method returns without calling afterResume.
 */
public final override fun resumeWith(result: Result<T>) {
    val state = makeCompletingOnce(result.toState())
    if (state === COMPLETING_WAITING_CHILDREN) return
    afterResume(state)
}

调用路径:makeCompletingOnce -> tryMakeCompleting -> tryMakeCompletingSlowPath -> tryWaitForChild:

1
2
3
4
5
6
7
8
9
/**
 * Tries to start waiting on [child] or, failing that, on the children after it.
 *
 * @return true once a ChildCompletion handler has been installed on some child's job;
 *   false when nextChild() runs out of children
 */
private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
    // Install a ChildCompletion handler on the child's job
    val handle = child.childJob.invokeOnCompletion(
        invokeImmediately = false,
        handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
    )
    if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
    // This child yielded NonDisposableHandle: move on to the next one, if any
    val nextChild = child.nextChild() ?: return false
    return tryWaitForChild(state, nextChild, proposedUpdate)
}

可知 tryWaitForChild 方法将 ChildCompletion 节点添加到了子协程的 state.list 队列中,当子协程完成或者取消时调用 ChildCompletion.invoke:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ChildCompletion
// Handler body: hands control back to the parent so it can continue completing.
override fun invoke(cause: Throwable?) {
    parent.continueCompleting(state, child, proposedUpdate)
}

/**
 * Resumes completion after [lastChild]: waits for the next child if one can be waited on,
 * otherwise finalizes the state and calls afterCompletion with the final state.
 */
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
    assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
    // figure out if we need to wait for next child
    val waitChild = lastChild.nextChild()
    // try wait for next child
    if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
    // no more children to wait -- try update state
    val finalState = finalizeFinishingState(state, proposedUpdate)
    afterCompletion(finalState)
}

父协程需要等待所有子协程处于完成或者取消状态才能完成自身。

协程取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// JobSupport.kt
// external cancel with cause, never invoked implicitly from internal machinery
// A null cause is replaced by defaultCancellationException() before delegating to cancelInternal.
public override fun cancel(cause: CancellationException?) {
    cancelInternal(cause ?: defaultCancellationException())
}
// Forwards the cancellation cause to cancelImpl; declared open.
public open fun cancelInternal(cause: Throwable) {
    cancelImpl(cause)
}
/**
 * Core cancellation routine.
 *
 * When onCancelComplete is set, cancelMakeCompleting is tried first. If the state is still
 * COMPLETING_ALREADY afterwards, makeCancelling is used.
 *
 * @return false only when the outcome is TOO_LATE_TO_CANCEL, true otherwise
 */
internal fun cancelImpl(cause: Any?): Boolean {
    var finalState: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        // make sure it is completing, if cancelMakeCompleting returns state it means it had make it
        // completing and had recorded exception
        finalState = cancelMakeCompleting(cause)
        if (finalState === COMPLETING_WAITING_CHILDREN) return true
    }
    if (finalState === COMPLETING_ALREADY) {
            finalState = makeCancelling(cause)
    }
    return when {
        finalState === COMPLETING_ALREADY -> true
        finalState === COMPLETING_WAITING_CHILDREN -> true
        finalState === TOO_LATE_TO_CANCEL -> false
        else -> {
            // Any other value is passed to afterCompletion as the final state
            afterCompletion(finalState)
            true
        }
    }
}

makeCancelling() 调用了 notifyCancelling()

1
2
3
4
5
6
7
8
9
10
// JobSupport.kt
// `list` is this job's node list: when a child coroutine starts, initParentJob() wraps it in a
// ChildHandleNode and adds that node to the parent job's list.
private fun notifyCancelling(list: NodeList, cause: Throwable) {
    // first cancel our own children
    onCancelling(cause)
    // Runs invoke() on every registered JobCancellingNode (the ChildHandleNode entries above),
    // i.e. cancels the child coroutines one by one
    notifyHandlers<JobCancellingNode>(list, cause)
    // then cancel parent (the parent may or may not actually be cancelled)
    cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}

下面看看父 parent 如何取消 child,notifyHandlers<JobCancellingNode>(list, cause)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Base type of the nodes that notifyHandlers<JobCancellingNode> invokes on cancellation.
internal abstract class JobCancellingNode : JobNode()
/**
 * Node that ties a child job to the job it is registered on, in both directions.
 */
internal class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
    override val parent: Job get() = job
    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) // parent cancels the child
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) // child reports cancellation to the parent job
}
// The child coroutine cancels itself through this method, using the parent job as the cause.
public final override fun parentCancelled(parentJob: ParentJob) {
    cancelImpl(parentJob)
}
/**
 * Called when a child reports [cause].
 *
 * A CancellationException returns true right away without cancelling this job. Any other
 * cause goes through cancelImpl, and the result is combined with handlesException.
 */
public open fun childCancelled(cause: Throwable): Boolean {
    if (cause is CancellationException) return true
    return cancelImpl(cause) && handlesException
}

下面看看 child 如何取消 parent,cancelParent():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Propagates [cause] to the parent, if there is one.
 *
 * @return true for a scoped coroutine, for a CancellationException, or when the parent's
 *   childCancelled returns true; false otherwise
 */
private fun cancelParent(cause: Throwable): Boolean {
    // When isScopedCoroutine is true, return immediately without propagating to or cancelling the parent.
    // It is false by default and subclasses may override it.
    // Is scoped coroutine -- don't propagate, will be rethrown
    if (isScopedCoroutine) return true

    /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
     * This allow parent to cancel its children (normally) without being cancelled itself, unless
     * child crashes and produce some other exception during its completion.
     */
    val isCancellation = cause is CancellationException
    val parent = parentHandle
    // No parent -- ignore CE, report other exceptions.
    if (parent === null || parent === NonDisposableHandle) {
        return isCancellation
    }

    // Notify parent but don't forget to check cancellation
    return parent.childCancelled(cause) || isCancellation
}
1
2
3
4
5
6
7
8
/**
 * ScopeCoroutine variant whose childCancelled always returns false.
 */
private class SupervisorCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
    // For coroutines started by supervisorScope, cancellation and exceptions only propagate
    // from parent to child; a child never cancels this parent.
    override fun childCancelled(cause: Throwable): Boolean = false
}
  1. 协程调用 cancel 时会取消它的所有子协程,默认不会取消它的父协程
  2. 协程的取消只是在第一层包装 AbstractCoroutine 中修改协程的状态,不会影响到第二层包装 BaseContinuationImpl 中的执行逻辑,即协程的取消只是修改状态,不会取消协程的实际执行逻辑

协程异常处理

异常处理入口:BaseContinuationImpl.resumeWith:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Abridged excerpt: declarations of `current`, `param` and the enclosing loop are elided ("// ...").
class BaseContinuationImpl {
    fun resumeWith(result: Result<Any?>) {
        // ...
        val outcome: Result<Any?> =
        try {
            val outcome = invokeSuspend(param)
            // Still suspended: nothing to deliver yet
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(outcome)
        } catch (exception: Throwable) {
            Result.failure(exception) // an exception thrown out of invokeSuspend is caught here and becomes the failed outcome
        }
        if (completion is BaseContinuationImpl) {
            // unrolling recursion via loop
            current = completion
            param = outcome
        } else {
            // top-level completion reached -- invoke and return
            completion.resumeWith(outcome)
            return
        }
    }
}

在捕获了异常后,调用 AbstractCoroutine.resumeWith 来处理,其流程为:
AbstractCoroutine.resumeWith -> JobSupport.makeCompletingOnce -> JobSupport.tryMakeCompleting -> JobSupport.tryMakeCompletingSlowPath

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Slow path of completion: moves the job into a Finishing state, records a failure carried by
 * [proposedUpdate], notifies cancellation if the job has just become cancelling, then either
 * waits for children or finalizes.
 *
 * @return COMPLETING_RETRY when the list cannot be obtained or the state CAS fails,
 *   COMPLETING_ALREADY when completion is already in progress,
 *   COMPLETING_WAITING_CHILDREN when waiting on a child has started,
 *   otherwise the result of finalizeFinishingState
 */
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
    val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
    // Reuse the state if it is already Finishing, otherwise create a new Finishing around the list
    val finishing = state as? Finishing ?: Finishing(list, false, null)
    var notifyRootCause: Throwable? = null
    synchronized(finishing) {
        if (finishing.isCompleting) return COMPLETING_ALREADY
        finishing.isCompleting = true
        if (finishing !== state) {
            if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
        }
        val wasCancelling = finishing.isCancelling
        // A CompletedExceptionally update contributes its cause to the finishing state
        (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
        // If it just becomes cancelling --> must process cancelling notifications
        notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
    }
    // process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)
    notifyRootCause?.let { notifyCancelling(list, it) }
    val child = firstChild(state) // now wait for children
    if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN
    // otherwise -- we have not children left (all were already cancelled?)
    return finalizeFinishingState(finishing, proposedUpdate)
}
  1. 当协程发生异常时会取消它的所有子协程,默认会取消它的父协程

接下来看看 finalizeFinishingState 方法:

1
2
3
4
5
6
7
8
9
// JobSupport.kt
// Abridged excerpt: only the exception-handling step is shown.
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
    // ...
    if (finalException != null) {
        // handleJobException is consulted only when cancelParent returns false (short-circuit ||)
        val handled = cancelParent(finalException) || handleJobException(finalException)
        if (handled) (finalState as CompletedExceptionally).makeHandled()
    }
    // ...
}
  1. 如果异常是 CancellationException,cancelParent 会返回 true:子协程抛出 CancellationException 时,父协程会将其忽略,不会取消自己
  2. 如果协程抛出未捕获的非取消异常,则会一步步取消上层的协程,最后根协程调用 handleJobException 处理异常
1
2
3
// JobSupport.kt
// Handles an exception that was not handled by the parent coroutine; returning true means it was handled.
// The default implementation handles nothing and returns false.
protected open fun handleJobException(exception: Throwable): Boolean = false

实现类有 StandaloneCoroutine、ActorCoroutine

1
2
3
4
5
6
7
8
9
/**
 * AbstractCoroutine<Unit> subclass constructed with `initParentJob = true`.
 *
 * Its handleJobException forwards the exception to handleCoroutineException with this
 * coroutine's context and reports it as handled.
 */
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

调用 handleCoroutineException 来处理异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// CoroutineExceptionHandler.kt
/**
 * Routes [exception] to the CoroutineExceptionHandler found in [context], falling back to
 * handleCoroutineExceptionImpl when none is present or when the handler itself throws.
 */
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
    // Invoke an exception handler from the context if present
    try {
        context[CoroutineExceptionHandler]?.let { // a CoroutineExceptionHandler in the context handles the exception
            it.handleException(context, exception)
            return
        }
    } catch (t: Throwable) {
        // The handler threw: report the combination of both exceptions to the fallback
        handleCoroutineExceptionImpl(context, handlerException(exception, t))
        return
    }
    // If a handler is not present in the context or an exception was thrown, fallback to the global handler
    handleCoroutineExceptionImpl(context, exception)
}

// CoroutineExceptionHandlerImpl.kt
// Handlers discovered through ServiceLoader; on Android, AndroidExceptionPreHandler is also
// picked up this way to handle exceptions.
private val handlers: List<CoroutineExceptionHandler> = ServiceLoader.load(
    CoroutineExceptionHandler::class.java,
    CoroutineExceptionHandler::class.java.classLoader
).iterator().asSequence().toList()
/**
 * Fallback exception handling: offers [exception] to every ServiceLoader-provided handler,
 * then passes it to the current thread's uncaughtExceptionHandler.
 */
internal actual fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable) {
    // use additional extension handlers
    for (handler in handlers) {
        try {
            handler.handleException(context, exception)
        } catch (t: Throwable) {
            // Use thread's handler if custom handler failed to handle exception
            val currentThread = Thread.currentThread()
            currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, handlerException(exception, t))
        }
    }

    // use thread's handler
    val currentThread = Thread.currentThread()
    currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
}
  1. 不要抛出异常,这是最后一道能处理异常的地方
  2. 从 CoroutineContext 取出 CoroutineExceptionHandler 来处理异常
  3. 如果没有 CoroutineExceptionHandler,或者 CoroutineExceptionHandler 在处理时又抛出了异常,则交给 handleCoroutineExceptionImpl 处理
  4. 没有处理的异常交给 UncaughtExceptionHandler 来处理
  5. AndroidExceptionPreHandler 是对 CoroutineExceptionHandler 实现的 spi

结构化并发 (Structured Concurrency) 原理

疑问

协程何时需要线程切换?context[ContinuationInterceptor] 什么时候有值

当 CoroutineContext 中定义了 ContinuationInterceptor(例如某个 Dispatcher)时,协程才可能需要切换线程;

newCoroutineContext 在上下文中没有指定 ContinuationInterceptor 时,默认会添加 Dispatchers.Default,这个时候 context[ContinuationInterceptor] 就会有值

协程如何切线程?

Continuation.intercepted(),ContinuationInterceptor 拦截 Continuation,而 CoroutineDispatcher 实现了 ContinuationInterceptor,所以协程的切换是以拦截器的方式实现的。

协程如何处理异常?

入口:
在 BaseContinuationImpl.resumeWith,Result.failure(exception)

CancellationException 异常,会被忽略掉,不会取消父协程,只会取消其下所有子协程

private fun cancelParent(cause: Throwable): Boolean {
    // Is scoped coroutine -- don't propagate, will be rethrown
    if (isScopedCoroutine) return true

1
2
3
4
5
6
7
8
9
10
11
12
13
/* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
 * This allow parent to cancel its children (normally) without being cancelled itself, unless
 * child crashes and produce some other exception during its completion.
 */
val isCancellation = cause is CancellationException
val parent = parentHandle
// No parent -- ignore CE, report other exceptions.
if (parent === null || parent === NonDisposableHandle) {
    return isCancellation
}

// Notify parent but don't forget to check cancellation
return parent.childCancelled(cause) || isCancellation

}

本文由作者按照 CC BY 4.0 进行授权