Flow 单元测试
测试异步流(Flow)比测试普通函数更具挑战性,因为流涉及值的序列、时间的推移、协程作用域的管理以及冷热流的特性差异。有效的 Flow 测试不仅要验证数据的准确性,还要验证流的开启、取消和异常反馈是否符合预期。
依赖配置与版本
查看 Coroutines Test 最新版本 | 查看 Turbine 最新版本
kotlin
dependencies {
// 协程测试核心库:提供 runTest 和 TestDispatcher
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2")
// 官方推荐的 Flow 断言库
testImplementation("app.cash.turbine:turbine:1.2.1")
}groovy
dependencies {
testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2'
testImplementation 'app.cash.turbine:turbine:1.2.1'
}toml
[versions]
coroutines = "1.10.2"
turbine = "1.2.1"
[libraries]
coroutines-test = { group = "org.jetbrains.kotlinx", name = "kotlinx-coroutines-test", version.ref = "coroutines" }
turbine = { group = "app.cash.turbine", name = "turbine", version.ref = "turbine" }测试基石:虚拟时间调度 (runTest)
所有的 Flow 测试都应在 runTest 中进行。它通过 TestDispatcher 拦截协程中的 delay 调用,并在不实际阻塞线程的情况下推动“虚拟时间”前进。
核心机制:虚拟时钟控制
深度剖析:虚拟时间是如何“跳跃”的?
- 调度器拦截:当你在
runTest闭包中调用delay(1000)时,它并不会调用Thread.sleep。 - 挂起与队列:协程会挂起,并将恢复任务放入
TestDispatcher的任务队列中,标记为 1000ms 后执行。 - 时间推进:如果当前没有其他可执行任务,
runTest会直接将虚拟时钟前进 1000ms,并立即恢复该协程。 - 无缝过渡:这使得耗时数分钟的复杂流逻辑可以在数毫秒内完成测试。
工业级方案:Turbine 声明式测试
Turbine 通过将推模式(Push)的 Flow 转换为拉模式(Pull)的挂起函数,极大地简化了断言逻辑。
核心 API 签名
kotlin
// Flow 的扩展函数
public suspend fun <T> Flow<T>.test(
timeout: Duration? = null,
name: String? = null,
validate: suspend ReceiveTurbine<T>.() -> Unit
)
public interface ReceiveTurbine<T> {
// 挂起直到流发射一个新值
public suspend fun awaitItem(): T
// 验证流是否正常完成
public suspend fun awaitComplete()
// 验证流是否以异常结束
public suspend fun awaitError(): Throwable
// 显式取消收集并忽略剩余事件
public suspend fun cancelAndIgnoreRemainingEvents()
}多场景测试实战
冷流与延迟控制
测试冷流时,重点在于验证发射序列和处理逻辑。
kotlin
@Test
fun testSimpleFlow() = runTest {
val flow = flow {
emit(1)
delay(1000)
emit(2)
// 自动感知流的结束
}
flow.test {
assertEquals(1, awaitItem())
// 虚拟时间自动跳过 1000ms
assertEquals(2, awaitItem())
awaitComplete()
}
}kotlin
@Test
fun testErrorFlow() = runTest {
val flow = flow {
emit("Start")
throw IllegalStateException("Network Failure")
}
flow.test {
assertEquals("Start", awaitItem())
val error = awaitError()
assertTrue(error is IllegalStateException)
assertEquals("Network Failure", error.message)
}
}热流测试:StateFlow 与 SharedFlow
热流的测试难点在于它们可能永不结束,且 StateFlow 具有粘性。
kotlin
@Test
fun testStateFlowUpdates() = runTest {
val stateFlow = MutableStateFlow("Initial")
stateFlow.test {
// 1. 首先收到粘性初始值
assertEquals("Initial", awaitItem())
stateFlow.value = "Updated"
// 2. 收到新值
assertEquals("Updated", awaitItem())
// 3. 热流永远不会 Complete,必须手动结束收集
cancelAndIgnoreRemainingEvents()
}
}时间敏感操作符测试
测试 debounce 或 sample 时,虚拟时间的精确控制至关重要。
kotlin
@Test
fun testDebounceOperation() = runTest {
val flow = flow {
emit(1)
delay(100)
emit(2)
delay(500) // 超过防抖阈值
emit(3)
}.debounce(300L)
flow.test {
// 由于 1 之后 100ms 就发了 2,1 被舍弃
assertEquals(2, awaitItem())
assertEquals(3, awaitItem())
awaitComplete()
}
}进阶:多流并行与竞态测试
在复杂的 UI 逻辑中,可能涉及多个流的并行收集,此时需要配合 launch 使用。
kotlin
@Test
fun testParallelFlows() = runTest {
val flowA = MutableSharedFlow<Int>()
val flowB = MutableSharedFlow<String>()
// 同时启动两个 Turbine
val turbineA = flowA.testIn(backgroundScope)
val turbineB = flowB.testIn(backgroundScope)
flowA.emit(1)
flowB.emit("A")
assertEquals(1, turbineA.awaitItem())
assertEquals("A", turbineB.awaitItem())
}工程实践准则
优先选择 Turbine
它通过事件驱动消除了大部分手动协程管理的样板代码,使测试更聚焦于数据逻辑。
显式消耗所有预期事件
测试结束前应确保所有预期的事件都已被消耗。Turbine 在 test 闭包结束时会自动检查是否有未消耗的意外事件,这有助于发现逻辑漏洞。
避免在测试中混合线程调度器
始终在测试中使用 runTest 提供的调度器环境。如果在业务逻辑中硬编码了 Dispatchers.IO,建议通过依赖注入(DI)将其替换为 TestDispatcher,否则虚拟时间控制将失效。
背景作用域的使用
对于永不结束的热流,使用 backgroundScope 启动收集可以确保在测试结束时自动清理资源,避免协程泄露。