Skip to content

组合与展平

源:Composing multiple flows

在复杂的并发场景中,我们经常需要处理多个异步数据源的协同工作。Flow 提供了两种处理维度:组合(Composition)用于将多个流的值合并为一个新值;展平(Flattening)用于处理嵌套流(即流的流)的执行时序。

多流组合:Zip 与 Combine

组合算子的核心区别在于它们如何响应上游的发射节奏。

zip:同步对齐

zip 将两个流的元素按索引一一对应地配对。它遵循“木桶原理”:

  • 同步性:必须等待两个流都发射了相同索引的元素,才会执行组合逻辑。
  • 生命周期:任一上游流结束,组合后的流立即结束。

combine:最新值融合

combine 不要求索引对齐。只要任一上游流发射了新值,它就会获取所有流的最新缓存值进行计算。

kotlin
val nums = (1..3).asFlow().onEach { delay(100) }
val strs = flowOf("一", "二", "三", "四").onEach { delay(200) }

nums.zip(strs) { n, s -> "$n$s" }.collect { println(it) }
// 输出:1一, 2二, 3三 (由于 nums 只有3个,strs 的“四”被丢弃)
kotlin
val filter1 = MutableStateFlow("全部")
val filter2 = MutableStateFlow("按时间")

combine(filter1, filter2) { f1, f2 -> 
    "当前筛选: $f1 + $f2" 
}.collect { println(it) }
// 当用户修改任一筛选条件,都会立即触发 UI 刷新

展平流:处理 Flow 的嵌套

当一个流发射的值又是另一个流时(例如:根据用户 ID 去请求用户详情流),会出现 Flow<Flow<T>>。展平操作符负责将其降维为 Flow<T>

flatMapConcat:串行拼接

它会按顺序处理每一个子流。只有当前一个子流彻底完成后,才会开始收集下一个子流。

  • 保证顺序:严格遵循上游的发射顺序。
  • 性能局限:由于是串行,总耗时是所有子流耗时的总和。

flatMapMerge:并发合并

它会同时收集多个子流,并将它们发射的元素交织在一起。

  • 并发执行:不等待前一个流结束,立即启动下一个。
  • 并发数控制:可以通过 concurrency 参数限制同时运行的子流数量。
kotlin
flowOf(1, 2, 3).flatMapConcat { id ->
    flow {
        delay(100)
        emit("任务 $id 完成")
    }
}.collect { println(it) }
// 严格按 1, 2, 3 顺序输出
kotlin
flowOf(1, 2, 3).flatMapMerge(concurrency = 2) { id ->
    flow {
        delay(100)
        emit("任务 $id 并发完成")
    }
}.collect { println(it) }
// 结果可能交织,且总耗时减少

核心进阶:flatMapLatest 的响应式取消

flatMapLatest 是展平操作中最具响应式特性的算子。每当上游发射新值时,它会立即取消之前的子流,并开启新的子流收集。

工业级场景:搜索联想

在搜索框场景下,用户每打一个字都会发射新值。

  1. 用户输入 "A",启动搜索请求 A。
  2. 用户在请求 A 还没回来时输入 "AB"。
  3. flatMapLatest 立即取消请求 A 的协程,发起请求 AB。
  4. 这样确保了 UI 永远只显示最新输入的查询结果,且避免了过时数据的展示(数据倒灌)。
kotlin
searchQueryFlow
    .debounce(300) // 防抖处理
    .flatMapLatest { query ->
        repository.searchRemote(query) // 自动管理取消逻辑
    }
    .collect { results ->
        showResults(results)
    }

核心开发准则

  1. 多条件过滤选 combine:只要涉及多个数据源共同决定一个 UI 状态的情况,combine 几乎是唯一正确的选择。
  2. 逻辑关联选 zip:只有当两个数据源必须成对出现(如坐标 X 和 Y)且逻辑上一一对应时才使用 zip
  3. IO 请求优先考虑 flatMapLatest:在处理“由于用户操作导致的频繁网络请求”时,flatMapLatest 的自动取消机制是防止资源浪费和数据错乱的关键。
  4. 控制 flatMapMerge 的并发数:默认并发数为 16(DEFAULT_CONCURRENCY)。如果子流是重量级的(如大型数据库查询),应根据设备性能调低该值。