文章

05Kotlin协程取消和超时

05Kotlin协程取消和超时

协程取消和超时

协程取消

协程之间的关系

  1. 父协程手动调用 cancel() 或者异常结束,会立即取消它的所有子协程;取消协程作用域会取消它的子协程。
  2. 父协程必须等待所有子协程完成(处于完成或者取消状态)才能完成。
  3. 子协程抛出未捕获的异常时,默认情况下会取消其父协程。
  4. 抛出 CancellationException 或者调用 cancel() 只会取消当前协程和子协程,不会取消父协程

协程取消一般使用 cancel() 或 cancelAndJoin() 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun testCancel() = runBlocking {
    val c = launch(Dispatchers.Default) {
        var i = 0
        while (i < 5) {
            println("num ${i++}")
            delay(500)
        }
    }
    delay(1200)
    println("try cancel")
    c.cancelAndJoin()
    println("end")
}

// 输出
num 0
num 1
num 2
try cancel
end

一段协程代码必须协作才能被取消,所有 kotlinx.coroutines 包中的挂起函数都是可被取消的。协程取消时,会检查子协程的取消,并在取消时抛出 CancellationException,CancellationException 被默认处理,不会引发协程抛出异常

取消作用域会取消它的子协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 取消作用域会取消它的子协程
@Test
fun `test scope cancel`() = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default)
    scope.launch {
        delay(1000)
        println("job 1")
    }

    scope.launch {
        delay(1000)
        println("job 2")
    }
    delay(100)
    scope.cancel()
    println("scope cancel")
    delay(1000)
}

输出:

1
scope cancel

被取消的子协程并不会影响其余兄弟协程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 被取消的子协程并不会影响其余兄弟协程
@Test
fun `test brother job`() = runBlocking<Unit> {
    val scope = CoroutineScope(Dispatchers.Default)
    val job1 = scope.launch {
        delay(1000)
        println("job 1")
    }

    val job2 = scope.launch {
        delay(1000)
        println("job 2")
    }
    delay(100)
    job1.cancel()
    println("job1 cancel")
    delay(1000)
}

输出:

1
2
job1 cancel
job 2

