Worker 线程模型已废弃 - 使用协程替代
源:Kotlin/Native Memory Manager
Worker 是传统 Kotlin/Native 的并发基础设施,提供线程级并发。虽然已被现代协程取代,但理解 Worker API 对维护遗留代码和理解并发演进至关重要。
Worker 基础概念
什么是 Worker
Worker 是 Kotlin/Native 的独立执行线程,每个 Worker:
- 拥有独立的作业队列
- 运行在独立的系统线程上
- 管理自己的对象堆(传统 MM)
Main Thread        Worker Thread 1      Worker Thread 2
     ↓                   ↓                    ↓
[Job Queue]         [Job Queue]          [Job Queue]
     ↓                   ↓                    ↓
[Object Heap]       [Object Heap]        [Object Heap]
创建 Worker
kotlin
import kotlin.native.concurrent.*

/**
 * Starts a worker, prints its name, then requests termination and blocks on the
 * returned future before printing a final message.
 */
// The opt-in has to annotate the function; an annotation cannot precede an import directive.
@OptIn(ObsoleteWorkersApi::class)
fun basicWorker() {
    // Create and start the worker; from here on it waits for jobs in the background.
    val worker = Worker.start()
    println("Worker created: ${worker.name}")

    // A worker must be terminated once it is no longer needed.
    worker.requestTermination().result
    println("Worker terminated")
}
重要:未终止的 Worker 会导致 native 内存泄漏!
## 执行作业
execute 方法
kotlin
/**
 * Runs one job on a freshly started worker, waits for it and prints the result.
 *
 * Shape of the call used below:
 * `fun <T1, T2> execute(mode: TransferMode, producer: () -> T1, job: (T1) -> T2): Future<T2>`
 */
@OptIn(ObsoleteWorkersApi::class)
fun executeBasic() {
    val worker = Worker.start()

    val future = worker.execute(
        TransferMode.SAFE, // transfer mode
        { "Input Data" },  // producer: prepares the job's input
    ) { input ->           // job: executed by the worker
        // java.lang.Thread is a JVM API and is unavailable on Kotlin/Native;
        // identify the executing worker through Worker.current instead.
        println("Worker thread: ${Worker.current.name}")
        "Processed: $input"
    }

    // Block until the job has produced its value.
    val result = future.result
    println("Result: $result")

    worker.requestTermination().result
}
完整示例:后台计算
kotlin
/** Immutable description of a computation: the input numbers and the name of the operation. */
data class ComputationTask(
    val numbers: List<Int>,
    val operation: String,
)

/**
 * Hands a [ComputationTask] to a worker, keeps printing on the calling thread while
 * the job is pending, then blocks on the future and prints the outcome.
 */
// The opt-in belongs here: this function, not the data class, uses Worker and freeze().
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
fun backgroundComputation() {
    val worker = Worker.start()
    val task = ComputationTask(
        numbers = (1..1_000_000).toList(),
        operation = "sum",
    ).freeze() // legacy MM: freeze before sharing the object with another worker

    val future = worker.execute(TransferMode.SAFE, { task }) { t ->
        // The expensive part runs inside the job.
        when (t.operation) {
            // 1 + 2 + … + 1_000_000 = 500_000_500_000 exceeds Int.MAX_VALUE,
            // so the sum is accumulated as Long to avoid silent overflow.
            "sum" -> t.numbers.sumOf { it.toLong() }
            "average" -> t.numbers.average()
            else -> 0L
        }
    }

    println("Computation started...")
    // The calling thread is free to do other things in the meantime.
    println("Doing other work on main thread")

    // Finally collect the result (blocks until the job is done).
    val result = future.result
    println("Computation result: $result")

    worker.requestTermination().result
}
TransferMode
SAFE vs UNSAFE
| 模式 | 行为 | 性能 | 安全性 |
|---|---|---|---|
| SAFE | 检查对象图可达性(对象须已冻结或不再被其他线程引用;不会自动冻结) | 中等 | ✅ 高 |
| UNSAFE | 跳过检查,开发者负责线程安全 | 快 | ⚠️ 低 |
SAFE 模式详解
kotlin
/**
 * Demonstrates a SAFE transfer of a list that the calling thread still references.
 *
 * The `true` annotations below describe the legacy memory manager.
 * NOTE(review): with the current memory manager freezing is disabled, so `isFrozen`
 * is expected to report `false` at every step — confirm against the target compiler.
 */
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
fun safeModeDemo() {
    val worker = Worker.start()
    val data = mutableListOf(1, 2, 3)
    println("Before transfer: ${data.isFrozen}") // false

    // SAFE mode does not freeze anything by itself: it only checks that the graph
    // returned by the producer may be handed over. `data` stays referenced by this
    // function, so it has to be frozen explicitly before the transfer.
    data.freeze()

    val future = worker.execute(TransferMode.SAFE, { data }) { input ->
        println("In worker: ${input.isFrozen}") // true (legacy MM)
        input.sum()
    }

    future.result // wait for the job; the sum itself is not needed here
    println("After transfer: ${data.isFrozen}") // true (legacy MM) — frozen by the call above

    worker.requestTermination().result
}
UNSAFE 模式(危险)
kotlin
/**
 * Sums a manually frozen list on a worker using [TransferMode.UNSAFE] and prints the total.
 */
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
fun unsafeModeDemo() {
    val backgroundWorker = Worker.start()

    // Thread safety is the caller's job in UNSAFE mode, so freeze the payload by hand.
    val payload = mutableListOf(10, 20, 30).also { it.freeze() }

    // UNSAFE skips the checks: faster, but dangerous.
    val pendingSum = backgroundWorker.execute(TransferMode.UNSAFE, { payload }) { numbers ->
        numbers.sum()
    }

    // ❌ Had the payload not been frozen, or were it still being mutated by the
    // calling thread, this could crash or corrupt data.
    val total = pendingSum.result
    println("Result: $total")

    backgroundWorker.requestTermination().result
}
Future API
状态管理
kotlin
/**
 * Schedules a job that takes about half a second, prints the state the future is in
 * right after scheduling, then blocks for the result and prints it.
 */
