Skip to content

Flow 单元测试

源:Testing Kotlin flows

测试异步流(Flow)比测试普通函数更具挑战性,因为流涉及值的序列、时间的推移、协程作用域的管理以及冷热流的特性差异。有效的 Flow 测试不仅要验证数据的准确性,还要验证流的开启、取消和异常反馈是否符合预期。

依赖配置与版本

查看 Coroutines Test 最新版本 | 查看 Turbine 最新版本

kotlin
dependencies {
    // 协程测试核心库:提供 runTest 和 TestDispatcher
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2")
    // 官方推荐的 Flow 断言库
    testImplementation("app.cash.turbine:turbine:1.2.1")
}
groovy
dependencies {
    testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2'
    testImplementation 'app.cash.turbine:turbine:1.2.1'
}
toml
[versions]
coroutines = "1.10.2"
turbine = "1.2.1"

[libraries]
coroutines-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "coroutines" }
turbine = { group = "app.cash.turbine", name = "turbine", version.ref = "turbine" }

测试基石:虚拟时间调度 (runTest)

所有的 Flow 测试都应在 runTest 中进行。它通过 TestDispatcher 拦截协程中的 delay 调用,并在不实际阻塞线程的情况下推动“虚拟时间”前进。

核心机制:虚拟时钟控制

深度剖析:虚拟时间是如何“跳跃”的?
  1. 调度器拦截:当你在 runTest 闭包中调用 delay(1000) 时,它并不会调用 Thread.sleep
  2. 挂起与队列:协程会挂起,并将恢复任务放入 TestDispatcher 的任务队列中,标记为 1000ms 后执行。
  3. 时间推进:如果当前没有其他可执行任务,runTest 会直接将虚拟时钟前进 1000ms,并立即恢复该协程。
  4. 无缝过渡:这使得耗时数分钟的复杂流逻辑可以在数毫秒内完成测试。

工业级方案:Turbine 声明式测试

Turbine 通过将推模式(Push)的 Flow 转换为拉模式(Pull)的挂起函数,极大地简化了断言逻辑。

核心 API 签名

kotlin
// Flow 的扩展函数
public suspend fun <T> Flow<T>.test(
    timeout: Duration? = null,
    name: String? = null,
    validate: suspend ReceiveTurbine<T>.() -> Unit
)

public interface ReceiveTurbine<T> {
    // 挂起直到流发射一个新值
    public suspend fun awaitItem(): T
    // 验证流是否正常完成
    public suspend fun awaitComplete()
    // 验证流是否以异常结束
    public suspend fun awaitError(): Throwable
    // 显式取消收集并忽略剩余事件
    public suspend fun cancelAndIgnoreRemainingEvents()
}

多场景测试实战

冷流与延迟控制

测试冷流时,重点在于验证发射序列和处理逻辑。

kotlin
@Test
fun testSimpleFlow() = runTest {
    val flow = flow {
        emit(1)
        delay(1000)
        emit(2)
        // 自动感知流的结束
    }

    flow.test {
        assertEquals(1, awaitItem())
        // 虚拟时间自动跳过 1000ms
        assertEquals(2, awaitItem())
        awaitComplete()
    }
}
kotlin
@Test
fun testErrorFlow() = runTest {
    val flow = flow {
        emit("Start")
        throw IllegalStateException("Network Failure")
    }

    flow.test {
        assertEquals("Start", awaitItem())
        val error = awaitError()
        assertTrue(error is IllegalStateException)
        assertEquals("Network Failure", error.message)
    }
}

热流测试:StateFlow 与 SharedFlow

热流的测试难点在于它们可能永不结束,且 StateFlow 具有粘性。

kotlin
@Test
fun testStateFlowUpdates() = runTest {
    val stateFlow = MutableStateFlow("Initial")
    
    stateFlow.test {
        // 1. 首先收到粘性初始值
        assertEquals("Initial", awaitItem())
        
        stateFlow.value = "Updated"
        
        // 2. 收到新值
        assertEquals("Updated", awaitItem())
        
        // 3. 热流永远不会 Complete,必须手动结束收集
        cancelAndIgnoreRemainingEvents()
    }
}

时间敏感操作符测试

测试 debouncesample 时,虚拟时间的精确控制至关重要。

kotlin
@Test
fun testDebounceOperation() = runTest {
    val flow = flow {
        emit(1)
        delay(100)
        emit(2)
        delay(500) // 超过防抖阈值
        emit(3)
    }.debounce(300L)

    flow.test {
        // 由于 1 之后 100ms 就发了 2,1 被舍弃
        assertEquals(2, awaitItem())
        assertEquals(3, awaitItem())
        awaitComplete()
    }
}

进阶:多流并行与竞态测试

在复杂的 UI 逻辑中,可能涉及多个流的并行收集,此时需要配合 launch 使用。

kotlin
@Test
fun testParallelFlows() = runTest {
    val flowA = MutableSharedFlow<Int>()
    val flowB = MutableSharedFlow<String>()

    // 同时启动两个 Turbine
    val turbineA = flowA.testIn(backgroundScope)
    val turbineB = flowB.testIn(backgroundScope)

    flowA.emit(1)
    flowB.emit("A")

    assertEquals(1, turbineA.awaitItem())
    assertEquals("A", turbineB.awaitItem())
}

工程实践准则

优先选择 Turbine

它通过事件驱动消除了大部分手动协程管理的样板代码,使测试更聚焦于数据逻辑。

显式消耗所有预期事件

测试结束前应确保所有预期的事件都已被消耗。Turbine 在 test 闭包结束时会自动检查是否有未消耗的意外事件,这有助于发现逻辑漏洞。

避免在测试中混合线程调度器

始终在测试中使用 runTest 提供的调度器环境。如果在业务逻辑中硬编码了 Dispatchers.IO,建议通过依赖注入(DI)将其替换为 TestDispatcher,否则虚拟时间控制将失效。

背景作用域的使用

对于永不结束的热流,使用 backgroundScope 启动收集可以确保在测试结束时自动清理资源,避免协程泄露。