Skip to content

Channel 管道与热流原语

源:Channels

Channel 是协程之间进行并发通信的基础原语。与 Flow 不同,Channel 提供的是一种热(Hot)通信管道,它不关注是否有订阅者,数据一旦被发送就会进入管道。它实现了经典的 CSP (Communicating Sequential Processes) 模型:通过通信来共享内存,而不是通过共享内存来通信。

Channel 与 Flow 的本质区别

理解 Channel 的关键在于识别它与 Flow 在数据分发逻辑上的差异:

特性FlowChannel
流类型冷流 (Cold)热流 (Hot)
分发模型一对多 (广播),每个订阅者拿到全量数据一对一 (竞争),每个元素只能被一个消费者接收一次
生命周期随收集器启动和停止独立存在,手动关闭
应用场景状态流转、数据变换任务分发、事件总线原语

核心操作:Send 与 Receive

Channel 接口由 SendChannelReceiveChannel 组合而成。这两个接口的挂起特性是实现非阻塞背压的关键。

kotlin
val channel = Channel<Int>()
launch {
    // send 是挂起函数,缓冲区满时会挂起生产者
    for (x in 1..5) channel.send(x * x)
    channel.close() // 发送完毕必须关闭,否则消费者迭代器会永不结束
}

launch {
    // 消费者通过迭代器或 receive 挂起接收
    for (y in channel) println("接收: $y")
    println("管道已彻底关闭")
}
kotlin
// 推荐方式:produce 协程构建器会自动处理异常和管道关闭逻辑
val squares = produce {
    for (x in 1..5) send(x * x)
}

squares.consumeEach { println(it) }

管道容量与缓冲策略

创建 Channel 时可以指定缓冲区大小,这直接决定了生产者的挂起时机。

  • RENDEZVOUS (0):默认策略。生产者必须等到消费者准备好接收,数据才算发送成功。
  • BUFFERED:拥有固定大小的缓冲区。缓冲区未满时发送不挂起。
  • UNLIMITED:无限缓冲区。发送者永不挂起,直到耗尽内存。
  • CONFLATED:缓冲区为 1。新数据总是覆盖旧数据,发送者永不挂起(类似于 StateFlow 的某些行为)。

并发分发模式

扇出 (Fan-out)

多个协程可以监听同一个 Channel。在这种模式下,数据是以竞争方式分发的:一个元素只会被其中一个协程处理。这对于分发耗时的后台任务非常高效。

公平性 (Fairness)

Channel 在分发数据时是公平的。它内部维护了一个队列,多个协程调用 receive 时遵循 FIFO(先进先出)原则,即最早挂起等待的协程会优先获得下一个元素。

扇入 (Fan-in)

多个协程可以同时向一个 Channel 发送数据。例如,多个传感器的采集任务可以将结果统一汇集到一个处理管道中。

管道流水线 (Pipelines)

通过将多个 produce 协程串联,可以构建强大的数据处理流水线。

kotlin
fun CoroutineScope.numbersProducer() = produce {
    var x = 1
    while (true) send(x++)
}

fun CoroutineScope.squareTransformer(numbers: ReceiveChannel<Int>) = produce {
    for (x in numbers) send(x * x)
}

// 组合流水线
val numbers = numbersProducer()
val squares = squareTransformer(numbers)
repeat(5) { println(squares.receive()) }
coroutineContext.cancelChildren() // 停止流水线

异常与关闭细节

  • close(): 向管道发送一个“关闭令牌”。已在缓冲区的数据仍可被接收。
  • isClosedForSend / isClosedForReceive: 状态判定。
  • 失败传播: 如果 produce 协程内部抛出异常,该异常会被传播到 ReceiveChannel。当消费者尝试接收时,会感知到对应的失败。

核心开发准则

  1. 优先使用 produce:手动管理 Channel 容易导致漏掉 close() 从而造成协程泄露。produce 确保了作用域结束或异常发生时管道能正确关闭。
  2. 理解一对一特质:如果你需要多个订阅者都能看到同一份数据,请不要使用 Channel,而应选择 SharedFlow
  3. 不要在 Channel 中传递可变对象:由于数据在协程间流动,传递可变对象会引入隐蔽的竞态条件。始终坚持传递不可变数据。
  4. 控制缓冲区大小:在处理高频数据且对实时性有要求时,优先选择 CONFLATED 策略,以防止内存中堆积过时的旧任务。