文章

02Kotlin协程启动(协程上下文、启动模式、协程作用域)

02Kotlin协程启动(协程上下文、启动模式、协程作用域)

协程启动 (协程上下文、启动模式、协程作用域)

协程上下文 CoroutineContext

CoroutineContext 介绍

CoroutineContext,协程上下文,它是一个包含了用户定义的一些各种不同元素的 Element 对象集合。其中主要元素是:

  1. 协程的生命周期的句柄 Job
  2. 协程调度器 CoroutineDispatcher
  3. 协程的异常处理 CoroutineExceptionHandler
  4. 协程的名字 CoroutineName
  5. 拦截器 ContinuationInterceptor

这些数据都是和协程密切相关的,每一个 Element 都有一个唯一 key。

CoroutineContext 源码

下面是 CoroutineContext 源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public interface CoroutineContext {
    public operator fun <E : Element> get(key: Key<E>): E?
    
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    
    public operator fun plus(context: CoroutineContext): CoroutineContext = 
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }
    
    public fun minusKey(key: Key<*>): CoroutineContext
    
    public interface Key<E : Element>
    
    public interface Element : CoroutineContext { 
        public val key: Key<*>  
        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null
            
        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)
            
        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

都是实现了 Element 接口,同时都有个 CoroutineContext.Key 类型的伴生对象 key。看看 CoroutineContext 接口的几个方法

1
2
3
4
5
6
7
8
9
10
11
12
// 可以通过 key 来获取一个Element
public operator fun <E : CoroutineContext.Element> get(key: Key<E>): E?
    
// 和集合中的fold是一样的,用来遍历当前协程上下文中的Element集合
public fun <R> fold(initial: R, operation: (R, CoroutineContext.Element) -> R): R
    
// CoroutineContext中是通过plus来返回一个由原始的Element集合和通过+号引入的Element产生新的Element集合
public operator fun plus(context: CoroutineContext): CoroutineContext =
    if (context === EmptyCoroutineContext) this else context.fold(this) { ...}
        
// 取出除key以外的当前协程上下文其他Element,返回的就是不包含key的协程上下文。
public fun minusKey(key: Key<*>): CoroutineContext
  • EmptyCoroutineContext 表示一个空的协程上下文。

CoroutineContext.Key/Element/CombinedContext

CoroutineContext.Key

每个 Key 被定义为其相关元素接口或类的伴生对象。这样,Key 可以通过使用元素类型的名称直接被引用。例如,coroutineContext[Job] 将返回 coroutineContext 所持有的 Job 的实例,如果不包含任何实例,则返回 null。

获取 Job
1
context.get(Job)
获取运行在哪个线程
1
context.get(CoroutineDispatcher.Key)
Element
CombinedContext
  1. EmptyCoroutineContext 和 other 的协程上下文相加,结果为 other
  2. 不是 EmptyCoroutineContext 的 context1 协程上下文和 other 的协程上下文相加,结果为一个 CombinedContext,left 为 context1,element 为 other。即相加后旧的协程上下文放在 left
  3. 每次协程上下文相加后都是返回一个新的 CombinedContext
  4. 相同 key 的协程上下文相加,右边的协程上下文会覆盖掉旧的上下文
  5. 在 CombinedContext 中对 Element 进行查找,如果 CombinedContext 中的 element(也就是当前的节点)包含了对应的 Key,那么就返回,否则就从 left 中继续递归这个过程。在 CombinedContext 中,遍历的顺序是从右往左进行递归。
  6. 所有 Element 中,有一个比较特殊的类型 (ContinuationInterceptor),这个对象永远会放置在最后面,这样是为了方便遍历
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 混合上下文(洋葱)
internal class CombinedContext(
    // 左上下文
    private val left: CoroutineContext,
    // 右元素
    private val element: Element
) : CoroutineContext, Serializable {
    // 根据 key 在上下文中查找元素
    override fun <E : Element> get(key: Key<E>): E? {
        var cur = this
        while (true) {
            // 如果输入 key 和右元素的 key 相同,则返回右元素(剥去洋葱的一片)
            cur.element[key]?.let { return it }
            // 若右元素不匹配,则向左继续查找
            val next = cur.left
            // 如果左上下文是混合上下文,则开始向左递归(剥去一片后还是一个洋葱,继续剥)
            if (next is CombinedContext) {
                cur = next
            } 
            // 若左上下文不是混合上下文,则结束递归
            else {
                return next[key]
            }
        }
    }
}

协程上下文继承

  1. 新创建的协程,它的 CoroutineContext 会包含一个全新的 Job 实例,它会帮助我们控制协程的生命周期;而剩下的元素会从父 CoroutineContext 继承,该 parent 可能是另外一个协程或创建该协程的 CoroutineScope
  2. 传入协程构建器的参数优先级高于继承的上下文参数,因此会覆盖对应的参数值
1
2
3
4
5
6
7
8
9
10
11
12
@Test
fun `test CoroutineContext extend`() = runBlocking {
    val scope = CoroutineScope(Job() + Dispatchers.Unconfined + CoroutineName("test"))
    val job = scope.launch(CoroutineName("hacket")) {
        println("launch job=${coroutineContext[Job]}, thread=${Thread.currentThread().name}")
        val result = async {
            println("async job=${coroutineContext[Job]}, thread=${Thread.currentThread().name}")
            "OK"
        }.await()
    }
    job.join()
}

输出:

