基础操作符
Flow 的强大之处在于其丰富的操作符链。这些操作符被分为中间操作符(Intermediate Operators)和终端操作符(Terminal Operators)。中间操作符负责定义数据的加工流水线,而终端操作符则是驱动整个流水线运转的“发动机”。
操作符的分层架构
Flow 的操作符设计遵循装饰器模式。每一个中间操作符都会返回一个新的 Flow 对象,该对象内部包裹了上游的 Flow,并在 collect 被调用时,将自己的逻辑注入到数据流转过程中。
- 惰性求值:中间操作符本身不触发任何计算,它们只是逻辑的声明。
- 顺序流转:默认情况下,流中的每一个元素都会按照操作符链的顺序,完整地经过每一个算子处理后,再处理下一个元素。
中间变换算子
中间算子用于对流中的数据进行过滤、转换或装饰。
map 与 filter
这是最基础的组合。map 执行 1:1 的数据转换,而 filter 执行布尔谓词过滤。
kotlin
(1..5).asFlow()
.filter { it % 2 == 0 } // 只保留偶数
.map { "Item $it" } // 转换为字符串
.collect { println(it) }kotlin
flowOf(1, 2)
.onEach { println("发射: $it") }
.map { it * 10 }
.collect { println("接收: $it") }
// 输出顺序:发射 1 -> 接收 10 -> 发射 2 -> 接收 20
// 证明了 Flow 是顺序流转而非批量处理核心原语:transform 的深度应用
在底层实现中,许多算子(如 map 和 filter)其实都是 transform 的特化版本。transform 是最通用的变换算子。
- 高度自由:它允许你在接收到一个上游值时,调用多次
emit发射多个值,或者根据条件完全不调用emit。 - 异步支持:在
transform内部可以调用任意挂起函数。
kotlin
flowOf(1, 2, 3).transform { value ->
if (value > 1) { // 模拟 filter
emit("Data: $value") // 模拟 map
emit("Extra: $value") // 甚至可以发射多条数据
}
}.collect { println(it) }kotlin
flowOf("id_1", "id_2").transform { id ->
emit(LoadingState)
val detail = api.fetchDetail(id) // 挂起调用
emit(SuccessState(detail))
}.collect { state ->
updateUI(state)
}限定算子与取消机制
限定操作符(如 take)用于在满足特定条件时截断流。
take(n):只取前 n 个元素。- 底层原理:当发射第 n 个元素后,
take会抛出一个内部的异常来取消协程。由于 Flow 是协作式的,这种取消是非常轻量且高效的。
取消的确定性
如果流内部正在执行一个复杂的非挂起循环且没有检查 isActive,那么即使使用了 take(1),内部循环可能依然会运行完毕。
终端聚合算子
终端算子是触发流收集的唯一方式。它们通常是挂起函数。
集合与单个元素
toList/toSet:将所有发射的值收集到集合中。first():获取第一个值并立即取消流。如果流为空,抛出NoSuchElementException。single():期望流只有一个值。如果流为空或多于一个值,抛出异常。
算术归约:reduce 与 fold
用于将流序列聚合为单个结果。
| 算子 | 初始值 | 描述 |
|---|---|---|
reduce | 无 | 使用流的第一个元素作为累加器的初始值。 |
fold | 必须提供 | 使用提供的初始值开始累加,适用于空流。 |
kotlin
// 计算阶乘
val factorial = (1..5).asFlow()
.reduce { acc, value -> acc * value }
println(factorial) // 120kotlin
// 拼接字符串,带初始前缀
val result = (1..3).asFlow()
.fold("Sequence:") { acc, value -> "$acc $value" }
println(result) // Sequence: 1 2 3顺序流转机制与性能考量
Flow 的默认行为是串行的。这意味着每一个元素在流经所有操作符并被 collect 闭包处理完之前,下一个元素不会被发射。
性能提示
这种串行机制避免了复杂的线程同步开销。如果你发现处理速度慢,通常是因为 collect 端的逻辑过于耗时。此时不应在 map 中寻找优化,而应考虑引入 buffer() 或 flowOn()(见进阶章节)来引入并发。
核心开发准则
- 优先使用 transform 实现复杂逻辑:如果你发现需要连续调用多个
map和filter且逻辑紧密相关,合并为一个transform往往能减少闭包创建的开销。 - 终端算子之后流即失效:终端算子执行完毕后,流的生命周期即结束。如果需要再次使用,必须重新获取 Flow 实例。
- 不要在中间算子捕获异常:中间算子应保持逻辑纯粹,异常捕获应交给
catch操作符(见安全章节)。