Skip to content

基础操作符

源:Intermediate flow operators

Flow 的强大之处在于其丰富的操作符链。这些操作符被分为中间操作符(Intermediate Operators)终端操作符(Terminal Operators)。中间操作符负责定义数据的加工流水线,而终端操作符则是驱动整个流水线运转的“发动机”。

操作符的分层架构

Flow 的操作符设计遵循装饰器模式。每一个中间操作符都会返回一个新的 Flow 对象,该对象内部包裹了上游的 Flow,并在 collect 被调用时,将自己的逻辑注入到数据流转过程中。

  • 惰性求值:中间操作符本身不触发任何计算,它们只是逻辑的声明。
  • 顺序流转:默认情况下,流中的每一个元素都会按照操作符链的顺序,完整地经过每一个算子处理后,再处理下一个元素。

中间变换算子

中间算子用于对流中的数据进行过滤、转换或装饰。

map 与 filter

这是最基础的组合。map 执行 1:1 的数据转换,而 filter 执行布尔谓词过滤。

kotlin
(1..5).asFlow()
    .filter { it % 2 == 0 } // 只保留偶数
    .map { "Item $it" }     // 转换为字符串
    .collect { println(it) }
kotlin
flowOf(1, 2)
    .onEach { println("发射: $it") }
    .map { it * 10 }
    .collect { println("接收: $it") }
// 输出顺序:发射 1 -> 接收 10 -> 发射 2 -> 接收 20
// 证明了 Flow 是顺序流转而非批量处理

核心原语:transform 的深度应用

在底层实现中,许多算子(如 mapfilter)其实都是 transform 的特化版本。transform 是最通用的变换算子。

  • 高度自由:它允许你在接收到一个上游值时,调用多次 emit 发射多个值,或者根据条件完全不调用 emit
  • 异步支持:在 transform 内部可以调用任意挂起函数。
kotlin
flowOf(1, 2, 3).transform { value ->
    if (value > 1) { // 模拟 filter
        emit("Data: $value") // 模拟 map
        emit("Extra: $value") // 甚至可以发射多条数据
    }
}.collect { println(it) }
kotlin
flowOf("id_1", "id_2").transform { id ->
    emit(LoadingState)
    val detail = api.fetchDetail(id) // 挂起调用
    emit(SuccessState(detail))
}.collect { state ->
    updateUI(state)
}

限定算子与取消机制

限定操作符(如 take)用于在满足特定条件时截断流。

  • take(n):只取前 n 个元素。
  • 底层原理:当发射第 n 个元素后,take 会抛出一个内部的异常来取消协程。由于 Flow 是协作式的,这种取消是非常轻量且高效的。

取消的确定性

如果流内部正在执行一个复杂的非挂起循环且没有检查 isActive,那么即使使用了 take(1),内部循环可能依然会运行完毕。

终端聚合算子

终端算子是触发流收集的唯一方式。它们通常是挂起函数。

集合与单个元素

  • toList / toSet:将所有发射的值收集到集合中。
  • first():获取第一个值并立即取消流。如果流为空,抛出 NoSuchElementException
  • single():期望流只有一个值。如果流为空或多于一个值,抛出异常。

算术归约:reduce 与 fold

用于将流序列聚合为单个结果。

算子初始值描述
reduce使用流的第一个元素作为累加器的初始值。
fold必须提供使用提供的初始值开始累加,适用于空流。
kotlin
// 计算阶乘
val factorial = (1..5).asFlow()
    .reduce { acc, value -> acc * value }
println(factorial) // 120
kotlin
// 拼接字符串,带初始前缀
val result = (1..3).asFlow()
    .fold("Sequence:") { acc, value -> "$acc $value" }
println(result) // Sequence: 1 2 3

顺序流转机制与性能考量

Flow 的默认行为是串行的。这意味着每一个元素在流经所有操作符并被 collect 闭包处理完之前,下一个元素不会被发射。

性能提示

这种串行机制避免了复杂的线程同步开销。如果你发现处理速度慢,通常是因为 collect 端的逻辑过于耗时。此时不应在 map 中寻找优化,而应考虑引入 buffer()flowOn()(见进阶章节)来引入并发。

核心开发准则

  1. 优先使用 transform 实现复杂逻辑:如果你发现需要连续调用多个 mapfilter 且逻辑紧密相关,合并为一个 transform 往往能减少闭包创建的开销。
  2. 终端算子之后流即失效:终端算子执行完毕后,流的生命周期即结束。如果需要再次使用,必须重新获取 Flow 实例。
  3. 不要在中间算子捕获异常:中间算子应保持逻辑纯粹,异常捕获应交给 catch 操作符(见安全章节)。