Skip to content

Worker 线程模型已废弃 - 使用协程替代

源:Kotlin/Native Memory Manager

Worker 是传统 Kotlin/Native 的并发基础设施,提供线程级并发。虽然已被现代协程取代,但理解 Worker API 对维护遗留代码和理解并发演进至关重要。

Worker 基础概念

什么是 Worker

Worker 是 Kotlin/Native 的独立执行线程,每个 Worker:

  • 拥有独立的作业队列
  • 运行在独立的系统线程
  • 管理自己的对象堆(传统 MM)
Main Thread          Worker Thread 1      Worker Thread 2
    ↓                     ↓                    ↓
[Job Queue]          [Job Queue]          [Job Queue]
    ↓                     ↓                    ↓
[Object Heap]        [Object Heap]        [Object Heap]

创建 Worker

kotlin
@OptIn(ObsoleteWorkersApi::class)
import kotlin.native.concurrent.*

fun basicWorker() {
    // 创建并启动 Worker
    val worker = Worker.start()
    println("Worker created: ${worker.name}")
    
    // Worker 现在在后台运行,等待作业
    
    // 使用完毕后必须终止
    worker.requestTermination().result
    println("Worker terminated")
}

重要:未终止的 Worker 会导致 native 内存泄漏!

##执行作业

execute 方法

kotlin
@OptIn(ObsoleteWorkersApi::class)
fun executeBasic() {
    val worker = Worker.start()
    
    // execute 签名:
    // fun <T1, T2> execute(
    //     mode: TransferMode,
    //     producer: () -> T1,
    //     job: (T1) -> T2
    // ): Future<T2>
    
    val future = worker.execute(
        TransferMode.SAFE,        // 传输模式
        { "Input Data" }          // producer:准备输入数据
    ) { input ->                  // job:在 Worker 线程执行
        println("Worker thread: ${Thread.currentThread().name}")
        "Processed: $input"
    }
    
    // 等待结果
    val result = future.result
    println("Result: $result")
    
    worker.requestTermination().result
}

完整示例:后台计算

kotlin
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
data class ComputationTask(
    val numbers: List<Int>,
    val operation: String
)

fun backgroundComputation() {
    val worker = Worker.start()
    
    val task = ComputationTask(
        numbers = (1..1_000_000).toList(),
        operation = "sum"
    ).freeze() // Legacy MM:需要冻结
    
    val future = worker.execute(TransferMode.SAFE, { task }) { t ->
        // 在 Worker 线程执行耗时计算
        when (t.operation) {
            "sum" -> t.numbers.sum()
            "average" -> t.numbers.average()
            else -> 0
        }
    }
    
    println("Computation started...")
    
    // 主线程可以继续做其他事
    println("Doing other work on main thread")
    
    // 最终获取结果
    val result = future.result
    println("Computation result: $result")
    
    worker.requestTermination().result
}

TransferMode

SAFE vs UNSAFE

模式行为性能安全性
SAFE冻结对象并检查可达性中等✅ 高
UNSAFE跳过检查,开发者负责线程安全⚠️ 低

SAFE 模式详解

kotlin
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
fun safeModeDemo() {
    val worker =Worker.start()
    val data = mutableListOf(1, 2, 3)
    
    println("Before transfer: ${data.isFrozen}") // false
    
    // SAFE 模式会:
    // 1. 冻结 data 及其对象图
    // 2. 检查 data 是否仍被主线程引用
    // 3. 如果安全,传递给 Worker
    val future = worker.execute(TransferMode.SAFE, { data }) { input ->
        println("In worker: ${input.isFrozen}") // true
        input.sum()
    }
    
    val result = future.result
    println("After transfer: ${data.isFrozen}") // true - 已被冻结
    
    worker.requestTermination().result
}

UNSAFE 模式(危险)

kotlin
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
fun unsafeModeDemo() {
    val worker = Worker.start()
    val data = mutableListOf(10, 20, 30)
    
    // 手动确保线程安全
    data.freeze()
    
    // UNSAFE:跳过检查,更快但危险
    val future = worker.execute(TransferMode.UNSAFE, { data }) {
        it.sum()
    }
    
    // ❌ 如果 data 没有冻结或仍被主线程修改
    // 可能导致崩溃或数据损坏
    
    println("Result: ${future.result}")
    
    worker.requestTermination().result
}

Future API

状态管理

kotlin
@OptIn(ObsoleteWorkersApi::class)
fun futureStates() {
    val worker = Worker.start()
    
    val future = worker.execute(TransferMode.SAFE, { Unit }) {
        Thread.sleep(500)
        "Done"
    }
    
    // 检查状态
    when (future.state) {
        FutureState.INVALID -> println("❌ Invalid")
        FutureState.SCHEDULED -> println("⏳ Scheduled/Running")
        FutureState.COMPUTED -> println("✅ Completed")
        FutureState.CANCELLED -> println("🚫 Cancelled")
        FutureState.THROWN -> println("��� Exception thrown")
    }
    
    // 阻塞等待
    val result = future.result // 等待完成
    println(result)
    
    worker.requestTermination().result
}

超时处理(手动实现)

kotlin
@OptIn(ObsoleteWorkersApi::class)
fun futureWithTimeout(timeoutMs: Long) {
    val worker = Worker.start()
    
    val future = worker.execute(TransferMode.SAFE, { Unit }) {
        Thread.sleep(2000) // 模拟长时间任务
        "Result"
    }
    
    val startTime = System.currentTimeMillis()
    
    // 轮询直到完成或超时
    while (future.state != FutureState.COMPUTED) {
        if (System.currentTimeMillis() - startTime > timeoutMs) {
            println("❌ Timeout!")
            worker.requestTermination()
            return
        }
        Thread.sleep(50)
    }
    
    println("✅ Result: ${future.result}")
    worker.requestTermination().result
}

