组合与展平
在复杂的并发场景中,我们经常需要处理多个异步数据源的协同工作。Flow 提供了两种处理维度:组合(Composition)用于将多个流的值合并为一个新值;展平(Flattening)用于处理嵌套流(即流的流)的执行时序。
多流组合:Zip 与 Combine
组合算子的核心区别在于它们如何响应上游的发射节奏。
zip:同步对齐
zip 将两个流的元素按索引一一对应地配对。它遵循“木桶原理”:
- 同步性:必须等待两个流都发射了相同索引的元素,才会执行组合逻辑。
- 生命周期:任一上游流结束,组合后的流立即结束。
combine:最新值融合
combine 不要求索引对齐。只要任一上游流发射了新值,它就会获取所有流的最新缓存值进行计算。
kotlin
val nums = (1..3).asFlow().onEach { delay(100) }
val strs = flowOf("一", "二", "三", "四").onEach { delay(200) }
nums.zip(strs) { n, s -> "$n$s" }.collect { println(it) }
// 输出:1一, 2二, 3三 (由于 nums 只有3个,strs 的“四”被丢弃)kotlin
val filter1 = MutableStateFlow("全部")
val filter2 = MutableStateFlow("按时间")
combine(filter1, filter2) { f1, f2 ->
"当前筛选: $f1 + $f2"
}.collect { println(it) }
// 当用户修改任一筛选条件,都会立即触发 UI 刷新展平流:处理 Flow 的嵌套
当一个流发射的值又是另一个流时(例如:根据用户 ID 去请求用户详情流),会出现 Flow<Flow<T>>。展平操作符负责将其降维为 Flow<T>。
flatMapConcat:串行拼接
它会按顺序处理每一个子流。只有当前一个子流彻底完成后,才会开始收集下一个子流。
- 保证顺序:严格遵循上游的发射顺序。
- 性能局限:由于是串行,总耗时是所有子流耗时的总和。
flatMapMerge:并发合并
它会同时收集多个子流,并将它们发射的元素交织在一起。
- 并发执行:不等待前一个流结束,立即启动下一个。
- 并发数控制:可以通过
concurrency参数限制同时运行的子流数量。
kotlin
flowOf(1, 2, 3).flatMapConcat { id ->
flow {
delay(100)
emit("任务 $id 完成")
}
}.collect { println(it) }
// 严格按 1, 2, 3 顺序输出kotlin
flowOf(1, 2, 3).flatMapMerge(concurrency = 2) { id ->
flow {
delay(100)
emit("任务 $id 并发完成")
}
}.collect { println(it) }
// 结果可能交织,且总耗时减少核心进阶:flatMapLatest 的响应式取消
flatMapLatest 是展平操作中最具响应式特性的算子。每当上游发射新值时,它会立即取消之前的子流,并开启新的子流收集。
工业级场景:搜索联想
在搜索框场景下,用户每打一个字都会发射新值。
- 用户输入 "A",启动搜索请求 A。
- 用户在请求 A 还没回来时输入 "AB"。
flatMapLatest立即取消请求 A 的协程,发起请求 AB。- 这样确保了 UI 永远只显示最新输入的查询结果,且避免了过时数据的展示(数据倒灌)。
kotlin
searchQueryFlow
.debounce(300) // 防抖处理
.flatMapLatest { query ->
repository.searchRemote(query) // 自动管理取消逻辑
}
.collect { results ->
showResults(results)
}核心开发准则
- 多条件过滤选 combine:只要涉及多个数据源共同决定一个 UI 状态的情况,
combine几乎是唯一正确的选择。 - 逻辑关联选 zip:只有当两个数据源必须成对出现(如坐标 X 和 Y)且逻辑上一一对应时才使用
zip。 - IO 请求优先考虑 flatMapLatest:在处理“由于用户操作导致的频繁网络请求”时,
flatMapLatest的自动取消机制是防止资源浪费和数据错乱的关键。 - 控制 flatMapMerge 的并发数:默认并发数为 16(
DEFAULT_CONCURRENCY)。如果子流是重量级的(如大型数据库查询),应根据设备性能调低该值。