协程通过抛出一个特殊的异常 CancellationException 来处理取消操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 协程通过抛出一个特殊的异常 CancellationException 来处理取消操作。
// 所有kotlinx.coroutines中的挂起函数(withContext、delay等)都是可取消的。
@Test
fun `test CancellationException`() = runBlocking<Unit> {
    val job1 = GlobalScope.launch {
        try {
            delay(1000)
            println("job 1")
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }
    delay(100)
    // 在调用 .cancel 时您可以传入一个 CancellationException 实例来提供更多关于本次取消的详细信息
    // 如果您不构建新的 CancellationException 实例将其作为参数传入的话,会创建一个默认的 CancellationException
    job1.cancel(CancellationException("取消"))
    job1.join() // 不加join上面的launch可能不会执行
    // job1.cancelAndJoin()
}

所有 kotlinx.coroutines 中的挂起函数(withContext、delay 等)都是可取消的

协程的取消只是状态的变化,并不会取消协程的实际运算逻辑

协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun testCancelCpu() = runBlocking {
    val c = launch(Dispatchers.Default) {
        var i = 0
        var nextPrintTime = System.currentTimeMillis()
        while (i < 5) { // 占用CPU
            if (System.currentTimeMillis() > nextPrintTime) {
                println("num ${i++}")
                nextPrintTime += 500
            }
        }
    }
    delay(1200)
    println("try cancel")
    c.cancelAndJoin()
    println("end")
}
// 输出
num 0
num 1
num 2
try cancel
num 3
num 4
end

可以看出,在 cancelAndJoin() 之后,由于 while 还在不断占用 CPU,所以还是会继续执行完毕(类似线程的 cancel),针对这种情况,可以使用:

Job.isActived() isActive 是一个可以被使用在 CoroutineScope 中的扩展属性,检查 Job 是否处于活跃状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
fun testCancelCpu1() = runBlocking {
   val c = launch(Dispatchers.Default) {
       var i = 0
       var nextPrintTime = System.currentTimeMillis()
       while (i < 5) { // 占用CPU
           if (!isActive) {
               return@launch
           } 
           if (System.currentTimeMillis() > nextPrintTime) {
               println("num ${i++}")
               nextPrintTime += 500
           }
       }
   }
   delay(1200)
   println("try cancel")
   c.cancelAndJoin()
   println("end")
}
// 输出
num 0
num 1
num 2
try cancel
end

ensureActive() 如果 job 处于非活跃状态,这个方法会立即抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun testCancelCpu1() = runBlocking {
   val c = launch(Dispatchers.Default) {
       var i = 0
       var nextPrintTime = System.currentTimeMillis()
       while (i < 5) { // 占用CPU
           ensureActive() 可以使用此句替代判断isActive,若已经调用了cancel,此处会抛出CancellationException
           if (System.currentTimeMillis() > nextPrintTime) {
               println("num ${i++}")
               nextPrintTime += 500
           }
       }
   }
   delay(1200)
   println("try cancel")
   c.cancelAndJoin()
   println("end")
}
// 输出
num 0
num 1
num 2
try cancel
end

yield() 放弃的意思,表现为暂时让出执行权

  • yield() 函数会检查所在协程的状态,如果已经取消,则抛出 CancellationException 予以响应。
  • 它还会尝试出让线程的执行权,给其他协程提供执行机会
  • 如果要处理的任务属于: CPU 密集型;可能会耗尽线程池资源;需要在不向线程池中添加更多线程的前提下允许线程处理其他任务,那么请使用 yield()。
  • 由于 yield 是个 suspend 函数,所以肯定也可以感知到 cancel() 被执行,进而实现协程取消:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
fun testCancelCpu1() = runBlocking {
   val c = launch(Dispatchers.Default) {
       var i = 0
       var nextPrintTime = System.currentTimeMillis()
       while (i < 5) { // 占用CPU
           if (System.currentTimeMillis() > nextPrintTime) {
               println("num ${i++}")
               nextPrintTime += 500
           }
           yield() // 事实上此处可以替换成任意一个挂起函数以感知cancel
       }
   }
   delay(1200)
   println("try cancel")
   c.cancelAndJoin()
   println("end")
}
// 输出
num 0
num 1
num 2
try cancel
end

协程取消,需要释放文件、数据库等资源

try{}finally{}、NonCancellable

在协程取消,需要释放文件、数据库等资源时,可以在 finaly 中释放:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fun testCancelRelease() = runBlocking {
    val c = launch(Dispatchers.Default) {
        try {
            println("reading from stream")
            delay(3000)
            println("reading end")
        } finally {
            println("finally release stream")
        }
    }
    delay(1000)
    println("try cancel")
    c.cancelAndJoin()
    println("end")
}
// 输出
reading from stream
try cancel
finally release stream
end

特别注意,在 finally 中,调用挂起函数会直接抛出 CancellationException,因为挂起函数都是可取消的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
fun testCancelRelease() = runBlocking {
    val c = launch(Dispatchers.Default) {
        try {
            println("reading from stream")
            delay(3000)
            println("reading end")
        } finally {
            println("finally release stream")
            delay(2000) // 下面的不会执行了
            println("release end")
        }
    }
    delay(1000)
    println("try cancel")
    c.cancelAndJoin()
    println("end")
}
// 输出
reading from stream
try cancel
finally release stream
end

如果确实需要在 finally 中执行挂起,可以使用 withContext(NonCancellable) {} 执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
fun testCancelRelease() = runBlocking {
    val c = launch(Dispatchers.Default) {
        try {
            println("reading from stream")
            delay(3000)
            println("reading end")
        } finally {
            withContext(NonCancellable) {
                println("finally release stream")
                delay(2000)
                println("release end")
            }
        }
    }
    delay(1000)
    println("try cancel")
    c.cancelAndJoin()
    println("end")
}
// 输出
reading from stream
try cancel
finally release stream
release end
end
  • 处于取消中状态的协程不能够挂起 (运行不能取消的点),当协程被取消后需要调用挂起函数,只需要将该代码放在 NonCancellable CoroutineContext 中
  • 这样会挂起运行中的代码,并保持协程的取消中状态直到任务处理完成

use{}

对于实现了 Closeable 接口的类,如各种 Stream、Buffer 等,可以直接使用 .use{} 实现自动在 finally 中调用 close() 方法。

  • use 函数源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public inline fun <T : Closeable?, R> T.use(block: (T) -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    var exception: Throwable? = null
    try {
        return block(this)
    } catch (e: Throwable) {
        exception = e
        throw e
    } finally {
        when {
            apiVersionIsAtLeast(1, 1, 0) -> this.closeFinally(exception)
            this == null -> {}
            exception == null -> close()
            else ->
                try {
                    close()
                } catch (closeException: Throwable) {
                    // cause.addSuppressed(closeException) // ignored here
                }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun testCancelRelease() = runBlocking {
    val c = launch(Dispatchers.Default) {
        FileInputStream(File("build.gradle")).use {
            println("reading from stream")
            delay(3000)
            println("reading end")
        }
    }
    delay(1000)
    println("try cancel")
    c.cancelAndJoin()
    println("end")
}
// 输出
reading from stream
try cancel
end

协程 cancel 原理

协程超时

协程库中已经提供来 withTimeout(){…} 挂起函数来实现在超时后自动取消协程。它会在超时后抛出 TimeoutCancellationException,它是 CancellationException 的子类,它是协程结束的正常原因,不会打印堆栈跟踪信息。

withTimeout 超时后抛出 TimeoutCancellationException

1
2
3
4
5
6
7
8
9
10
11
// 超时任务
// 很多情况下取消一个协程的理由是它有可能超时。
@Test
fun `test deal with timeout`() = runBlocking {
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
}

输出:

1
2
3
4
5
6
7
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...

Timed out waiting for 1300 ms
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
	(Coroutine boundary)

withTimeoutOrNull

还有一个 withTimeoutOrNull() { … } 挂起函数在超时后返回 null,而不是抛出一个异常:

1
2
3
4
5
6
7
8
9
10
11
12
// withTimeoutOrNull 通过返回 null 来进行超时操作,从而替代抛出一个异常
@Test
fun `test deal with timeout return null`() = runBlocking {
    val result = withTimeoutOrNull(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // 在它运行得到结果之前取消它
    }
    println("Result is $result")
}

输出:

1
2
3
4
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
本文由作者按照 CC BY 4.0 进行授权