1
2
launch job="hacket#2":StandaloneCoroutine{Active}@4b3a7e48, thread=Test worker @hacket#2
async job="hacket#3":DeferredCoroutine{Active}@18b6a653, thread=Test worker @hacket#3

CoroutineDispatcher 协程调度器,线程切换

协程调度器它确定了相关的协程在哪个线程或哪些线程上执行。协程调度器可以将协程限制在一个特定的线程执行,或将它分派到一个线程池,亦或是让它不受限地运行。

1
2
3
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(ContinuationInterceptor,{ it as? CoroutineDispatcher })
}

官方框架中预置了 4 个调度器:

1
2
3
4
5
6
7
8
9
10
11
public actual object Dispatchers {
    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher
        get() = MainDispatcherLoader.dispatcher
    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}

1、Dispatchers.Default

Default 是一个协程调度器,其指定的线程为共有的线程池,线程数量至少为 2,最大与 CPU 数相同。(默认调度器,在使用 launch 和 async 等协程构造器创建协程时,如果不指定调度器则会使用此默认调度器,该调度器会让协程在 JVM 提供的共享线程池中执行)

  • 适用场景:
    CPU 密集型任务调度器,适合处理后台计算。通常处理一些单纯的计算任务,或者执行时间较短任务。比如:Json 的解析,数据计算等
1
2
3
4
5
6
7
GlobalScope.launch{
    Log.d("launch", "启动一个协程")
}
// 等同于
GlobalScope.launch(Dispatchers.Default){
    Log.d("launch", "启动一个协程")
}

2、Dispatchers.Main

主线程调度器,让协程在主线程即 UI 线程中执行。

为方便使用官方在 Android 协程框架库中,已经为我们定义好了几个供我们开发使用。如:MainScopelifecycleScopeviewModelScope。它们都是使用的 Dispatchers.Main

  • 适用场景:
    通常用于 UI 交互,刷新等

3、Dispatchers.IO

让协程在 IO 线程 (子线程) 中执行,该调度器会与 Dispatchers.Default 调度器共享同一个线程池;IO 调度器,

  • 适用场景:
    IO 密集型任务调度器,适合执行 IO 相关操作。比如:网络处理,数据库操作,文件操作等

4、Dispatchers.Unconfined

非受限调度器,又或者称为 “ 无所谓 “ 调度器,不要求协程执行在特定线程上。该调度器不指定协程在某个线程中执行。设置了该调度器的协程会在调用者线程中启动执行直到第一个挂起点,挂起后,它将在挂起函数执行的线程中恢复,恢复的线程完全取决于该挂起函数在哪个线程执行。

5、newSingleThreadContext

这是 Kotlin 另外提供的一个调度器,它会为协程启动一个新的线程。一个专用的线程是一种非常昂贵的资源。在真实的应用程序中两者都必须被释放,当不再需要的时候,使用 close 函数,或存储在一个顶级变量中使它在整个应用程序中被重用。

几个调度器对比

Job

1
2
3
4
5
public interface Job : CoroutineContext.Element {
    public companion object Key : CoroutineContext.Key<Job> {
        // ...
    }
}

Job 无返回值

Job 我们可以认为他就是一个协程作业是通过 CoroutineScope.launch 生成的,同时它运行一个指定的代码块,并在该代码块完成时完成。我们可以通过 isActiveisCompletedisCancelled 来获取到 Job 的当前状态。Job 的状态如下图所示,摘自官方文档:

通过下图可以大概了解下一个协程作业从创建到完成或者取消:

  1. 协程处于 activated 状态,协程运行出错或调用 job.cancel() 会将当前任务置为取消中状态 (Cancelling)(isActive=false, isCancelled=true)
  2. 当所有子协程都完成后,协程会进入已取消状态 (Cancelled),此时 isCompleted=true,isCancelled=true

Deferred 带返回值

Job 完成时是没有返回值的,如果需要返回值的话,应该使用 Deferred,它是 Job 的子类。

1
2
3
4
5
6
7
public interface Deferred<out T> : Job {
    // 返回结果值,或者如果延迟被取消,则抛出相应的异常
    public suspend fun await(): T  
    public val onAwait: SelectClause1<T>
    public fun getCompleted(): T
    public fun getCompletionExceptionOrNull(): Throwable?
}

通过 await() 方法获取执行流的返回值,当然如果出现异常或者被取消执行,则会抛出相对应的异常。

CoroutineExceptionHandler

1
2
3
public interface CoroutineExceptionHandler : CoroutineContext.Element {
    public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
}

ContinuationInterceptor

1
2
3
public interface ContinuationInterceptor : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
}

CoroutineName

1
2
3
4
5
public data class CoroutineName(
    val name: String
) : AbstractCoroutineContextElement(CoroutineName) {
    public companion object Key : CoroutineContext.Key<CoroutineName>
}

协程启动模式 CoroutineStart

在 Kotlin 协程当中,启动模式是一个枚举:

1
2
3
4
5
6
7
8
public enum class CoroutineStart {
    DEFAULT,
    LAZY,
    @ExperimentalCoroutinesApi
    ATOMIC,
    @ExperimentalCoroutinesApi
    UNDISPATCHED;
}

四个启动模式当中我们最常用的其实是 DEFAULT 和 LAZY。

DEFAULT 立即调度

默认启动模式,我们可以称之为饿汉启动模式,因为协程创建后立即开始调度,虽然是立即调度,但不是立即执行,有可能在执行前被取消。

