回调适配 (callbackFlow)
源:Creating a flow from a callback-based API
在 Android 及 JVM 开发中,许多底层 SDK 或旧代码依然使用监听器(Listener)或回调(Callback)来传递异步数据。将这些 API 转换为响应式 Flow 是实现全链路协程化的关键。callbackFlow 正是为此设计的专用构建器,它提供了线程安全的通信管道和生命周期收尾机制。
为什么不能使用普通的 flow 构建器?
核心限制:发射上下文
标准的 flow { ... } 构建器是顺序执行且上下文保存的。这意味着:
- 它不能在非挂起的回调中直接调用
emit。 - 它无法处理来自多个不同线程的并发发射。 如果你尝试在第三方 SDK 的回调线程中直接调用
emit,会触发IllegalStateException。
callbackFlow 的核心结构
callbackFlow 基于 Channel 实现,允许在任何线程中通过 trySend 或 send 安全地分发元素。
kotlin
fun locationFlow() = callbackFlow {
// 1. 创建监听器对象
val listener = object : LocationListener {
override fun onLocationChanged(location: Location) {
// 2. 将数据推入流。trySend 是非挂起的,适合在回调中使用
trySend(location)
}
}
// 3. 注册监听
locationManager.register(listener)
// 4. ⭐️ 核心:挂起协程直到流关闭或被取消
// 这是防止内存泄漏的生命周期锚点
awaitClose {
// 5. 资源清理逻辑
locationManager.unregister(listener)
println("清理:监听器已反注册")
}
}核心原语深度解析
1. awaitClose:强制性的收尾
awaitClose 是 callbackFlow 中最重要的函数。它会挂起当前协程,直到:
- 消费者停止收集流且协程被取消。
- 生产者在内部显式调用了
close()。
漏写 awaitClose 的后果
如果不调用 awaitClose,协程块在执行完注册逻辑后会立即结束,导致流被直接关闭。或者更糟糕的是,如果注册逻辑阻塞,则导致回调监听器永远无法反注册,从而造成严重的内存泄漏。
2. trySend vs. send
trySend(value):立即尝试入队。如果缓冲区满或通道关闭,返回失败结果。它不是挂起函数,非常适合在回调方法内部调用。send(value):是挂起函数。当缓冲区满时会挂起,直到有消费者取走数据。通常在callbackFlow内部的独立协程中使用。
实战:桥接 Firebase/网络监听器
对于一次性或多值的异步监听,callbackFlow 提供了极佳的封装性。
kotlin
fun UserProfileRepository.getProfileFlow(userId: String) = callbackFlow {
val subscription = api.subscribeToProfile(userId) { result ->
when (result) {
is Success -> trySend(result.data)
is Failure -> close(result.exception) // ⭐️ 传递错误并关闭流
}
}
awaitClose { subscription.dispose() }
}callbackFlow vs. channelFlow
两者都支持并发发射,但在设计意图上有所侧重:
| 构建器 | 核心特性 | 典型场景 |
|---|---|---|
callbackFlow | 强制要求 awaitClose | 包装监听器、传感器、事件总线 |
channelFlow | 侧重于内部并发生产 | 并在流内启动多个 launch 处理任务 |
核心开发准则
- 不要在 trySend 后手动 close:除非你确定流确实只有这一个值(一次性请求)。
- 正确利用 trySend 的返回值:在极端高频的场景下,可以检查
trySend(x).isFailure来执行丢弃策略或记录日志。 - 保持逻辑简单:
callbackFlow的核心职责是桥接。不要在其中编写复杂的业务逻辑,复杂的逻辑应交给下游的操作符(如map,filter)去处理。 - 异常处理原则:如果回调中传回了错误,推荐调用
close(throwable)。这会将异常传播给下游的catch算子,符合 Flow 的异常透明性契约。