文章

06Kotlin协程并发安全

06Kotlin协程并发安全

协程并发安全

不安全的并发访问

使用线程在解决并发问题的时候总是会遇到线程安全的问题,而 Java 平台上的 Kotlin 协程实现免不了存在并发调度的情况,因此线程安全同样值得留意。

1
2
3
4
5
6
7
8
@Test
fun `test not safe concurrent`() = runBlocking<Unit> {
    var count = 0
    List(1000) {
        GlobalScope.launch { count++ }
    }.joinAll()
    println(count)
}

输出:

1
995

启动 1000 个协程对 count 进行加一操作,运行多次,可能每次结果都不一样,有的协程拿到 count 后加操作还没完成,就被别的协程进行加操作了,存在了线程并发问题

解决:用 AtomicInteger 替代 Int

1
2
3
4
5
6
7
8
@Test
fun `test safe concurrent`() = runBlocking<Unit> {
    var count = AtomicInteger(0)
    List(1000) {
        GlobalScope.launch { count.incrementAndGet() }
    }.joinAll()
    println(count.get())
}

输出:

1
1000

协程的并发工具

除了在线程中常用的解决并发问题的手段外,协程框架也提供了一些并发安全地工具,包括:

  1. Channel:并发安全地消息通道。
  2. Mutex:轻量级锁,它的 lockunlock 从语义上与线程锁比较类似,之所以轻量级是因为它在获取不到锁时不会阻塞线程,而是挂起等待锁的释放。
  3. Semaphore:轻量级信号量,信号量可以有多个,协程在获取到信号量后即可执行并发操作。当 Semaphore 的参数为 1 时,效果等价于 Mutex。

Mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
fun `test safe concurrent tools`() = runBlocking<Unit> {
    var count = 0
    val mutex = Mutex()
    List(1000) {
        GlobalScope.launch {
            mutex.withLock {
                count++
            }
        }
    }.joinAll()
    println(count)
}

输出:

1
1000

Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
fun `test safe concurrent tools2`() = runBlocking<Unit> {
    var count = 0
    val semaphore = Semaphore(1)
    List(1000) {
        GlobalScope.launch {
            semaphore.withPermit {
                count++
            }
        }
    }.joinAll()
    println(count)
}

输出:

1
1000

避免访问外部可变状态

编写函数的时候,要求它不得访问外部状态,只能基于参数做运算,通过返回值提供运算结果。

1
2
3
4
5
6
7
8
@Test
fun `test avoid access outer variable`() = runBlocking<Unit> {
    var count = 0
    val result = count + List(1000) {
        GlobalScope.async { 1 }
    }.map { it.await() }.sum()
    println(result)
}

把 count 作为外部变量,和协程隔开,也就不存在并发的安全性问题了

本文由作者按照 CC BY 4.0 进行授权