1
2
3
4
5
6
7
8
9
10
11
12
// 能取消吗?不能
val job = launch(start = CoroutineStart.DEFAULT) {
    var i = 0
    while (true) {
        println("i:$i")
        i++
    }
    println("finished.")
}
delay(1000)
println("delay 1000ms cancel.")
job.cancel()

LAZY 需要时才调度

懒汉启动模式,启动后并不会有任何调度行为,直到我们需要它执行的时候才会产生调度。也就是说只有我们主动的调用 Job 的 startjoin 或者 await 等函数时才会开始调度。
什么叫我们需要它执行的时候呢?就是需要它的运行结果的时候, launch 调用后会返回一个 Job 实例

  1. 调用 Job.start,主动触发协程的调度执行
  2. 调用 Job.join,隐式的触发协程的调度执行
  3. async 可以用 await
1
2
3
4
5
6
7
8
// 惰性启动
val job = async(start = CoroutineStart.LAZY) {
    println("lazy start")
    29
}
// 执行一些计算
// 再启动
job.start()

ATOMIC 立即调度,遇到第一个挂起点前不响应 cancel

一样也是在协程创建后立即开始调度,但是它和 DEFAULT 模式有一点不一样,通过 ATOMIC 模式启动的协程执行到第一个挂起点之前是不响应 cancel 取消操作的,ATOMIC 一定要涉及到协程挂起后 cancel 取消操作的时候才有意义。

UNDISPATCHED 立即执行,非调度(线程同调用者线程一致,直到新的挂起函数的线程切换才跟着切换线程)

协程在这种模式下会直接开始在当前线程下执行,直到运行到第一个挂起点。这听起来有点像 ATOMIC,不同之处在于 UNDISPATCHED 是不经过任何调度器就开始执行的。当然遇到挂起点之后的执行,将取决于挂起点本身的逻辑和协程上下文中的调度器。

  1. 无论我们是否指定协程调度器,挂起前的执行都是在当前线程下执行。
  2. 如果所在的协程没有指定调度器,那么就会在 join 处恢复执行的线程里执行
  3. 当我们指定了协程调度器时,遇到挂起点之后的执行将取决于挂起点本身的逻辑和协程上下文中的调度器。即 join 处恢复执行时,因为所在的协程有调度器,所以后面的执行将会在调度器对应的线程上执行。

案例:

1
2
3
4
5
6
7
8
9
10
11
12
private fun testUnDispatched(){
    GlobalScope.launch(Dispatchers.Main){
       val job = launch(Dispatchers.IO) {
           Log.d("${Thread.currentThread().name}线程", "-> 挂起前")
           delay(100)
           Log.d("${Thread.currentThread().name}线程", "-> 挂起后")
       }
       Log.d("${Thread.currentThread().name}线程", "-> join前")
       job.join()
       Log.d("${Thread.currentThread().name}线程", "-> join后")
   }
}

结果:

1
2
3
4
D/main线程: -> join前
D/DefaultDispatcher-worker-1线程: -> 挂起前
D/DefaultDispatcher-worker-1线程: -> 挂起后
D/main线程: -> join后

在子协程 launch 的时候使用 UNDISPATCHED 模式启动:

1
2
3
4
5
6
7
8
9
10
11
12
private fun testUnDispatched(){
    GlobalScope.launch(Dispatchers.Main){
        val job = launch(Dispatchers.IO,start = CoroutineStart.UNDISPATCHED) {
            Log.d("${Thread.currentThread().name}线程", "-> 挂起前")
            delay(100)
            Log.d("${Thread.currentThread().name}线程", "-> 挂起后")
        }
        Log.d("${Thread.currentThread().name}线程", "-> join前")
        job.join()
        Log.d("${Thread.currentThread().name}线程", "-> join后")
    }
}

结果:

1
2
3
4
D/main线程: -> 挂起前
D/main线程: -> join前
D/DefaultDispatcher-worker-1线程: -> 挂起后
D/main线程: -> join后

我们看到当以 UNDISPATCHED 模式即使我们指定了协程调度器 Dispatchers.IO,挂起前还是在 main 线程里执行,但是挂起后是在 worker-1 线程里面执行,这是因为当以 UNDISPATCHED 启动时,协程在这种模式下会直接开始在当前线程下执行,直到第一个挂起点。遇到挂起点之后的执行,将取决于挂起点本身的逻辑和协程上下文中的调度器,即 join 处恢复执行时,因为所在的协程有调度器,所以后面的执行将会在调度器对应的线程上执行。

再改一下,把子协程在 launch 的时候使用 UNDISPATCHED 模式启动,去掉 Dispatchers.IO 调度器,那又会出现什么情况呢

1
2
3
4
5
6
7
8
9
10
11
12
private fun testUnDispatched(){
    GlobalScope.launch(Dispatchers.Main){
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            Log.d("${Thread.currentThread().name}线程", "-> 挂起前")
            delay(100)
            Log.d("${Thread.currentThread().name}线程", "-> 挂起后")
        }
        Log.d("${Thread.currentThread().name}线程", "-> join前")
        job.join()
        Log.d("${Thread.currentThread().name}线程", "-> join后")
    }
}

结果:

1
2
3
4
D/main线程: -> 挂起前
D/main线程: -> join前
D/main线程: -> 挂起后
D/main线程: -> join后

