协程取消和超时
协程取消
协程之间的关系
- 父协程手动调用 cancel() 或者异常结束,会立即取消它的所有子协程;取消协程作用域会取消它的子协程。
- 父协程必须等待所有子协程完成(处于完成或者取消状态)才能完成。
- 子协程抛出未捕获的异常时,默认情况下会取消其父协程。
- 抛出
CancellationException
或者调用 cancel()
只会取消当前协程和子协程,不会取消父协程
协程取消一般使用 cancel() 或 cancelAndJoin() 函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| fun testCancel() = runBlocking {
val c = launch(Dispatchers.Default) {
var i = 0
while (i < 5) {
println("num ${i++}")
delay(500)
}
}
delay(1200)
println("try cancel")
c.cancelAndJoin()
println("end")
}
// 输出
num 0
num 1
num 2
try cancel
end
|
一段协程代码必须协作才能被取消,所有 kotlinx.coroutines 包中的挂起函数都是可被取消的。协程取消时,会检查子协程的取消,并在取消时抛出 CancellationException,CancellationException 被默认处理,不会引发协程抛出异常
取消作用域会取消它的子协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| // 取消作用域会取消它的子协程
@Test
fun `test scope cancel`() = runBlocking {
val scope = CoroutineScope(Dispatchers.Default)
scope.launch {
delay(1000)
println("job 1")
}
scope.launch {
delay(1000)
println("job 2")
}
delay(100)
scope.cancel()
println("scope cancel")
delay(1000)
}
|
输出:
被取消的子协程并不会影响其余兄弟协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| // 被取消的子协程并不会影响其余兄弟协程
@Test
fun `test brother job`() = runBlocking<Unit> {
val scope = CoroutineScope(Dispatchers.Default)
val job1 = scope.launch {
delay(1000)
println("job 1")
}
val job2 = scope.launch {
delay(1000)
println("job 2")
}
delay(100)
job1.cancel()
println("job1 cancel")
delay(1000)
}
|
输出:
协程通过抛出一个特殊的异常 CancellationException 来处理取消操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
| // 协程通过抛出一个特殊的异常 CancellationException 来处理取消操作。
// 所有kotlinx.coroutines中的挂起函数(withContext、delay等)都是可取消的。
@Test
fun `test CancellationException`() = runBlocking<Unit> {
val job1 = GlobalScope.launch {
try {
delay(1000)
println("job 1")
} catch (e: Exception) {
e.printStackTrace()
}
}
delay(100)
// 在调用 .cancel 时您可以传入一个 CancellationException 实例来提供更多关于本次取消的详细信息
// 如果您不构建新的 CancellationException 实例将其作为参数传入的话,会创建一个默认的 CancellationException
job1.cancel(CancellationException("取消"))
job1.join() // 不加join上面的launch可能不会执行
// job1.cancelAndJoin()
}
|
所有 kotlinx.coroutines 中的挂起函数(withContext、delay 等)都是可取消的
协程的取消只是状态的变化,并不会取消协程的实际运算逻辑
协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| fun testCancelCpu() = runBlocking {
val c = launch(Dispatchers.Default) {
var i = 0
var nextPrintTime = System.currentTimeMillis()
while (i < 5) { // 占用CPU
if (System.currentTimeMillis() > nextPrintTime) {
println("num ${i++}")
nextPrintTime += 500
}
}
}
delay(1200)
println("try cancel")
c.cancelAndJoin()
println("end")
}
// 输出
num 0
num 1
num 2
try cancel
num 3
num 4
end
|
可以看出,在 cancelAndJoin() 之后,由于 while 还在不断占用 CPU,所以还是会继续执行完毕(类似线程的 cancel),针对这种情况,可以使用:
Job.isActived() isActive 是一个可以被使用在 CoroutineScope 中的扩展属性,检查 Job 是否处于活跃状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| fun testCancelCpu1() = runBlocking {
val c = launch(Dispatchers.Default) {
var i = 0
var nextPrintTime = System.currentTimeMillis()
while (i < 5) { // 占用CPU
if (!isActive) {
return@launch
}
if (System.currentTimeMillis() > nextPrintTime) {
println("num ${i++}")
nextPrintTime += 500
}
}
}
delay(1200)
println("try cancel")
c.cancelAndJoin()
println("end")
}
// 输出
num 0
num 1
num 2
try cancel
end
|
ensureActive() 如果 job 处于非活跃状态,这个方法会立即抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| fun testCancelCpu1() = runBlocking {
val c = launch(Dispatchers.Default) {
var i = 0
var nextPrintTime = System.currentTimeMillis()
while (i < 5) { // 占用CPU
ensureActive() 可以使用此句替代判断isActive,若已经调用了cancel,此处会抛出CancellationException
if (System.currentTimeMillis() > nextPrintTime) {
println("num ${i++}")
nextPrintTime += 500
}
}
}
delay(1200)
println("try cancel")
c.cancelAndJoin()
println("end")
}
// 输出
num 0
num 1
num 2
try cancel
end
|
yield() 放弃的意思,表现为暂时让出执行权
- yield() 函数会检查所在协程的状态,如果已经取消,则抛出 CancellationException 予以响应。
- 它还会尝试出让线程的执行权,给其他协程提供执行机会
- 如果要处理的任务属于: CPU 密集型;可能会耗尽线程池资源;需要在不向线程池中添加更多线程的前提下允许线程处理其他任务,那么请使用 yield()。
- 由于 yield 是个 suspend 函数,所以肯定也可以感知到 cancel() 被执行,进而实现协程取消:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| fun testCancelCpu1() = runBlocking {
val c = launch(Dispatchers.Default) {
var i = 0
var nextPrintTime = System.currentTimeMillis()
while (i < 5) { // 占用CPU
if (System.currentTimeMillis() > nextPrintTime) {
println("num ${i++}")
nextPrintTime += 500
}
yield() // 事实上此处可以替换成任意一个挂起函数以感知cancel
}
}
delay(1200)
println("try cancel")
c.cancelAndJoin()
println("end")
}
// 输出
num 0
num 1
num 2
try cancel
end
|
协程取消,需要释放文件、数据库等资源
try{}finally{}、NonCancellable
在协程取消,需要释放文件、数据库等资源时,可以在 finaly 中释放:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| fun testCancelRelease() = runBlocking {
val c = launch(Dispatchers.Default) {
try {
println("reading from stream")
delay(3000)
println("reading end")
} finally {
println("finally release stream")
}
}
delay(1000)
println("try cancel")
c.cancelAndJoin()
println("end")
}
// 输出
reading from stream
try cancel
finally release stream
end
|
特别注意,在 finally 中,调用挂起函数会直接抛出 CancellationException,因为挂起函数都是可取消的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| fun testCancelRelease() = runBlocking {
val c = launch(Dispatchers.Default) {
try {
println("reading from stream")
delay(3000)
println("reading end")
} finally {
println("finally release stream")
delay(2000) // 下面的不会执行了
println("release end")
}
}
delay(1000)
println("try cancel")
c.cancelAndJoin()
println("end")
}
// 输出
reading from stream
try cancel
finally release stream
end
|
如果确实需要在 finally 中执行挂起,可以使用 withContext(NonCancellable) {}
执行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
| fun testCancelRelease() = runBlocking {
val c = launch(Dispatchers.Default) {
try {
println("reading from stream")
delay(3000)
println("reading end")
} finally {
withContext(NonCancellable) {
println("finally release stream")
delay(2000)
println("release end")
}
}
}
delay(1000)
println("try cancel")
c.cancelAndJoin()
println("end")
}
// 输出
reading from stream
try cancel
finally release stream
release end
end
|
- 处于取消中状态的协程不能够挂起 (运行不能取消的点),当协程被取消后需要调用挂起函数,只需要将该代码放在 NonCancellable CoroutineContext 中
- 这样会挂起运行中的代码,并保持协程的取消中状态直到任务处理完成
use{}
对于实现了 Closeable 接口的类,如各种 Stream、Buffer 等,可以直接使用 .use{}
实现自动在 finally 中调用 close() 方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| public inline fun <T : Closeable?, R> T.use(block: (T) -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
var exception: Throwable? = null
try {
return block(this)
} catch (e: Throwable) {
exception = e
throw e
} finally {
when {
apiVersionIsAtLeast(1, 1, 0) -> this.closeFinally(exception)
this == null -> {}
exception == null -> close()
else ->
try {
close()
} catch (closeException: Throwable) {
// cause.addSuppressed(closeException) // ignored here
}
}
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| fun testCancelRelease() = runBlocking {
val c = launch(Dispatchers.Default) {
FileInputStream(File("build.gradle")).use {
println("reading from stream")
delay(3000)
println("reading end")
}
}
delay(1000)
println("try cancel")
c.cancelAndJoin()
println("end")
}
// 输出
reading from stream
try cancel
end
|
协程 cancel 原理
协程超时
协程库中已经提供来 withTimeout(){…}
挂起函数来实现在超时后自动取消协程。它会在超时后抛出 TimeoutCancellationException,它是 CancellationException 的子类,它是协程结束的正常原因,不会打印堆栈跟踪信息。
withTimeout 超时后抛出 TimeoutCancellationException
1
2
3
4
5
6
7
8
9
10
11
| // 超时任务
// 很多情况下取消一个协程的理由是它有可能超时。
@Test
fun `test deal with timeout`() = runBlocking {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
}
|
输出:
1
2
3
4
5
6
7
| I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Timed out waiting for 1300 ms
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
(Coroutine boundary)
|
withTimeoutOrNull
还有一个 withTimeoutOrNull() { … }
挂起函数在超时后返回 null,而不是抛出一个异常:
1
2
3
4
5
6
7
8
9
10
11
12
| // withTimeoutOrNull 通过返回 null 来进行超时操作,从而替代抛出一个异常
@Test
fun `test deal with timeout return null`() = runBlocking {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // 在它运行得到结果之前取消它
}
println("Result is $result")
}
|
输出:
1
2
3
4
| I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null
|