Skip to content

回调适配 (callbackFlow)

源:Creating a flow from a callback-based API

在 Android 及 JVM 开发中,许多底层 SDK 或旧代码依然使用监听器(Listener)或回调(Callback)来传递异步数据。将这些 API 转换为响应式 Flow 是实现全链路协程化的关键。callbackFlow 正是为此设计的专用构建器,它提供了线程安全的通信管道和生命周期收尾机制。

为什么不能使用普通的 flow 构建器?

核心限制:发射上下文

标准的 flow { ... } 构建器是顺序执行上下文保存的。这意味着:

  1. 它不能在非挂起的回调中直接调用 emit
  2. 它无法处理来自多个不同线程的并发发射。 如果你尝试在第三方 SDK 的回调线程中直接调用 emit,会触发 IllegalStateException

callbackFlow 的核心结构

callbackFlow 基于 Channel 实现,允许在任何线程中通过 trySendsend 安全地分发元素。

kotlin
fun locationFlow() = callbackFlow {
    // 1. 创建监听器对象
    val listener = object : LocationListener {
        override fun onLocationChanged(location: Location) {
            // 2. 将数据推入流。trySend 是非挂起的,适合在回调中使用
            trySend(location) 
        }
    }
    
    // 3. 注册监听
    locationManager.register(listener)
    
    // 4. ⭐️ 核心:挂起协程直到流关闭或被取消
    // 这是防止内存泄漏的生命周期锚点
    awaitClose {
        // 5. 资源清理逻辑
        locationManager.unregister(listener)
        println("清理:监听器已反注册")
    }
}

核心原语深度解析

1. awaitClose:强制性的收尾

awaitClosecallbackFlow 中最重要的函数。它会挂起当前协程,直到:

  • 消费者停止收集流且协程被取消。
  • 生产者在内部显式调用了 close()

漏写 awaitClose 的后果

如果不调用 awaitClose,协程块在执行完注册逻辑后会立即结束,导致流被直接关闭。或者更糟糕的是,如果注册逻辑阻塞,则导致回调监听器永远无法反注册,从而造成严重的内存泄漏

2. trySend vs. send

  • trySend(value):立即尝试入队。如果缓冲区满或通道关闭,返回失败结果。它不是挂起函数,非常适合在回调方法内部调用。
  • send(value):是挂起函数。当缓冲区满时会挂起,直到有消费者取走数据。通常在 callbackFlow 内部的独立协程中使用。

实战:桥接 Firebase/网络监听器

对于一次性或多值的异步监听,callbackFlow 提供了极佳的封装性。

kotlin
fun UserProfileRepository.getProfileFlow(userId: String) = callbackFlow {
    val subscription = api.subscribeToProfile(userId) { result ->
        when (result) {
            is Success -> trySend(result.data)
            is Failure -> close(result.exception) // ⭐️ 传递错误并关闭流
        }
    }
    
    awaitClose { subscription.dispose() }
}

callbackFlow vs. channelFlow

两者都支持并发发射,但在设计意图上有所侧重:

构建器核心特性典型场景
callbackFlow强制要求 awaitClose包装监听器、传感器、事件总线
channelFlow侧重于内部并发生产并在流内启动多个 launch 处理任务

核心开发准则

  1. 不要在 trySend 后手动 close:除非你确定流确实只有这一个值(一次性请求)。
  2. 正确利用 trySend 的返回值:在极端高频的场景下,可以检查 trySend(x).isFailure 来执行丢弃策略或记录日志。
  3. 保持逻辑简单callbackFlow 的核心职责是桥接。不要在其中编写复杂的业务逻辑,复杂的逻辑应交给下游的操作符(如 map, filter)去处理。
  4. 异常处理原则:如果回调中传回了错误,推荐调用 close(throwable)。这会将异常传播给下游的 catch 算子,符合 Flow 的异常透明性契约。