// NOTE(review): the platform.posix bindings may require the ExperimentalForeignApi
// opt-in depending on the compiler version; opting in is harmless where it is not needed.
@OptIn(ObsoleteWorkersApi::class, kotlinx.cinterop.ExperimentalForeignApi::class)
fun futureStates() {
    val worker = Worker.start()

    val future = worker.execute(TransferMode.SAFE, { Unit }) {
        // Thread.sleep is a JVM API; on Kotlin/Native sleep through the POSIX binding.
        platform.posix.usleep(500_000u) // 500 ms
        "Done"
    }

    // Inspect the current state (exhaustive over FutureState, hence no else branch).
    when (future.state) {
        FutureState.INVALID -> println("❌ Invalid")
        FutureState.SCHEDULED -> println("⏳ Scheduled/Running")
        FutureState.COMPUTED -> println("✅ Completed")
        FutureState.CANCELLED -> println("🚫 Cancelled")
        FutureState.THROWN -> println("💥 Exception thrown")
    }

    // Blocking wait for completion.
    val result = future.result
    println(result)

    worker.requestTermination().result
}
超时处理(手动实现)
kotlin
/**
 * Waits for a worker job by polling its future, giving up after [timeoutMs] milliseconds.
 *
 * On timeout a message is printed, termination is requested without waiting for it,
 * and the function returns. Otherwise the result is printed and the worker is
 * terminated, even when reading the result throws.
 *
 * @param timeoutMs maximum time to wait for the job, in milliseconds.
 */
// NOTE(review): the platform.posix bindings may require the ExperimentalForeignApi
// opt-in depending on the compiler version; opting in is harmless where it is not needed.
@OptIn(ObsoleteWorkersApi::class, kotlinx.cinterop.ExperimentalForeignApi::class)
fun futureWithTimeout(timeoutMs: Long) {
    val worker = Worker.start()

    val future = worker.execute(TransferMode.SAFE, { Unit }) {
        // Thread.sleep is a JVM API; use the POSIX binding on Kotlin/Native.
        platform.posix.sleep(2u) // simulate a long-running job (2 s)
        "Result"
    }

    // System.currentTimeMillis is JVM-only; a monotonic time mark is the stdlib equivalent.
    val started = kotlin.time.TimeSource.Monotonic.markNow()

    // Poll only while the job is still pending. Comparing against COMPUTED alone would
    // keep spinning until the timeout when the job ends up THROWN or CANCELLED.
    while (future.state == FutureState.SCHEDULED) {
        if (started.elapsedNow().inWholeMilliseconds > timeoutMs) {
            println("❌ Timeout!")
            // Deliberately not waiting on the termination future: the job may still be running.
            worker.requestTermination()
            return
        }
        platform.posix.usleep(50_000u) // 50 ms between polls
    }

    try {
        println("✅ Result: ${future.result}")
    } finally {
        // Terminate the worker even if reading the result throws.
        worker.requestTermination().result
    }
}
异常处理
kotlin
/**
 * Schedules a job that throws, reads the future's result inside a try/catch,
 * prints the future's state afterwards and terminates the worker.
 */
