Skip to content

进阶变换

源:Intermediate flow operators

进阶变换算子超越了简单的元素映射,它们允许 Flow 在执行过程中维护内部状态、拦截生命周期事件以及根据动态条件控制流的走向。掌握这些算子是构建复杂响应式系统的关键。

状态累积算子:Scan 与 Running 系列

在处理如“股票价格走势”、“下载进度叠加”等场景时,我们需要在发射当前元素时,结合之前所有元素的累积状态。

scan 与 runningFold

scan(又名 runningFold)是终端算子 fold 的中间镜像。

  • scan(initial) { acc, value -> ... }
    • 初值发射:它会首先发射 initial 值,然后再发射后续的累积结果。
    • 应用场景:UI 列表的初始空状态展示。

runningReduce

scan 不同,runningReduce 不需要初始值,它直接使用上游发射的第一个元素作为累加器的起始点。

kotlin
flowOf(10, 20, 30) // 发射每一块下载的大小
    .scan(0) { acc, value -> acc + value }
    .collect { println("当前总大小: $it MB") }
// 输出:0 (初始状态), 10, 30, 60
kotlin
flowOf("A", "B", "C")
    .runningReduce { acc, value -> "$acc-$value" }
    .collect { println(it) }
// 输出:A, A-B, A-B-C (不发射额外的初始值)

生命周期钩子与副作用拦截

Flow 提供了一系列算子来拦截流执行的关键节点。理解这些钩子的精确执行时序至关重要。

onStart:预热与初始化

在流开始收集时(甚至在第一个元素发射前)触发。

  • 多处调用:如果流中有多个 onStart,它们将按照从下往上(逆流而上)的顺序执行。
  • 发射能力onStart 允许你调用 emit 发射额外的初始元素(如 Loading 状态)。

onCompletion:清理与状态感知

在流正常结束或因异常取消时触发。

  • 原因感知:回调中包含一个可选的 Throwable,用于判定流是成功完成还是由于异常崩溃。
  • 与 finally 对比:它比 try-finally 更具声明式风格,且能感知到上游的具体错误。

onEmpty:处理空数据流

当上游流在未发射任何元素的情况下关闭时触发。常用于提供默认占位数据。

kotlin
flow {
    emit("数据点")
}
    .onStart { println("1. 准备连接数据库") }
    .onEmpty { emit("占位数据") }
    .onCompletion { cause ->
        if (cause != null) println("2. 发生异常,释放资源")
        else println("2. 正常结束,释放资源")
    }
    .collect { println("接收: $it") }

复杂条件控制算子

takeWhile 与 dropWhile

这两者与 filter 的本质区别在于它们具有“短路”效应。

  • takeWhile:一旦谓词返回 false,立即取消上游流,后续元素即使满足条件也永远不会被处理。
  • dropWhile:丢弃前面的元素直到谓词返回 false,此后流变为“完全放行”状态。
kotlin
val f = (1..10).asFlow()

// filter: 检查所有 10 个元素
f.filter { it < 3 }.collect { println(it) } // 1, 2

// takeWhile: 遇到 3 时直接取消流
f.takeWhile { it < 3 }.collect { println(it) } // 1, 2 (更高效)

去重:distinctUntilChanged

distinctUntilChanged 用于过滤掉与上一个发射值相同的重复项。它本质上是空间维度上的限流。

kotlin
flowOf(1, 1, 2, 2, 1)
    .distinctUntilChanged()
    .collect { println(it) }
// 输出: 1, 2, 1 (过滤连续重复)
kotlin
data class Task(val id: Int, val status: String)

taskFlow.distinctUntilChangedBy { it.id }
    .collect {
        // 只有 ID 变化时才触发,状态变化被忽略
    }

核心开发准则

  1. 状态一致性:使用 scan 时,确保累加操作是幂等的且不依赖外部可变状态,否则在多订阅者场景下会导致结果不可预测。
  2. 区分 onCompletion 与 catchonCompletion 仅仅是感知结束,它不会捕获或消耗异常。如果需要拦截异常并防止程序崩溃,必须紧跟 catch 操作符。
  3. 利用 onEmpty 提升鲁棒性:在搜索或筛选场景中,通过 onEmpty 发射“无结果”状态,比在 UI 层判断 List 是否为空更具响应式风格。
  4. 性能优化:在处理长序列时,优先考虑 takeWhile 而非 filter,以尽早释放上游资源。