我们发现它们都在一个线程里面执行了。这是因为当通过 UNDISPATCHED 启动后遇到挂起,join 处恢复执行时,如果所在的协程没有指定调度器,那么就会在 join 处恢复执行的线程里执行,即挂起后是在父协程 (Dispatchers.Main 线程里面执行,而最后 join 后这条日志的输出调度取决于这个最外层的协程的调度规则。

协程作用域 Scope CoroutineScope

协程作用域 (Coroutine Scope) 是协程运行的作用范围。作用:创建和追踪协程,管理不同协程之间的父子关系和结构。

CoroutineScope 定义了新启动的协程作用范围,launchasync 都是 CoroutineScope 的扩展函数,他们会继承了他的 coroutineContext 自动传播其所有的 elements 和取消操作。换句话说,如果这个作用域销毁了,那么里面的协程也随之失效

CoroutineScope 包含了 CoroutineContext。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface CoroutineScope {
    // 作用域下的协程上下文
    public val coroutineContext: CoroutineContext
}

// 将已有的协程作用域叠加一个协程上下文生成一个新的协程作用域
public operator fun CoroutineScope.plus(context: CoroutineContext): CoroutineScope =
    ContextScope(coroutineContext + context)

public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

public object GlobalScope : CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())

在 Kotlin 中,协程必须在 CoroutineScope 内执行。CoroutineScope 保持了对所有协程的跟踪,尽管被挂起也是这样,Scope 实际不执行协程,只是确保你不会跟丢或忘记他们。为了保证所有协程都被跟踪,Kotlin 不允许在没有 CoroutineScope 的情况下启动协程。

  1. Scope 能够具备挂起与恢复能力的协程
  2. 跟踪了所有协程,可以取消它所启动的协程

创建协程的方式:

  1. 通过 CoroutineScope 创建
  2. 在协程中创建

Coroutine builders 协程构建器,构建协程

runBlocking 用在 main 函数或 test 用例中

runBlocking 返回协程体最后一行

runBlocking {} 是创建一个新的协程同时阻塞当前线程,直到协程结束;返回值为协程体最后一行

  • runBlocking 源码
1
2
3
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
    // ...
}
  • 测试案例:
1
2
3
4
5
6
7
8
fun main(args: Array<String>) = runBlocking { // start main coroutine
    launch { // launch new coroutine in background and continue
        delay(1000L)
        println("World!")
    }
    println("Hello,") // main coroutine continues here immediately
    delay(2000L)      // delaying for 2 seconds to keep JVM alive
}

runBlocking 通常适用于单元测试的场景,而业务开发中不会用到这个函数,因为它是线程阻塞的。

为挂起函数编写单元测试的一种方式:

1
2
3
4
5
6
class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // 这里我们可以使用任何喜欢的断言风格来使用挂起函数
    }
}

launch 同步调用

启动一个协程但不会阻塞调用线程,必须要在协程作用域 (CoroutineScope) 中才能调用,返回值是一个 Job。launch 方法是挂靠在接口 CoroutineScope 上的方法。像下面的 GlobalScope,就是接口 CoroutineScope 的一个实现类。

  • launch 源码:
1
2
3
4
5
6
7
8
9
10
11
12
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

launch() 方法参数:

  1. context
    context: CoroutineContext。 CoroutineContext 上下文是一系列元素的集合,最主要的两个元素是:Job、Dispatcher。 Job 控制协程的开始、取消等,Dispatcher 负责协程在哪个线程中执行。
  2. start
    start: CoroutineStart。 该入参主要是控制协程是直接执行还是 Lazy start。若是 CoroutineStart.LAZY,需要通过 job.start 方法主动开启协程。
  3. block
    block: suspend CoroutineScope.() -> Unit。 协程要执行的代码段。从定义看,block 是一个 suspend 匿名方法,且是挂靠在接口 CoroutineScope 下。 所有,代码段中的 this 关键字,直接代表了 CoroutineScope。
  4. Job 返回值
    Job 是 launch 方法的返回值,它就是用来控制协程的运行状态的。Job 中有几个关键方法:
  5. start。如果是 CoroutineStart.LAZY 创建出来的协程,调用该方法开启协程。
  6. cancel。取消正在执行的协程。如协程处于运算状态,则不能被取消。也就是说,只有协程处于阻塞状态时才能够取消。
  7. join。阻塞父协程,直到本协程执行完。
  8. cancelAndJoin。等价于 cancel + join。

案例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // 假设我们在这里做了一些有用的事
    return 13
}
suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // 假设我们在这里也做了一些有用的事
    return 29
}
val time = measureTimeMillis {
    val one = doSomethingUsefulOne()
    val two = doSomethingUsefulTwo()
    println("The answer is ${one + two}")
}
println("Completed in $time ms")

结果:

1
2
The answer is 42
Completed in 2006 ms

doSomethingUsefulOne 和 doSomethingUsefulTwo 顺序执行,所以总耗时为 2000ms+

注意
launchasync 很大的区别是如何处理异常。async 假设你一定会调用 await 来获取返回值(及异常),所以默认不会抛出,意味着如果你使用 async 启动协程,它会吞掉异常

  • CoroutineScope.launch{}

async 异步

async 启动一个协程但不会阻塞调用线程,必须要在协程作用域 (CoroutineScope) 中才能调用。以 Deferred 对象的形式返回协程任务。返回值泛型 T 同 runBlocking 类似都是协程体最后一行的类型。

Deferred 继承自 Job,最主要的是增加了 await 方法,通过 await 方法返回 T。Deferred.await 在等待返回值时会阻塞当前的协程。

  • async 源码:
