Channel 管道与热流原语
源:Channels
Channel 是协程之间进行并发通信的基础原语。与 Flow 不同,Channel 提供的是一种热(Hot)通信管道,它不关注是否有订阅者,数据一旦被发送就会进入管道。它实现了经典的 CSP (Communicating Sequential Processes) 模型:通过通信来共享内存,而不是通过共享内存来通信。
Channel 与 Flow 的本质区别
理解 Channel 的关键在于识别它与 Flow 在数据分发逻辑上的差异:
| 特性 | Flow | Channel |
|---|---|---|
| 流类型 | 冷流 (Cold) | 热流 (Hot) |
| 分发模型 | 一对多 (广播),每个订阅者拿到全量数据 | 一对一 (竞争),每个元素只能被一个消费者接收一次 |
| 生命周期 | 随收集器启动和停止 | 独立存在,手动关闭 |
| 应用场景 | 状态流转、数据变换 | 任务分发、事件总线原语 |
核心操作:Send 与 Receive
Channel 接口由 SendChannel 和 ReceiveChannel 组合而成。这两个接口的挂起特性是实现非阻塞背压的关键。
kotlin
val channel = Channel<Int>()
launch {
// send 是挂起函数,缓冲区满时会挂起生产者
for (x in 1..5) channel.send(x * x)
channel.close() // 发送完毕必须关闭,否则消费者迭代器会永不结束
}
launch {
// 消费者通过迭代器或 receive 挂起接收
for (y in channel) println("接收: $y")
println("管道已彻底关闭")
}kotlin
// 推荐方式:produce 协程构建器会自动处理异常和管道关闭逻辑
val squares = produce {
for (x in 1..5) send(x * x)
}
squares.consumeEach { println(it) }管道容量与缓冲策略
创建 Channel 时可以指定缓冲区大小,这直接决定了生产者的挂起时机。
RENDEZVOUS(0):默认策略。生产者必须等到消费者准备好接收,数据才算发送成功。BUFFERED:拥有固定大小的缓冲区。缓冲区未满时发送不挂起。UNLIMITED:无限缓冲区。发送者永不挂起,直到耗尽内存。CONFLATED:缓冲区为 1。新数据总是覆盖旧数据,发送者永不挂起(类似于StateFlow的某些行为)。
并发分发模式
扇出 (Fan-out)
多个协程可以监听同一个 Channel。在这种模式下,数据是以竞争方式分发的:一个元素只会被其中一个协程处理。这对于分发耗时的后台任务非常高效。
公平性 (Fairness)
Channel 在分发数据时是公平的。它内部维护了一个队列,多个协程调用 receive 时遵循 FIFO(先进先出)原则,即最早挂起等待的协程会优先获得下一个元素。
扇入 (Fan-in)
多个协程可以同时向一个 Channel 发送数据。例如,多个传感器的采集任务可以将结果统一汇集到一个处理管道中。
管道流水线 (Pipelines)
通过将多个 produce 协程串联,可以构建强大的数据处理流水线。
kotlin
fun CoroutineScope.numbersProducer() = produce {
var x = 1
while (true) send(x++)
}
fun CoroutineScope.squareTransformer(numbers: ReceiveChannel<Int>) = produce {
for (x in numbers) send(x * x)
}
// 组合流水线
val numbers = numbersProducer()
val squares = squareTransformer(numbers)
repeat(5) { println(squares.receive()) }
coroutineContext.cancelChildren() // 停止流水线异常与关闭细节
close(): 向管道发送一个“关闭令牌”。已在缓冲区的数据仍可被接收。isClosedForSend / isClosedForReceive: 状态判定。- 失败传播: 如果
produce协程内部抛出异常,该异常会被传播到ReceiveChannel。当消费者尝试接收时,会感知到对应的失败。
核心开发准则
- 优先使用 produce:手动管理
Channel容易导致漏掉close()从而造成协程泄露。produce确保了作用域结束或异常发生时管道能正确关闭。 - 理解一对一特质:如果你需要多个订阅者都能看到同一份数据,请不要使用
Channel,而应选择SharedFlow。 - 不要在 Channel 中传递可变对象:由于数据在协程间流动,传递可变对象会引入隐蔽的竞态条件。始终坚持传递不可变数据。
- 控制缓冲区大小:在处理高频数据且对实时性有要求时,优先选择
CONFLATED策略,以防止内存中堆积过时的旧任务。