线程模型与并发性能
Flow 的线程管理遵循上下文保存 (Context Preservation) 契约。这意味着 Flow 默认不具备多线程能力,必须通过特定的操作符显式切换。
上下文保存契约
契约定义
flow { ... } 内部的执行上下文必须与 collect 时一致。 你不能在 flow { ... } 内部使用 withContext(Dispatchers.IO) 来发射值,这会直接抛出 IllegalStateException。
为什么? 因为 Flow 的设计目标是让下游(消费者)拥有完全的控制权。
flowOn:改变上游上下文
flowOn 是唯一正确的线程切换方式。它仅影响在其上方的操作符。
kotlin
flow {
emit(data) // 运行在 IO
}
.filter { ... } // 运行在 IO
.flowOn(Dispatchers.IO) // ⭐️ 切换点
.map { ... } // 运行在 Main (假设 collect 在 Main)
.collect { ... } // 运行在 MainflowOn 的底层:Channel 缓冲
当调用 flowOn 时,Kotlin 会在内部启动一个协程,并通过 Channel 将数据从一个线程池传输到另一个。这会自动引入并发性。
背压与缓冲策略 (Backpressure)
背压发生在生产者快于消费者时。
1. 默认挂起 (Suspend)
Flow 默认是无缓冲的顺序执行。如果消费者卡住了,生产者会在 emit 处挂起。
2. buffer 显式缓冲
buffer 允许生产者在消费者处理期间继续工作,而不必挂起。
kotlin
flow.buffer(capacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)| 策略 | 行为 |
|---|---|
SUSPEND | 缓冲区满时挂起生产者 |
DROP_OLDEST | 丢弃最老的数据,保留最新的 |
DROP_LATEST | 丢弃当前发射的数据 |
3. conflate (合并)
conflate 是 DROP_OLDEST 且容量为 0 的简化版。它确保下游永远拿到最新的一个值,跳过所有中间值。
并发发射:channelFlow
如果你需要在一个流内部同时运行多个协程并发发射值,必须使用 channelFlow。
kotlin
fun concurrentFlow() = channelFlow {
launch { send(task1()) }
launch { send(task2()) }
}核心准则总结
- 不要在 flow {} 中使用 withContext:请使用
flowOn。 - 理解 flowOn 的单向性:它逆流而上作用于上方。
- 性能瓶颈通常在于过多的上下文切换:不要在链式调用中写一堆
flowOn。 - 使用 conflate 处理高频 UI 更新:如滑动时的位置回调。