1
2
3
4
5
6
7
8
9
10
11
12
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

获取 CoroutineScope.async {}的返回值需要通过 await() 函数,它也是是个挂起函数,调用时会挂起当前协程直到 async 中代码执行完并返回某个值。

  • 案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doSomethingUsefulOne() }
        val two = async { doSomethingUsefulTwo() }
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")    
}

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // 假设我们在这里做了些有用的事
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // 假设我们在这里也做了些有用的事
    return 29
}

结果:

1
2
The answer is 42
Completed in 1013 ms

这里比 launch 快了两倍,因为两个协程并发执行。 请注意,使用协程进行并发总是显式的。

  • 惰性启动的 async async 可以通过将 start 参数设置为 CoroutineStart.LAZY 而变为惰性的。在这个模式下,只有结果通过 await 获取的时候协程才会启动,或者在 Job 的 start 函数调用的时候。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fun main() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
        val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
        // 执行一些计算
        one.start() // 启动第一个
        two.start() // 启动第二个
        println("The answer is ${one.await() + two.await()}")
    }
    println("Completed in $time ms")    
}
suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // 假设我们在这里做了些有用的事
    return 13
}
suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // 假设我们在这里也做了些有用的事
    return 29
}

输出:

1
2
The answer is 42
Completed in 1014 ms
  • async 的异常在 await 抛出,CoroutineExceptionHandler 设置无效
  • async 通过 await 获取执行结果,不调用 await 也是会执行协程体的代码的
  • async 注意 await 的使用,要在获取值的时候再调用 await,不要在启动协程的时候就调用 await,使用不当就变成同步的了
    async+await 正确使用示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private suspend fun doOne(): Int {
    delay(1000)
    return 14
}
private suspend fun doTwo(): Int {
    delay(1000)
    return 25
}
// 错误示例:
@Test
fun `test combine async`() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doOne() }.await()
        val two = async { doTwo() }.await()
        println("The result:${one + two}")
    }
    println("Completed in $time ms")
}
// 输出
The result:39
Completed in 2039 ms

// 正确示例:
@Test
fun `test combine async`() = runBlocking<Unit> {
    val time = measureTimeMillis {
        val one = async { doOne() }
        val two = async { doTwo() }
        println("The result:${one.await() + two.await()}")
    }
    println("Completed in $time ms")
}
// 输出
The result:39
Completed in 1032 ms

不同方式产生的协程,彼此的关系

协程作用域分级

  1. 父协程需要等待所有的子协程执行完毕之后才会进入 Completed 状态,不管父协程自身的协程体是否已经执行完成。
  2. 子协程会继承父协程的协程上下文中的 Element,如果自身有相同 key 的成员,则覆盖对应的 key,覆盖的效果仅限自身范围内有效

顶级作用域

没有父协程的协程所在的作用域称之为顶级作用域。

协同作用域

在协程中启动一个协程,新协程为所在协程的子协程。子协程所在的作用域默认为协同作用域。此时子协程抛出未捕获的异常时,会将异常传递给父协程处理,如果父协程被取消,则所有子协程同时也会被取消。

主从作用域/监督作用域

官方称之为监督作用域。与协同作用域一致,区别在于该作用域下的协程取消操作的单向传播性,子协程的异常不会导致其它子协程取消。但是如果父协程被取消,则所有子协程同时也会被取消。

主从 (监督) 作用域与协同作用域一致,区别在于该作用域下的协程取消操作的单向传播性,子协程的异常不会导致其它子协程取消。分析主从 (监督) 作用域的时候,我们需要用到 supervisorScope 或者 SupervisorJob

案例

案例 1 ,协同作用域
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private fun testCoroutineScope2() {
    val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
        println("exceptionHandler ${coroutineContext[CoroutineName]} $throwable")
    }
    GlobalScope.launch(
            Dispatchers.IO
                    + CoroutineName("scope1")
                    + exceptionHandler) {
        println("scope --------- 1")
        launch(CoroutineName("scope2") + exceptionHandler) {
            println("scope --------- 2")
            throw  NullPointerException("空指针")
            println("scope --------- 3")
        }
        val scope3 = launch(CoroutineName("scope3") + exceptionHandler) {
            println("scope --------- 4")
            delay(2000)
            println("scope --------- 5")
        }
        scope3.join()
        println("scope --------- 6")
    }
}

如果 scope2 没有抛异常 throw NullPointerException("空指针"),结果

1
2
3
4
5
6
scope --------- 1
scope --------- 2
scope --------- 3
scope --------- 4
scope --------- 5
scope --------- 6

加了抛异常

1
2
3
scope --------- 1
scope --------- 2
exceptionHandler CoroutineName(scope1) java.lang.NullPointerException: 空指针

可以看到子协程 scope2 抛出了一个异常,将异常传递给父协程 scope1 处理,但是因为任何一个子协程异常退出会导致整体都将退出。所以导致父协程 scope1 未执行完成成就被取消,同时还未执行完子协程 scope3 也被取消了。

