Skip to content

线程模型与并发性能

源:Flow context

Flow 的线程管理遵循上下文保存 (Context Preservation) 契约。这意味着 Flow 默认不具备多线程能力,必须通过特定的操作符显式切换。

上下文保存契约

契约定义

flow { ... } 内部的执行上下文必须与 collect 时一致。 你不能在 flow { ... } 内部使用 withContext(Dispatchers.IO) 来发射值,这会直接抛出 IllegalStateException

为什么? 因为 Flow 的设计目标是让下游(消费者)拥有完全的控制权。

flowOn:改变上游上下文

flowOn 是唯一正确的线程切换方式。它仅影响在其上方的操作符。

kotlin
flow {
    emit(data) // 运行在 IO
}
.filter { ... } // 运行在 IO
.flowOn(Dispatchers.IO) // ⭐️ 切换点
.map { ... } // 运行在 Main (假设 collect 在 Main)
.collect { ... } // 运行在 Main

flowOn 的底层:Channel 缓冲

当调用 flowOn 时,Kotlin 会在内部启动一个协程,并通过 Channel 将数据从一个线程池传输到另一个。这会自动引入并发性

背压与缓冲策略 (Backpressure)

背压发生在生产者快于消费者时。

1. 默认挂起 (Suspend)

Flow 默认是无缓冲的顺序执行。如果消费者卡住了,生产者会在 emit 处挂起。

2. buffer 显式缓冲

buffer 允许生产者在消费者处理期间继续工作,而不必挂起。

kotlin
flow.buffer(capacity = 10, onBufferOverflow = BufferOverflow.DROP_OLDEST)
策略行为
SUSPEND缓冲区满时挂起生产者
DROP_OLDEST丢弃最老的数据,保留最新的
DROP_LATEST丢弃当前发射的数据

3. conflate (合并)

conflateDROP_OLDEST 且容量为 0 的简化版。它确保下游永远拿到最新的一个值,跳过所有中间值。

并发发射:channelFlow

如果你需要在一个流内部同时运行多个协程并发发射值,必须使用 channelFlow

kotlin
fun concurrentFlow() = channelFlow {
    launch { send(task1()) }
    launch { send(task2()) }
}

核心准则总结

  1. 不要在 flow {} 中使用 withContext:请使用 flowOn
  2. 理解 flowOn 的单向性:它逆流而上作用于上方。
  3. 性能瓶颈通常在于过多的上下文切换:不要在链式调用中写一堆 flowOn
  4. 使用 conflate 处理高频 UI 更新:如滑动时的位置回调。