异常处理

kotlin
@OptIn(ObsoleteWorkersApi::class)
fun futureExceptions() {
    val worker = Worker.start()
    
    val future = worker.execute(TransferMode.SAFE, { Unit }) {
        throw IllegalStateException("Worker error!")
    }
    
    try {
        val result = future.result // 会重新抛出异常
        println(result)
    } catch (e: IllegalStateException) {
        println("Caught exception from worker: ${e.message}")
    }
    
    // 检查状态
    println("Future state: ${future.state}") // THROWN
    
    worker.requestTermination().result
}

Worker 池

复用 Worker

kotlin
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
class WorkerPool(size: Int) {
    private val workers = List(size) { Worker.start() }
    private var nextWorker = 0
    
    fun <T> submit(task: () -> T): Future<T> {
        val worker = workers[nextWorker]
        nextWorker = (nextWorker + 1) % workers.size
        
        // ⚠️ 需要确保 task 可冻结
        val frozenTask = task.freeze()
        return worker.execute(TransferMode.SAFE, { frozenTask }) { it() }
    }
    
    fun shutdown() {
        workers.forEach { it.requestTermination().result }
    }
}

fun useWorkerPool() {
    val pool = WorkerPool(4)
    
    val futures = (1..10).map { i ->
        pool.submit {
            Thread.sleep(100)
            "Task $i completed"
        }
    }
    
    futures.forEach { future ->
        println(future.result)
    }
    
    pool.shutdown()
}

迁移到协程

Worker → Coroutines 对照

kotlin
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
fun legacyWorkerExample() {
    val worker = Worker.start()
    
    val data = listOf(1, 2, 3, 4, 5).freeze()
    
    val future = worker.execute(TransferMode.SAFE, { data }) { input ->
        input.map { it * 2 }.sum()
    }
    
    val result = future.result
    println("Result: $result")
    
    worker.requestTermination().result
}
kotlin
import kotlinx.coroutines.*

suspend fun modernCoroutineExample() {
    val data = listOf(1, 2, 3, 4, 5)
    
    val result = withContext(Dispatchers.Default) {
        data.map { it * 2 }.sum()
    }
    
    println("Result: $result")
}

并行任务对比

kotlin
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
fun parallelWorkers() {
    val workers = List(4) { Worker.start() }
    
    val futures = workers.mapIndexed { i, worker ->
        worker.execute(TransferMode.SAFE, { i }) { index ->
            Thread.sleep(100)
            "Worker $index done"
        }
    }
    
    val results = futures.map { it.result }
    println(results)
    
    workers.forEach { it.requestTermination().result }
}
kotlin
import kotlinx.coroutines.*

suspend fun parallelCoroutines() = coroutineScope {
    val jobs = (0..3).map { i ->
        async(Dispatchers.Default) {
            delay(100)
            "Coroutine $i done"
        }
    }
    
    val results = jobs.awaitAll()
    println(results)
}

Worker 池 → Dispatcher

kotlin
// Legacy Worker Pool
@OptIn(ObsoleteWorkersApi::class)
val workerPool = WorkerPool(8)

// ✅ Modern: 使用 Dispatcher
val customDispatcher = Dispatchers.Default.limitedParallelism(8)

suspend fun modernPoolExample() {
    withContext(customDispatcher) {
        // 并发限制为 8
    }
}

最佳实践(遗留代码维护)

实践1:总是终止 Worker

kotlin
@OptIn(ObsoleteWorkersApi::class)
fun properCleanup() {
    val worker = Worker.start()
    
    try {
        // 使用 Worker
        val result = worker.execute(TransferMode.SAFE, { Unit }) {
            "Work done"
        }.result
        
        println(result)
    } finally {
        // ✅ 确保终止
        worker.requestTermination().result
    }
}

实践2:检查冻结状态

kotlin
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
fun checkBeforeTransfer() {
    val worker = Worker.start()
    val data = mutableListOf(1, 2, 3)
    
    if (!data.isFrozen) {
        data.freeze()
    }
    
    worker.execute(TransferMode.SAFE, { data }) {
        it.sum()
    }.result
    
    worker.requestTermination().result
}

实践3:避免在 Worker 中捕获外部状态

kotlin
@OptIn(ObsoleteWorkersApi::class)
// ❌ 错误:Lambda 捕获了外部变量
var externalCounter = 0

fun badWorkerUsage() {
    val worker = Worker.start()
    
    worker.execute(TransferMode.SAFE, { Unit }) {
        // ❌ 无法访问 externalCounter
        // externalCounter++  // 编译错误或运行时错误
    }
    
    worker.requestTermination().result
}

// ✅ 正确:通过参数传递
fun goodWorkerUsage() {
    val worker = Worker.start()
    val counter = 0
    
    val newCounter = worker.execute(TransferMode.SAFE, { counter }) { c ->
        c + 1 // 返回新值
    }.result
    
    println(newCounter)
    
    worker.requestTermination().result
}

Worker API 已被协程完全取代。新代码应使用 kotlinx.coroutines,它提供更简洁的API、更好的性能和更灵活的并发控制。维护遗留 Worker 代码时,请确保正确管理 Worker 生命周期和对象冻结。