@OptIn(ObsoleteWorkersApi::class)
fun futureExceptions() {
val worker = Worker.start()
val future = worker.execute(TransferMode.SAFE, { Unit }) {
throw IllegalStateException("Worker error!")
}
try {
// NOTE(review): Future.result is believed to signal a failed job with its own
// IllegalStateException instead of rethrowing the job's exception, in which case
// e.message would not be "Worker error!" — confirm against the Future API docs.
val result = future.result // the job's failure surfaces here as an exception
println(result)
} catch (e: IllegalStateException) {
println("Caught exception from worker: ${e.message}")
}
// Inspect the state after the result has been read.
// NOTE(review): reading `result` consumes the future, so the state printed here may
// be INVALID rather than THROWN — verify; read the state before `result` to observe THROWN.
println("Future state: ${future.state}")
worker.requestTermination().result
}Worker 池
复用 Worker
kotlin
/**
 * Fixed-size pool that hands tasks to its workers in round-robin order.
 *
 * The rotation index is a plain `var` with no synchronization.
 *
 * @param size number of workers to start; must be positive.
 * @throws IllegalArgumentException if [size] is not positive.
 */
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
class WorkerPool(size: Int) {
    init {
        // An empty pool would only fail later, on the first submit (index 0 of an empty list).
        require(size > 0) { "WorkerPool size must be positive, was $size" }
    }

    private val workers = List(size) { Worker.start() }
    private var nextWorker = 0

    /**
     * Schedules [task] on the next worker in rotation.
     *
     * @param task the work to run; it is frozen before being handed over, so it must be freezable.
     * @return the future of the scheduled job.
     */
    fun <T> submit(task: () -> T): Future<T> {
        val worker = workers[nextWorker]
        nextWorker = (nextWorker + 1) % workers.size

        // ⚠️ The task has to be freezable.
        val frozenTask = task.freeze()
        // The job lambda only invokes what the producer handed over.
        return worker.execute(TransferMode.SAFE, { frozenTask }) { it() }
    }

    /** Requests termination of every worker and waits for each request to complete. */
    fun shutdown() {
        workers.forEach { it.requestTermination().result }
    }
}
/**
 * Submits ten short tasks to a four-worker [WorkerPool], prints each result in
 * submission order and shuts the pool down.
 */
// Future is part of the obsolete Worker API, so its use here needs the opt-in as well.
// NOTE(review): the platform.posix bindings may require the ExperimentalForeignApi
// opt-in depending on the compiler version; opting in is harmless where it is not needed.
@OptIn(ObsoleteWorkersApi::class, kotlinx.cinterop.ExperimentalForeignApi::class)
fun useWorkerPool() {
    val pool = WorkerPool(4)

    val futures = (1..10).map { i ->
        pool.submit {
            // Thread.sleep is a JVM API; use the POSIX binding on Kotlin/Native.
            platform.posix.usleep(100_000u) // 100 ms
            "Task $i completed"
        }
    }

    futures.forEach { future ->
        println(future.result)
    }

    pool.shutdown()
}
迁移到协程
Worker → Coroutines 对照
kotlin
/**
 * Legacy variant: doubles and sums a frozen list on a worker, prints the total
 * and terminates the worker.
 */
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
fun legacyWorkerExample() {
    val backgroundWorker = Worker.start()
    val frozenNumbers = listOf(1, 2, 3, 4, 5).freeze()

    val doubledSum = backgroundWorker
        .execute(TransferMode.SAFE, { frozenNumbers }) { numbers -> numbers.sumOf { it * 2 } }
        .result
    println("Result: $doubledSum")

    backgroundWorker.requestTermination().result
}
kotlin
import kotlinx.coroutines.*
/**
 * Coroutine variant: doubles and sums the list inside `withContext(Dispatchers.Default)`
 * and prints the total.
 */
suspend fun modernCoroutineExample() {
    val numbers = listOf(1, 2, 3, 4, 5)
    val doubledSum = withContext(Dispatchers.Default) {
        numbers.sumOf { it * 2 }
    }
    println("Result: $doubledSum")
}
并行任务对比
kotlin
/**
 * Starts four workers, runs one short job on each, prints the collected results
 * and terminates every worker.
 */
