背压与缓冲策略
在异步流处理中,生产者(Emitter)与消费者(Collector)的执行速率往往不一致。当生产者发射速度快于消费者的处理速度时,就会产生所谓的“背压(Backpressure)”。Flow 利用协程的挂起机制天然地解决了这一问题,并提供了多种缓冲策略来优化吞吐量。
默认背压行为:顺序挂起
默认情况下,Flow 是串行执行且无缓冲的。
- 逻辑闭环:生产者发射一个值后,会直接在
emit处挂起,直到消费者完整执行完collect闭包中的逻辑。 - 物理表现:生产者与消费者运行在同一个协程中,这意味着它们的总耗时是两者耗时的叠加。
kotlin
val flow = flow {
for (i in 1..3) {
delay(100) // 模拟生产耗时
emit(i)
}
}
measureTimeMillis {
flow.collect { value ->
delay(300) // 模拟消费耗时
println("处理: $value")
}
}.also { println("总耗时: ${it}ms") }
// 理论总耗时 ≈ (100 + 300) * 3 = 1200ms引入缓冲区:buffer
通过调用 buffer() 算子,你可以打破生产者与消费者的串行屏障。
- 并发执行:
buffer会在内部启动一个辅助协程来运行上游逻辑。生产者可以在消费者处理当前数据时,提前准备并缓存后续的数据。 - 背压缓解:只要缓冲区未满,生产者就不会被挂起。
物理实现
buffer 底层使用了一个 Channel。它将 Flow 拆分为两个并发运行的阶段:生产阶段 -> 入队 -> 消费阶段。
溢出策略 (BufferOverflow)
你可以通过参数配置当缓冲区满时的处理逻辑:
| 策略 | 行为描述 | 适用场景 |
|---|---|---|
SUSPEND | 默认策略。缓冲区满时,上游 emit 挂起。 | 关键业务数据,不能丢失。 |
DROP_OLDEST | 丢弃最老的数据(队列头部),腾出空间给新数据。 | 实时性要求高,旧数据无意义。 |
DROP_LATEST | 丢弃当前试图进入缓冲区的新数据。 | 保护系统稳定,防止过载。 |
kotlin
flow.buffer(capacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST)数据合并:conflate
conflate(合并)是一种极端的缓冲策略,它相当于 buffer(capacity = 0, onBufferOverflow = DROP_OLDEST)。
- 核心逻辑:消费者处理数据期间,上游发射的所有中间值都会被忽略。当消费者处理完准备接收下一个值时,它只能拿到生产者发射的最新那个值。
- 性能优势:它完全消除了消费者的堆积,保证了处理的实时性。
kotlin
val fastFlow = flow {
for (i in 1..10) {
delay(100)
emit(i)
}
}
fastFlow.conflate().collect { value ->
delay(300) // 处理很慢
println("接收到最新值: $value")
}
// 输出可能跳过中间值,如 1, 4, 7, 10终端截断:collectLatest
collectLatest 与上述所有策略有着本质不同:它不丢弃数据,而是取消消费者。
- 取消机制:每当生产者发射一个新值时,如果之前的
collectLatest闭包还在执行,它会立即被取消(Cancel),然后重新启动并处理新值。 - 数据完整性:上游的每一个值都会触发消费逻辑,但只有发射间隔足够长、或者消费逻辑足够快时,逻辑才能完整运行。
场景:UI 状态刷新
在 Android 中,当用户点击列表项触发详情加载时,如果用户连续点击多项,我们只对最后一次点击感兴趣。使用 collectLatest 可以自动取消之前的无效加载。
核心开发准则
- 不要盲目使用 buffer:无缓冲的串行模式是最省内存且最易调试的。只有在确认生产/消费速率严重不匹配且导致流水线卡顿时,才引入缓冲区。
- 高频数据首选 conflate:在处理传感器(加速度计、陀螺仪)数据或滚动回调时,中间值的丢失通常是可以接受的,此时
conflate能显著降低 CPU 负载。 - 关键任务使用默认 SUSPEND:在执行如数据库同步、文件写入等关键任务时,绝对不能丢失数据,必须使用挂起背压确保每一个元素都被处理。
- 理解 collectLatest 的取消成本:虽然它很好用,但频繁的取消和重启会带来额外的协程切换开销。如果消费闭包内包含重型不可中断任务,取消可能并不会立即生效。