案例 2 ,主从 (监督) 作用域 supervisorScope
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private fun testCoroutineScope3() {
    val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
        println("exceptionHandler ${coroutineContext[CoroutineName]} $throwable")
    }
    GlobalScope.launch(Dispatchers.IO + CoroutineName("scope1") + exceptionHandler) {
        supervisorScope {
            println("scope --------- 1")
            launch(CoroutineName("scope2")) {
                println("scope --------- 2")
                throw  NullPointerException("空指针")
                println("scope --------- 3")
                val scope3 = launch(CoroutineName("scope3")) {
                    println("scope --------- 4")
                    delay(2000)
                    println("scope --------- 5")
                }
                scope3.join()
            }
            val scope4 = launch(CoroutineName("scope4")) {
                println("scope --------- 6")
                delay(2000)
                println("scope --------- 7")
            }
            scope4.join()
            println("scope --------- 8")
        }
    }
}

结果:

1
2
3
4
5
6
scope --------- 1
scope --------- 2
exceptionHandler CoroutineName(scope2) java.lang.NullPointerException: 空指针
scope --------- 6
scope --------- 7
scope --------- 8

可以看到子协程 scope2 抛出了一个异常,并将异常传递给父协程 scope1 处理,同时也结束了自己本身。因为在于主从 (监督) 作用域下的协程取消操作是单向传播性,因此协程 scope2 的异常并没有导致父协程退出,所以 6 7 8 都照常输出,而 3 4 5 因为在协程 scope2 里面所以没有输出。

案例 3 ,主从 (监督) 作用域 SupervisorJob
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private fun testCoroutineScope4() {
    val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
        println("exceptionHandler ${coroutineContext[CoroutineName]} $throwable")
    }
    val coroutineScope = CoroutineScope(SupervisorJob() + CoroutineName("coroutineScope"))
    GlobalScope.launch(Dispatchers.IO + CoroutineName("scope1") + exceptionHandler) {
        with(coroutineScope) {
            val scope2 = launch(CoroutineName("scope2") + exceptionHandler) {
                println("scope 1--------- ${coroutineContext[CoroutineName]}")
                throw  NullPointerException("空指针")
            }
            val scope3 = launch(CoroutineName("scope3") + exceptionHandler) {
                scope2.join()
                println("scope 2--------- ${coroutineContext[CoroutineName]}")
                delay(2000)
                println("scope 3--------- ${coroutineContext[CoroutineName]}")
            }
            scope2.join()
            println("scope 4--------- ${coroutineContext[CoroutineName]}")
            coroutineScope.cancel()
            scope3.join()
            println("scope 5--------- ${coroutineContext[CoroutineName]}")
        }
        println("scope 6--------- ${coroutineContext[CoroutineName]}")
    }
}

结果:

1
2
3
4
5
6
scope 1--------- CoroutineName(scope2)
exceptionHandler CoroutineName(scope2) java.lang.NullPointerException: 空指针
scope 2--------- CoroutineName(scope3)
scope 4--------- CoroutineName(coroutineScope)
scope 5--------- CoroutineName(coroutineScope)
scope 6--------- CoroutineName(scope1)

通过创建了一个 SupervisorJob 的主从 (监督) 协程作用域,调用了子协程的 join 是为了保证它一定是会执行。同样的子协程 scope2 抛出了一个异常,通过协程 scope2 自己内部消化了,同时也结束了自己本身;因为协程 scope2 的异常并没有导致 coroutineScope 作用域下的协程取消退出,所以协程 scope3 照常运行输出 2,后又因为调用了我们定义的协程作用域 coroutineScope 的 cancel 方法取消了协程,所以即使我们后面调用了协程 scope3 的 join,也没有输出 3,因为 SupervisorJob 的取消是向下传播的,所以后面的 4、5 都是在 coroutineScope 的作用域中输出的。

常见的 CoroutineScope

coroutineScope 一个协程失败了,它的所有兄弟协程也会被取消

使用 coroutineScope 创建一个新的 CoroutineScope, 它的 coroutineContext 是由外部协程的 coroutineContext 提供的。
它的作用是并行分解工作的。将一部分关联的工作放入该 coroutineScope 中,若其中一个子协程报错了,其他的子协程都会被 cancel 掉。
同时,它执行时会阻塞协程,并等待返回值 R。

  • 案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Test
fun `test coroutine scope builder`() = runBlocking<Unit> {
    // 一个协程失败了,所有其他兄弟协程也会被取消
    coroutineScope {
        val job1 = launch {
            delay(400)
            println("job1 finished.")
        }
        val job2 = launch {
            delay(200)
            println("job2 finished.")
            throw IllegalArgumentException()
        }
    }
}

输出:

1
2
3
job2 finished.
java.lang.IllegalArgumentException
	at me.hacket.test.协程.2协程启动.CoroutineTest02$test coroutine scope builder$1$1$job2$1.invokeSuspend(CoroutineTest02.kt:23)

可以看到在 coroutineScope 中,一个子协程抛出异常了,coroutineScope 中的子协程都会被取消

supervisorScope 一个协程失败了,不会影响其他兄弟协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// supervisorScope
// 一个协程失败了,不会影响其他兄弟协程
@Test
fun `test supervisor scope builder`() = runBlocking {
    supervisorScope {
        val job1 = launch {
            delay(400)
            println("job1 finished.")
        }
        val job2 = launch {
            delay(200)
            println("job2 finished.")
            throw IllegalArgumentException()
        }
    }
}

输出:

1
2
3
job2 finished.
Exception in thread "Test worker @coroutine#3" java.lang.RuntimeException: Exception while trying to handle coroutine exception
job1 finished.

可以看到在 supervisorScope 中,一个子协程抛出异常了,supervisorScope 中的其他子协程不会被取消

GlobalScope

生命周期是 process 级别的,即使 Activity 或 Fragment 被销毁,协程仍然在执行

