Skip to content

背压与缓冲策略

源:Buffering

在异步流处理中,生产者(Emitter)与消费者(Collector)的执行速率往往不一致。当生产者发射速度快于消费者的处理速度时,就会产生所谓的“背压(Backpressure)”。Flow 利用协程的挂起机制天然地解决了这一问题,并提供了多种缓冲策略来优化吞吐量。

默认背压行为:顺序挂起

默认情况下,Flow 是串行执行且无缓冲的。

  • 逻辑闭环:生产者发射一个值后,会直接在 emit 处挂起,直到消费者完整执行完 collect 闭包中的逻辑。
  • 物理表现:生产者与消费者运行在同一个协程中,这意味着它们的总耗时是两者耗时的叠加。
kotlin
val flow = flow {
    for (i in 1..3) {
        delay(100) // 模拟生产耗时
        emit(i)
    }
}

measureTimeMillis {
    flow.collect { value ->
        delay(300) // 模拟消费耗时
        println("处理: $value")
    }
}.also { println("总耗时: ${it}ms") }
// 理论总耗时 ≈ (100 + 300) * 3 = 1200ms

引入缓冲区:buffer

通过调用 buffer() 算子,你可以打破生产者与消费者的串行屏障。

  • 并发执行buffer 会在内部启动一个辅助协程来运行上游逻辑。生产者可以在消费者处理当前数据时,提前准备并缓存后续的数据。
  • 背压缓解:只要缓冲区未满,生产者就不会被挂起。

物理实现

buffer 底层使用了一个 Channel。它将 Flow 拆分为两个并发运行的阶段:生产阶段 -> 入队 -> 消费阶段。

溢出策略 (BufferOverflow)

你可以通过参数配置当缓冲区满时的处理逻辑:

策略行为描述适用场景
SUSPEND默认策略。缓冲区满时,上游 emit 挂起。关键业务数据,不能丢失。
DROP_OLDEST丢弃最老的数据(队列头部),腾出空间给新数据。实时性要求高,旧数据无意义。
DROP_LATEST丢弃当前试图进入缓冲区的新数据。保护系统稳定,防止过载。
kotlin
flow.buffer(capacity = 64, onBufferOverflow = BufferOverflow.DROP_OLDEST)

数据合并:conflate

conflate(合并)是一种极端的缓冲策略,它相当于 buffer(capacity = 0, onBufferOverflow = DROP_OLDEST)

  • 核心逻辑:消费者处理数据期间,上游发射的所有中间值都会被忽略。当消费者处理完准备接收下一个值时,它只能拿到生产者发射的最新那个值。
  • 性能优势:它完全消除了消费者的堆积,保证了处理的实时性。
kotlin
val fastFlow = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

fastFlow.conflate().collect { value ->
    delay(300) // 处理很慢
    println("接收到最新值: $value")
}
// 输出可能跳过中间值,如 1, 4, 7, 10

终端截断:collectLatest

collectLatest 与上述所有策略有着本质不同:它不丢弃数据,而是取消消费者。

  • 取消机制:每当生产者发射一个新值时,如果之前的 collectLatest 闭包还在执行,它会立即被取消(Cancel),然后重新启动并处理新值。
  • 数据完整性:上游的每一个值都会触发消费逻辑,但只有发射间隔足够长、或者消费逻辑足够快时,逻辑才能完整运行。

场景:UI 状态刷新

在 Android 中,当用户点击列表项触发详情加载时,如果用户连续点击多项,我们只对最后一次点击感兴趣。使用 collectLatest 可以自动取消之前的无效加载。

核心开发准则

  1. 不要盲目使用 buffer:无缓冲的串行模式是最省内存且最易调试的。只有在确认生产/消费速率严重不匹配且导致流水线卡顿时,才引入缓冲区。
  2. 高频数据首选 conflate:在处理传感器(加速度计、陀螺仪)数据或滚动回调时,中间值的丢失通常是可以接受的,此时 conflate 能显著降低 CPU 负载。
  3. 关键任务使用默认 SUSPEND:在执行如数据库同步、文件写入等关键任务时,绝对不能丢失数据,必须使用挂起背压确保每一个元素都被处理。
  4. 理解 collectLatest 的取消成本:虽然它很好用,但频繁的取消和重启会带来额外的协程切换开销。如果消费闭包内包含重型不可中断任务,取消可能并不会立即生效。