Kotlin Flow操作符
协程之 Flow 操作符
过渡流操作符 Intermediate Operations
可以使用操作符转换流,就像使用集合与序列一样。过渡操作符应用于上游流,并返回下游流。这些操作符也是冷操作符,就像流一样。这类操作符本身不是挂起函数。它运行的速度很快,返回新的转换流的定义。
转换
Transform 转换
- 通用的用了转换每一个 item,可忽略或 emit 多次 item (在使用 transform 操作符时,可以任意多次调用 emit ,这是 transform 跟 map 最大的区别)
- 其他的操作符可通过 transform 来实现,如:
skipOddAndDuplicateEven
1
2
3
4
5
6
7
8
9
10
11
12
13
suspend fun testTransform() {
(1..3).asFlow() // 一个请求流
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
}
suspend fun performRequest(request: Int): String {
delay(1000) // 模仿长时间运行的异步任务
return "response $request"
}
输出:
1
2
3
4
5
6
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
TransformLatest
TransformWhile 截断流
TransformWhile 的返回值是一个 bool 类型,用来控制流的截断,如果返回 true,则流继续执行,如果 false,则流截断。
1
2
3
4
5
6
7
8
9
10
flow {
for (i in 0..3) {
emit(i)
}
}.transformWhile { value ->
emit(value)
value == 1
}.collect {
Log.d("xys", "Result---$it")
}
Map
同 RxJava 的 map 操作符
1
2
3
4
5
6
7
8
9
10
11
12
suspend fun test_map() {
flow {
for (i in 1..3) {
delay(100) // 假装我们异步等待了 100 毫秒
emit(i) // 发射下一个值
}
}.map {
"map $it"
}.collect {
println(it)
}
}
输出:
1
2
3
map 1
map 2
map 3
过滤
过滤操作符用于过滤流中的数据
Filter[XXX] 过滤(false 就过滤掉)
返回 true 才成功
- Filter
1
2
3
4
5
6
7
8
9
10
11
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
输出:
1
2
3
4
5
6
7
8
9
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
其他类似:
- FilterInstance
- FilterNot
- FilterNotNull
Drop、dropWhile 丢弃前 n 个数据
带 while 后缀的,则表示按条件进行判断。
Take、takeWhile 只取前几个 emit 发射的值
限长过渡操作符(例如 take
)在流触及相应限制的时候会将它的执行取消。协程中的取消操作总是通过抛出异常来执行,这样所有的资源管理函数(如 try {…} finally {…}
块)会在取消的情况下正常运行:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
suspend fun testTake() {
numbers().take(2).collect { println(it) }
}
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
输出:
1
2
3
1
2
Finally in numbers
Debounce
Debounce 操作符用于防抖,指定时间内的值只接收最新的一个。
Sample
Sample 操作符与 debounce 操作符有点像,但是却限制了一个周期性时间,sample 操作符获取的是一个周期内的最新的数据,可以理解为 debounce 操作符增加了周期的限制。
Debounce 和 sample 区别
它们的区别在于倒计时开始的时间点,sample 倒计时开始的点在时间轴上是固定的,比如每一秒的开始,不管这一秒有没有数据产生,倒计时都会进行。Debounce 开始倒计时的点是每一次数据产生的时候,没有数据就没有倒计时。Debounce 发射数据的那一次倒计时中一定只包含要被发射的那一个数据,而对于 sample,倒计时中可能包含多个数据。
DistinctUntilChangedBy
去重操作符,可以按照指定类型的参数进行去重。
组合
Zip 合并(取最短的那个流)
用于组合两个流中的相关值,多余的部分过滤掉,执行合并后新的 flow 的 item 个数 = 较小的 flow 的 item 个数。;如果有 delay,合并过程中也会等待 delay 执行完后再进行合并。
1
2
3
4
5
6
7
8
9
suspend fun test_zip() {
val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.zip(strs) { a, b -> "$a -> $b" } // 使用“zip”组合单个字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
输出:
1
2
3
1 -> one at 443 ms from start
2 -> two at 844 ms from start
3 -> three at 1248 ms from start
每次 zip,需要等时间久的 400 ms 才更新;800 ms 更新一次;1200 ms 更新一次。
Combine (每次更新的 item 都和其他的 item 合并)
组合两个流,那个流的值更新了,都取两个流最新的值(使用 combine 合并时,每次从 flowA 发出新的 item ,会将其与 flowB 的最新的 item 合并)
1
2
3
4
5
6
7
8
9
suspend fun test_combine() {
val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.combine(strs) { a, b -> "$a -> $b" } // 使用“combine”组合单个字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
输出:
1
2
3
4
5
1 -> one at 441 ms from start
2 -> one at 641 ms from start
2 -> two at 845 ms from start
3 -> two at 945 ms from start
3 -> three at 1249 ms from start
首次 combine 需要等 400 ms,数据准备好 (1 和 one) 才能合并数据;第 2 次 nums 流准备好了 2,但 strs 的最新值还是 one,所以 2 和 one 组合了;后续类似。
CombineTransform
Merge 合并多个流
Merge 操作符用于将多个流合并。Merge 的输出结果是按照时间顺序,将多个流依次发射出来。
1
2
3
4
5
6
7
8
9
suspend fun test_merge() {
val nums = (1..4).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
listOf(nums, strs).merge()
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
输出:
1
2
3
4
5
6
7
1 at 367 ms from start
one at 461 ms from start
2 at 671 ms from start
two at 866 ms from start
3 at 974 ms from start
three at 1270 ms from start
4 at 1278 ms from start
Merge 的输出结果是按照时间顺序,将多个流依次发射出来;根据 delay 的时间来输出。
展平流
FlatMapConcat
类似于 RxJava 的 concatMap。FlatMapConcat 由 map、flattenConcat 操作符实现
1
2
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
map(transform).flattenConcat()
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapMerge { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
输出:
1
2
3
4
5
6
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
FlatMapMerge
FlatMapMerge 由 map、flattenMerge 操作符实现。
1
2
3
4
5
public fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
): Flow<R> =
map(transform).flattenMerge(concurrency)
案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
suspend fun test_flatMapMerge() {
(1..5).asFlow()
.onStart { start = currTime() }
.onEach { delay(100) }
.flatMapMerge {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("$it at ${System.currentTimeMillis() - start} ms from start")
}
}
输出:
1
2
3
4
5
6
7
8
9
10
1: First at 118 ms from start
2: First at 216 ms from start
3: First at 317 ms from start
4: First at 418 ms from start
5: First at 527 ms from start
1: Second at 623 ms from start
2: Second at 717 ms from start
3: Second at 818 ms from start
4: Second at 923 ms from start
5: Second at 1029 ms from start
FlatMapLatest
当发射了新值之后,上个 flow 就会被取消。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
suspend fun test_flatMapLatest() {
(1..5).asFlow()
.onStart { start = currTime() }
.onEach { delay(100) }
.flatMapLatest {
flow {
emit("$it: First")
delay(500)
emit("$it: Second")
}
}
.collect {
println("$it at ${System.currentTimeMillis() - start} ms from start")
}
}
输出:
1
2
3
4
5
6
1: First at 111 ms from start
2: First at 218 ms from start
3: First at 319 ms from start
4: First at 421 ms from start
5: First at 526 ms from start
5: Second at 1029 ms from start
FlattenConcat
将给定的 Flow 按顺序合并成一个 Flow,不会交错。和 flattenMerge(concurrency = 1)
相等
1
2
3
4
5
6
7
8
9
10
11
12
suspend fun test_flattenConcat() {
val flowA = (1..5).asFlow().onEach { delay(100) } // 5*100=500ms
val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(200) } // 5*200=1000ms
val t = measureTimeMillis {
flowOf(flowA, flowB)
.flattenConcat()
.collect {
println("${LocalDateTime.now()} $it")
}
}
println("Done cost:$t ms.")
}
输出:
1
2
3
4
5
6
7
8
9
10
11
2021-11-02T15:22:14.489 1
2021-11-02T15:22:14.591 2
2021-11-02T15:22:14.692 3
2021-11-02T15:22:14.793 4
2021-11-02T15:22:14.896 5
2021-11-02T15:22:15.100 one
2021-11-02T15:22:15.304 two
2021-11-02T15:22:15.505 three
2021-11-02T15:22:15.708 four
2021-11-02T15:22:15.911 five
Done cost:1609 ms.
FlattenMerge
1
2
3
4
5
6
7
8
9
10
11
12
suspend fun test_flattenMerge() {
val flowA = (1..5).asFlow().onEach { delay(300) } // 5*300=1500ms
val flowB = flowOf("one", "two", "three", "four", "five").onEach { delay(200) } // 5*200=1000ms
val t = measureTimeMillis {
flowOf(flowA, flowB)
.flattenMerge()
.collect {
println("${LocalDateTime.now()} $it")
}
}
println("Done cost:$t ms.")
}
输出:
1
2
3
4
5
6
7
8
9
10
11
2021-11-02T15:29:24.006 one
2021-11-02T15:29:24.027 1
2021-11-02T15:29:24.135 two
2021-11-02T15:29:24.332 2
2021-11-02T15:29:24.336 three
2021-11-02T15:29:24.540 four
2021-11-02T15:29:24.635 3
2021-11-02T15:29:24.744 five
2021-11-02T15:29:24.938 4
2021-11-02T15:29:25.243 5
Done cost:1574 ms.
末端流操作符 Terminal Operations
末端操作符是在流上用于启动流收集的挂起函数。
Flow 的 Terminal 运算符可以是 suspend 函数,如 collect
、single
、reduce
、toList
等;也可以是非 suspend 的 launchIn
运算符,用于在指定 CoroutineScope 内使用 flow
1
2
3
4
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect() // tail-call
}
Collect[XXX]
Collect 收集
CollectIndexed
带下标的 collect,下标是 Flow 中的 emit 顺序。
1
2
3
4
5
6
7
8
9
10
11
12
13
MainScope().launch {
val time = measureTimeMillis {
flow {
for (i in 0..3) {
Log.d("xys", "emit value---$i")
emit(i.toString())
}
}.collectIndexed { index, value ->
Log.d("xys", "Result in $index --- $value")
}
}
Log.d("xys", "Time---$time")
}
CollectLatest 用于在 collect 中取消未来得及处理的数据,只保留当前最新的生产数据
当发射器和收集器都很慢的时候,合并是加快处理速度的一种方式。它通过删除发射值来实现。另一种方式是取消缓慢的收集器,并在每次发射新值的时候重新启动它。有一组与 xxx 操作符执行相同基本逻辑的 xxxLatest 操作符,但是在新值产生的时候取消执行其块中的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // 假装我们异步等待了 100 毫秒
emit(i) // 发射下一个值
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value -> // 取消并重新发射最后一个值
println("Collecting $value")
delay(300) // 假装我们花费 300 毫秒来处理它
println("Done $value")
}
}
println("Collected in $time ms")
}
输出:
1
2
3
4
5
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 741 ms
由于 collectLatest 的函数体需要花费 300 毫秒,但是新值每 100 秒发射一次,我们看到该代码块对每个值运行,但是只收集最后一个值
Reduce 累计通过 operation
类似于 Kotlin 集合中的 reduce 函数,能够对集合进行计算操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S {
var accumulator: Any? = NULL
collect { value ->
accumulator = if (accumulator !== NULL) {
@Suppress("UNCHECKED_CAST")
operation(accumulator as S, value)
} else {
value
}
}
if (accumulator === NULL) throw UnsupportedOperationException("Empty flow can't be reduced")
@Suppress("UNCHECKED_CAST")
return accumulator as S
}
对平方数列求和案例:
1
2
3
4
5
6
7
suspend fun test_reduce() {
val sum = (1..5).asFlow()
.map { it * it } // 数字 1 至 5 的平方
.reduce { a, b -> a + b } // 求和(末端操作符)
// 1*1+2*2+3*3+4*4+5*5 = 1+4+9+16+25 = 55
println(sum) // 55
}
计算阶乘:
1
2
3
4
fun main() = runBlocking {
val sum = (1..5).asFlow().reduce { a, b -> a * b }
println(sum)
}
Fold
也类似于 Kotlin 集合中的 fold 函数,fold 也需要设置初始值。
1
2
3
4
5
6
7
8
9
10
public suspend inline fun <T, R> Flow<T>.fold(
initial: R,
crossinline operation: suspend (acc: R, value: T) -> R
): R {
var accumulator = initial
collect { value ->
accumulator = operation(accumulator, value)
}
return accumulator
}
案例:
1
2
3
4
5
6
7
fun main() = runBlocking {
val sum = (1..5).asFlow()
.map { it * it }
.fold(0) { a, b -> a + b }
println(sum)
}
// 初始值为0就类似于使用 reduce 函数实现对平方数列求和。
计算阶乘:
1
2
3
4
fun main() = runBlocking {
val sum = (1..5).asFlow().fold(1) { a, b -> a * b }
println(sum)
}
LaunchIn 在指定的协程作用域中直接执行 Flow
1
2
3
4
5
6
flow {
for (i in 0..3) {
Log.d("xys", "emit value---$i")
emit(i.toString())
}
}.launchIn(MainScope())
其他末端操作符
First/single 获取第一个(first)值与确保流发射单个(single)值的操作符
ToList/toSet/toCollection 将 Flow 转换为 Collection、Set 和 List
Last、lastOrNull、first、firstOrNull
返回 Flow 的最后一个值(第一个值),区别是 last 为空的话,last 会抛出异常,而 lastOrNull 可空
1
2
3
4
5
flow {
for (i in 0..3) {
emit(i.toString())
}
}.last()
- Count
- Fold 将流规约到单个值
- LaunchIn/produceIn/broadcastIn
线程切换操作符 flowOn
- FlowOn 只能控制上游各操作符执行在哪个线程,控制 collect 执行所在线程
- Collect 在哪个线程取决整个 flow 处于哪个线程下
1
2
3
4
5
6
7
8
9
10
11
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println(it)
}
- Flow builder 和 map 操作符都会受到 flowOn
- Collect () 指定哪个线程,则需要看整个 flow 处于哪个 CoroutineScope 下
注意: 不要使用 withContext () 来切换 flow 的线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@OptIn(InternalCoroutinesApi::class)
suspend fun test2() {
flow {
emit(1) // Ok
// withContext(Dispatchers.IO) {
// // Flow invariant is violated: Please refer to 'flow' documentation or use 'flowOn' instead
// emit(2) // Will fail with ISE
// }
}.collect(object : FlowCollector<Int> {
override suspend fun emit(value: Int) {
println(value)
}
})
}