MainScope

viewModelScope

在 ViewModel 中使用,绑定了 ViewModel 生命周期,在 ViewModel 销毁时也会被销毁

lifecycleScope

在有 LifecycleOwner(Activity/Fragment 中)使用,绑定 Lifecycle 生命周期

协程并发执行

async

runBlocking

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private fun test_async() {
    runBlocking {

        val s = System.currentTimeMillis()
        println("Task from runBlocking ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")

        delay(500)

        println("delay 500ms  ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")

        async {
            println("Task1 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
            delay(1500L)
            println("Task1 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
        }
        async {
            withContext(kotlinx.coroutines.Dispatchers.IO) {
                println("Task2 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                delay(5000L)
                println("Task2 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
            }
        }

        val cost = System.currentTimeMillis() - s
        println("launch end, cost=$cost, ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
    }
}

执行结果:

1
2
3
4
5
6
7
Task from runBlocking 2024年12月09日 13:48:43 main
delay 500ms  2024年12月09日 13:48:44 main
launch end, cost=523, 2024年12月09日 13:48:44 main
Task1 start 2024年12月09日 13:48:44 main
Task2 start 2024年12月09日 13:48:44 DefaultDispatcher-worker-1
Task1 end 2024年12月09日 13:48:45 main
Task2 end 2024年12月09日 13:48:49 DefaultDispatcher-worker-1

如果 Task1 出现了异常,该如果执行:

1
2
3
4
5
6
7
async {
	println("Task1 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
	delay(1500L)
	var s: String? = null
	s!!.length
	println("Task1 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Task from runBlocking 2024年12月09日 13:51:10 main
delay 500ms  2024年12月09日 13:51:11 main
launch end, cost=519, 2024年12月09日 13:51:11 main
Task1 start 2024年12月09日 13:51:11 main
Task2 start 2024年12月09日 13:51:11 DefaultDispatcher-worker-1
Exception in thread "main" java.lang.NullPointerException
	at me.hacket.kt.coroutine.concurrent.CoroutineConcurrentTestKt$test_async$1$1.invokeSuspend(CoroutineConcurrentTest.kt:176)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
	at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:166)
	at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:518)
	at kotlinx.coroutines.EventLoopImplBase$DelayedResumeTask.run(EventLoop.common.kt:494)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:279)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at me.hacket.kt.coroutine.concurrent.CoroutineConcurrentTestKt.test_async(CoroutineConcurrentTest.kt:163)
	at me.hacket.kt.coroutine.concurrent.CoroutineConcurrentTestKt.main(CoroutineConcurrentTest.kt:31)
	at me.hacket.kt.coroutine.concurrent.CoroutineConcurrentTestKt.main(CoroutineConcurrentTest.kt)

小结:

  • Task1 和 Task2 并行执行
  • 如果有个协程抛出异常,其他的协程跟着取消

coroutineScope

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private fun test_async() {
    runBlocking {

        val s = System.currentTimeMillis()
        println("Task from runBlocking ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")

        delay(500)

        println("delay 500ms  ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")

        coroutineScope {

            async {
                println("Task1 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                delay(1500L)
                var s: String? = null
                s!!.length
                println("Task1 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
            }
            async {
                withContext(kotlinx.coroutines.Dispatchers.IO) {
                    println("Task2 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                    delay(5000L)
                    println("Task2 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                }
            }
        }

        val cost = System.currentTimeMillis() - s
        println("launch end, cost=$cost, ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
    }
}

小结:runBlocking 类似

supervisorScope

supervisorScope
多次 async ,不 await
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private fun test_async() {
    runBlocking {
        val s = System.currentTimeMillis()
        println("Task from runBlocking ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
        delay(500)

        println("delay 500ms  ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")

        supervisorScope {
            async {
                println("Task1 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                delay(1500L)
                var s: String? = null
                s!!.length
                println("Task1 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
            }
            async {
                withContext(kotlinx.coroutines.Dispatchers.IO) {
                    println("Task2 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                    delay(5000L)
                    println("Task2 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                }
            }
        }

        val cost = System.currentTimeMillis() - s
        println("launch end, cost=$cost, ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
    }
}
![600](https://raw.githubusercontent.com/hacket/ObsidianOSS/master/obsidian/20241209135726.png)

小结:

  • 总耗时 5000 多 ms,说明 Task 1 和 Task 2 并行执行
  • 如果有个协程抛出异常,其他的协程不会取消
  • 在 supervisorScope 中如果没有用 async 启动,那么 Task1 和 Task2 代码就是顺序执行的,如果 Task1 抛出异常,后续代码就不会执行了
多次 async,await
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private fun test_async() {
    runBlocking {
        val s = System.currentTimeMillis()
        println("Task from runBlocking ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
        delay(500)
        println("delay 500ms  ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")

        supervisorScope {
            val task1 =
                async {
                    println("Task1 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                    delay(1500L)
                    var s: String? = null
                    s!!.length
                    println("Task1 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                    "Task1 result"
                }

            val task2 =
                async {
                    withContext(kotlinx.coroutines.Dispatchers.IO) {
                        println("Task2 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                        delay(5000L)
                        println("Task2 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                        "Task2 result"
                    }
                }

//            // Task2能输出,Task1抛出异常
            println("Task2 result: ${task2.await()}") // Task2 result: Task2 result
            println("Task1 result: ${task1.await()}") // Exception in thread "main" java.lang.NullPointerException
        }

        val cost = System.currentTimeMillis() - s
        println("launch end, cost=$cost, ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
    }
}

如果是这样,task1.await() 在前面,Task2 就输出不了了

1
2
 println("Task1 result: ${task1.await()};Task2 result: ${task2.await()}")  
// println("Task1 result: ${task1.await() + task2.await()}")

多次 launch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private fun test_async() {  
    runBlocking {  
        val s = System.currentTimeMillis()  
        println("Task from runBlocking ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")  
        delay(500)  
        println("delay 500ms  ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")  
  
        supervisorScope {  
            val task1 =  
                launch {  
                    println("Task1 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")  
                    delay(1500L)  
                    var s: String? = null  
                    s!!.length  
                    println("Task1 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")  
                    "Task1 result"  
                }  
  
            val task2 =  
                launch {  
                    withContext(kotlinx.coroutines.Dispatchers.IO) {  
                        println("Task2 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")  
                        delay(5000L)  
                        println("Task2 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")  
                        "Task2 result"  
                    }  
                }  
        }  
  
        val cost = System.currentTimeMillis() - s  
        println("launch end, cost=$cost, ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")  
  
    }  
}

小结:

  • 总耗时 5000 多 ms,说明 Task 1 和 Task 2 并行执行
  • 如果有个协程抛出异常,其他的协程不会取消
多个 supervisorScope
async 不等到结果获取,遇到异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private fun test_async() {
    runBlocking {
        val s = System.currentTimeMillis()
        println("Task from runBlocking ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
        delay(500)
        println("delay 500ms  ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")

        supervisorScope {
            async {
                println("Task1 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                delay(1500L)
                var s: String? = null
                s!!.length
                println("Task1 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
            }
        }
        supervisorScope {
            async {
                withContext(kotlinx.coroutines.Dispatchers.IO) {
                    println("Task2 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                    delay(5000L)
                    println("Task2 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                }
            }
        }

        val cost = System.currentTimeMillis() - s
        println("launch end, cost=$cost, ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
    }
}
![600](https://raw.githubusercontent.com/hacket/ObsidianOSS/master/obsidian/20241209140227.png)
  • 总耗时 7000 多 ms,说明 Task 1 和 Task 2 不能并行执行;先等待第 1 个 supervisorScope 内的协程并行完成,再等到第 2 个 supervisorScope 内的协程并行执行完成,所以耗时是 2 个 supervisorScope 耗时之和
  • 如果有个协程抛出异常,其他的协程不会取消,会继续执行
  • Task1 出现异常后,没有输出堆栈;因为用的是 async,打印堆栈需要等到 await 的调用
launch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private fun test_async() {
    runBlocking {
        val s = System.currentTimeMillis()
        println("Task from runBlocking ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
        delay(500)
        println("delay 500ms  ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")

        supervisorScope {
            launch {
                println("Task1 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                delay(1500L)
                var s: String? = null
                s!!.length
                println("Task1 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
            }
        }
        supervisorScope {
            launch {
                withContext(kotlinx.coroutines.Dispatchers.IO) {
                    println("Task2 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                    delay(5000L)
                    println("Task2 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                }
            }
        }

        val cost = System.currentTimeMillis() - s
        println("launch end, cost=$cost, ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
    }
}

  • 总耗时 7000 多 ms,说明 Task 1 和 Task 2 不能并行执行;先等待第 1 个 supervisorScope 内的协程并行完成,再等到第 2 个 supervisorScope 内的协程并行执行完成,所以耗时是 2 个 supervisorScope 耗时之和
  • 如果有个协程抛出异常,其他的协程不会取消,会继续执行
  • Task 1 出现异常后,输出了堆栈
不用 launch 和 async
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private fun test_async() {
    runBlocking {
        val s = System.currentTimeMillis()
        println("Task from runBlocking ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
        delay(500)
        println("delay 500ms  ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")

        supervisorScope {
//            launch {
                println("Task1 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                delay(1500L)
                var s: String? = null
                s!!.length
                println("Task1 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
//            }
        }
        supervisorScope {
//            launch {
                withContext(kotlinx.coroutines.Dispatchers.IO) {
                    println("Task2 start ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
                    delay(5000L)
                    println("Task2 end ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
//                }
            }
        }

        val cost = System.currentTimeMillis() - s
        println("launch end, cost=$cost, ${getCurrentFormattedDateTime()} ${Thread.currentThread().name}")
    }
}

  • Task1 和 Task2 不能并行执行

其他协程函数

delay

join

yield

withContext

withContext 接收一个 CoroutineContext,阻塞协程并等待协程返回 T 值。withContext{} 不会创建新的协程,在指定协程上运行挂起代码块,并挂起该协程直至代码块运行完成。

1
2
3
4
5
6
7
fun main() = runBlocking {
    val result = withContext(this.coroutineContext) {
        println("thread name: ${Thread.currentThread().name}")
        1
    }
    println("result: $result")
}

结果:

1
2
thread name: main
result: 1

withTimeout

通过 withTimeout(millisecond) {},可以对协程做超时处理。

正常情况下,withTimeout 等待协程返回值,并阻塞当前协程。当它发起的协程执行时间超过设定值时,会抛出异常 TimeoutCancellationException。

withTimeoutOrNull

可以使用 withTimeoutOrNull 代替 withTimeout。它们的区别在于 withTimeoutOrNull 超时时不会抛出异常,而是返回 Null。

Ref

本文由作者按照 CC BY 4.0 进行授权