上下文切换
Flow 的线程管理遵循严苛的上下文保存 (Context Preservation) 契约。与传统的响应式库(如 RxJava)通过操作符改变下游执行线程不同,Flow 的 flowOn 操作符主要用于改变上游的执行环境,而下游的执行上下文则始终由收集器(Collector)所在的协程决定。
上下文保存契约
Flow 的设计哲学要求:流的发射逻辑必须与收集逻辑在同一个上下文中运行。 这种设计保证了流的执行是透明且可预测的,但也带来了一个严格的禁令。
严禁在流内部使用 withContext
在 flow { ... } 构建块中,绝对禁止通过 withContext(Dispatcher) 来强行改变发射时的上下文。
kotlin
val illegalFlow = flow {
// ❌ 这样做会导致 IllegalStateException: Flow invariant is violated
// 因为这破坏了“发射与收集上下文必须一致”的原则
kotlinx.coroutines.withContext(Dispatchers.IO) {
emit("Data")
}
}底层原因:Flow 必须确保下游收集者的调度意图不被上游破坏。如果收集者在主线程等待,而上游擅自切换到了后台线程进行发射,会产生难以调试的并发问题。
flowOn 操作符:逆流而上的切换
flowOn 是 Flow 中唯一合法的上下文切换算子。它的作用方向是逆流而上的:它只影响在其之后、直到上一个 flowOn 或流起点之间的操作符。
- 作用范围:仅影响上游。
- 并发性:调用
flowOn会自动引入缓冲区并开启并发执行。
kotlin
flow {
println("发射阶段: ${Thread.currentThread().name}") // 运行在 IO 线程
emit(1)
}
.filter {
println("过滤阶段: ${Thread.currentThread().name}") // 运行在 IO 线程
it > 0
}
.flowOn(Dispatchers.IO) // ⭐️ 切换点:改变上方所有逻辑的上下文
.map {
println("映射阶段: ${Thread.currentThread().name}") // 运行在 Main (收集者线程)
it * 10
}
.collect {
println("收集阶段: ${Thread.currentThread().name}") // 运行在 Main
}底层原理:基于 Channel 的物理隔离
当你在流中插入 flowOn(Dispatchers.IO) 时,Kotlin 运行时在底层做了以下工作:
- 启动辅助协程:它会在
Dispatchers.IO上启动一个新的协程来专门运行上游逻辑。 - 建立通信管道:它会创建一个
Channel(通道),作为上游协程(生产者)与下游协程(消费者)之间的中转站。 - 引入并发与缓冲:由于有了中转管道,上游不再需要同步等待下游处理完一个元素后再发射下一个。这种物理隔离实现了真正的异步流水线。
隐性背压
因为 flowOn 引入了 Channel,它实际上隐式地调用了 buffer()。这意味着上游发射速度较快时,数据会暂存在 Channel 中。如果 Channel 满了,上游协程依然会根据背压策略挂起。
多个 flowOn 的级联行为
如果一个流中存在多个 flowOn,它们会各自划分自己的势力范围。
kotlin
flow { ... }
.flowOn(Dispatchers.Default) // 1. 作用于此处的代码块
.map { ... }
.flowOn(Dispatchers.IO) // 2. 作用于 map 及其到上一个 flowOn 之间的逻辑
.collect { ... } // 3. 始终运行在 collect 所在的上下文快捷方式:launchIn
launchIn 是一个专门用于在特定上下文中启动流收集的操作符。它本质上是 scope.launch { flow.collect() } 的语法糖,有助于减少代码嵌套并提高可读性。
kotlin
viewModel.uiState
.onEach { updateUI(it) }
.launchIn(viewModelScope) // 在指定作用域启动收集,不阻塞后续代码核心开发准则
- 不要在 Repo 层决定 collect 的线程:Repository 层的 Flow 应该只通过
flowOn确保自己的耗时操作运行在后台。具体的收集线程应该由 UI 层(消费者)根据需求决定。 - 避免过度切换:频繁使用
flowOn会引入大量的Channel开销和协程切换,建议只在确实需要进行 IO 或密集计算的边界处使用一次。 - 理解 Main.immediate:在 Android 中,如果已经处于主线程,使用
Dispatchers.Main.immediate收集流可以避免不必要的分派延迟,实现更流畅的 UI 响应。