Skip to content

响应式核心与冷流基础

源:Asynchronous Flow

在传统的异步编程中,挂起函数可以异步地返回单个值。而对于需要异步返回多个值的场景(如实时传感器数据、数据库变更监听),Kotlin 提供了 Flow。它是一个基于协程的异步流,不仅解决了多值返回问题,还通过协程的挂起特性实现了极其优雅的背压机制。

核心接口契约与 API 签名

Flow 的设计极其精简,其整个生态构建在两个核心接口的协作之上。理解这两个接口的挂起本质是掌握 Flow 的第一步。

查看 Flow 与 FlowCollector 核心定义
kotlin
public interface Flow<out T> {
    /**
     * 收集流的核心方法。
     * 它是一个挂起函数,意味着它会挂起当前协程,直到流被彻底处理完毕。
     * 每一个收集动作都会触发流的一次重新执行(冷流特性)。
     */
    public suspend fun collect(collector: FlowCollector<T>)
}

public fun interface FlowCollector<in T> {
    /**
     * 向下游发射数据。
     * emit 也是一个挂起函数。如果下游处理缓慢,emit 会被挂起,
     * 这种挂起机制形成了天然的背压传播。
     */
    public suspend fun emit(value: T)
}

挂起即背压:底层的协作机制

不同于 RxJava 需要显式处理 request(n) 这种复杂的背压协议,Kotlin Flow 利用协程的挂起(Suspension)直接实现了流量控制。

  • 单协程同步性:默认情况下,emitcollect 运行在同一个协程中。当 emit 发射一个值时,它实际上是在挂起并等待 collect 代码块执行完毕。
  • 天然反馈:如果消费者处理数据耗时 1 秒,那么生产者的 emit 也会被挂起 1 秒。这种点对点的同步反馈机制确保了生产者永远不会产生超过消费者处理能力的数据。

冷流的确定性生命周期

Flow 被称为“冷流”,是因为它的执行逻辑是惰性的。

  • 惰性启动:在调用终端操作符(如 collect)之前,flow { ... } 块中的代码完全不会运行。
  • 状态不持久化:流本身不存储数据。它更像是一个执行模板,每次被收集时都会根据模板从头开始运行。
  • 独立上下文:每一个订阅者都拥有独立的执行进度,互不干扰。
kotlin
val coldFlow = flow {
    println("生产者:开始发射数据")
    for (i in 1..3) {
        delay(100) // 模拟异步工作
        emit(i)
    }
}

runBlocking {
    println("第一次收集:")
    coldFlow.collect { println("接收:$it") }
    
    println("第二次收集:")
    coldFlow.collect { println("接收:$it") }
}
kotlin
val resourceFlow = flow {
    val resource = openConnection()
    try {
        emit(resource.readData())
    } finally {
        // ⭐️ 结构化并发确保了即便收集器取消,finally 也会执行
        resource.close()
        println("资源已释放")
    }
}

流构建器深度解析

除了标准的 flow { ... },Kotlin 还提供了针对不同场景优化的构建算子。

  • flowOf:用于创建一个发射固定值集合的流。其内部实现非常轻量,适合将静态数据转为响应式流。
  • asFlow:这是一个扩展函数族,可以将 IterableIteratorSequenceIntArray 等多种集合类型转换为 Flow。
  • channelFlow:当需要在流内部启动多个协程进行并发发射时使用(详见进阶章节)。
kotlin
// 将集合转换为 Flow
val listFlow = listOf("A", "B").asFlow()

// 将区间转换为 Flow
val rangeFlow = (1..10).asFlow()
kotlin
fun getFlow(useCache: Boolean) = flow {
    if (useCache) {
        emit(cache.get())
    } else {
        val data = api.fetch()
        cache.save(data)
        emit(data)
    }
}

线程模型:上下文保存契约

Flow 的设计遵循严苛的 上下文保存(Context Preservation) 契约。

严禁修改上下文

flow { ... } 构建块中,绝对禁止使用 withContext(Dispatchers.IO) 来发射数据。

kotlin
val illegalFlow = flow {
    // ❌ 这样做会导致 IllegalStateException
    kotlinx.coroutines.withContext(Dispatchers.IO) {
        emit("Data") 
    }
}

底层原因:Flow 必须保证发射时的上下文与收集时的上下文一致(除非显式使用 flowOn),以确保下游收集者的调度意图不被上游破坏。这种设计保证了流的操作是透明且可预测的。

知识点深度对比:List vs Sequence vs Flow

特性ListSequenceFlow
计算时机立即计算(Eager)延迟计算(Lazy)延迟计算(Lazy)
执行模型同步阻塞同步阻塞异步挂起
并发支持不支持不支持原生支持 (flowOn)
内存占用随数据量线性增长极低(按需计算)极低(按需计算)
背压机制挂起驱动型背压

核心准则总结

  1. 终端算子驱动:Flow 的代码块仅在 collect 及其变体被调用时激活。
  2. 顺序发射:在同一个 flow { ... } 块中,发射是顺序的,前一个 emit 没处理完,后一个不会开始。
  3. 取消感知:Flow 的发射过程是可取消的。如果收集流的协程被取消,emit 内部会抛出 CancellationException 并停止运行。
  4. 异常传播:流内部抛出的任何未捕获异常都会传播到 collect 端。