// NOTE(review): the platform.posix bindings may require the ExperimentalForeignApi
// opt-in depending on the compiler version; opting in is harmless where it is not needed.
@OptIn(
    ObsoleteWorkersApi::class,
    FreezingIsDeprecated::class,
    kotlinx.cinterop.ExperimentalForeignApi::class,
)
fun parallelWorkers() {
    val workers = List(4) { Worker.start() }

    val futures = workers.mapIndexed { i, worker ->
        worker.execute(TransferMode.SAFE, { i }) { index ->
            // Thread.sleep is a JVM API; use the POSIX binding on Kotlin/Native.
            platform.posix.usleep(100_000u) // 100 ms
            "Worker $index done"
        }
    }

    // Reading each result blocks until that worker's job has finished.
    val results = futures.map { it.result }
    println(results)

    workers.forEach { it.requestTermination().result }
}
kotlin
import kotlinx.coroutines.*
/**
 * Coroutine variant: launches four `async` tasks on `Dispatchers.Default`,
 * awaits all of them and prints the collected results.
 */
suspend fun parallelCoroutines() = coroutineScope {
    val pending = List(4) { index ->
        async(Dispatchers.Default) {
            delay(100)
            "Coroutine $index done"
        }
    }
    println(pending.awaitAll())
}
Worker 池 → Dispatcher
kotlin
// Legacy: a hand-rolled pool of eight workers.
@OptIn(ObsoleteWorkersApi::class)
val workerPool: WorkerPool = WorkerPool(size = 8)

// ✅ Modern: a dispatcher obtained from Dispatchers.Default.limitedParallelism(8).
val customDispatcher = Dispatchers.Default.limitedParallelism(8)

/** Runs an (empty) block in the context of [customDispatcher]. */
suspend fun modernPoolExample() {
    withContext(customDispatcher) {
        // Concurrency is limited to 8 here.
    }
}
最佳实践(遗留代码维护)
实践1:总是终止 Worker
kotlin
/**
 * Runs a job and prints its result, terminating the worker in `finally`
 * so that it is shut down whether or not the body throws.
 */
@OptIn(ObsoleteWorkersApi::class)
fun properCleanup() {
    val backgroundWorker = Worker.start()
    try {
        // Use the worker.
        val pending = backgroundWorker.execute(TransferMode.SAFE, { Unit }) { "Work done" }
        println(pending.result)
    } finally {
        // ✅ Always terminate.
        backgroundWorker.requestTermination().result
    }
}
实践2:检查冻结状态
kotlin
/**
 * Freezes the list only when it is not frozen yet, sums it on a worker,
 * waits for the job and terminates the worker.
 */
@OptIn(ObsoleteWorkersApi::class, FreezingIsDeprecated::class)
fun checkBeforeTransfer() {
    val backgroundWorker = Worker.start()
    val numbers = mutableListOf(1, 2, 3)

    // Freeze on demand before handing the list over.
    if (numbers.isFrozen.not()) numbers.freeze()

    val pendingSum = backgroundWorker.execute(TransferMode.SAFE, { numbers }) { frozen ->
        frozen.sum()
    }
    pendingSum.result // wait for the job; the value is discarded

    backgroundWorker.requestTermination().result
}
实践3:避免在 Worker 中捕获外部状态
kotlin
// ❌ Wrong: top-level mutable state that a job lambda might be tempted to capture.
var externalCounter = 0

/**
 * Anti-pattern illustration: the job is tempted to touch [externalCounter].
 * The offending statement is left commented out, so the job body is empty.
 */
// The opt-in has to annotate the function that uses the Worker API, not the variable above.
@OptIn(ObsoleteWorkersApi::class)
fun badWorkerUsage() {
    val worker = Worker.start()

    worker.execute(TransferMode.SAFE, { Unit }) {
        // ❌ externalCounter is not accessible from here.
        // externalCounter++ // compile-time or runtime error
    }

    worker.requestTermination().result
}
/**
 * ✅ Correct: the value reaches the job through the producer and the new value
 * comes back as the job's result, which is then printed.
 */
// This function uses the Worker API and therefore needs its own opt-in.
@OptIn(ObsoleteWorkersApi::class)
fun goodWorkerUsage() {
    val worker = Worker.start()
    val counter = 0

    val newCounter = worker.execute(TransferMode.SAFE, { counter }) { c ->
        c + 1 // return the new value instead of mutating shared state
    }.result
    println(newCounter)

    worker.requestTermination().result
}
Worker API 已被协程完全取代。新代码应使用 kotlinx.coroutines,它提供更简洁的API、更好的性能和更灵活的并发控制。维护遗留 Worker 代码时,请确保正确管理 Worker 生命周期和对象冻结。