Skip to content

Worker API 并行化

源:Gradle 官方文档 - Worker API

Worker API 允许任务将工作分解为多个独立单元并行执行,充分利用多核CPU,显著提升构建性能。

Worker API 概念

为什么需要 Worker API

传统任务

kotlin
@TaskAction
fun process() {
    files.forEach { file ->
        // 串行处理每个文件
        processFile(file)
    }
}

问题

  • 串行执行
  • 无法利用多核
  • 处理大量文件慢

Worker API

kotlin
@TaskAction
fun process() {
    files.forEach { file ->
        // 并行处理
        workerExecutor.noIsolation().submit(ProcessWorkAction::class) {
            inputFile.set(file)
        }
    }
}

优势

  • 并行执行
  • 充分利用CPU
  • 提升性能

基本用法

定义 WorkAction

kotlin
abstract class ProcessWorkAction : WorkAction<ProcessWorkAction.Parameters> {
    interface Parameters : WorkParameters {
        val inputFile: RegularFileProperty
        val outputFile: RegularFileProperty
    }
    
    override fun execute() {
        val input = parameters.inputFile.get().asFile
        val output = parameters.outputFile.get().asFile
        
        // 处理文件
        val content = input.readText()
        output.writeText(content.uppercase())
    }
}

提交工作

kotlin
abstract class ParallelProcessTask : DefaultTask() {
    @get:Inject
    abstract val workerExecutor: WorkerExecutor
    
    @get:InputFiles
    abstract val inputFiles: ConfigurableFileCollection
    
    @get:OutputDirectory
    abstract val outputDir: DirectoryProperty
    
    @TaskAction
    fun process() {
        inputFiles.forEach { file ->
            workerExecutor.noIsolation().submit(ProcessWorkAction::class) {
                inputFile.set(file)
                outputFile.set(outputDir.file(file.name))
            }
        }
    }
}

工作单元隔离

noIsolation 无隔离

kotlin
workerExecutor.noIsolation().submit(MyWorkAction::class) {
    // 参数配置
}

特点

  • 共享类加载器
  • 性能最好
  • 适合大多数场景

classLoaderIsolation 类加载器隔离

kotlin
workerExecutor.classLoaderIsolation {
    classpath.from(configurations.named("myClasspath"))
}.submit(MyWorkAction::class) {
    // 参数配置
}

特点

  • 独立类加载器
  • 避免类冲突
  • 性能适中

processIsolation 进程隔离

kotlin
workerExecutor.processIsolation {
    forkOptions {
        maxHeapSize = "512m"
        systemProperty("file.encoding", "UTF-8")
    }
}.submit(MyWorkAction::class) {
    // 参数配置
}

特点

  • 独立进程
  • 完全隔离
  • 性能最慢
  • 适合不稳定的代码

参数传递

WorkParameters

kotlin
interface MyParameters : WorkParameters {
    val message: Property<String>
    val count: Property<Int>
    val inputFile: RegularFileProperty
}

abstract class MyWorkAction : WorkAction<MyParameters> {
    override fun execute() {
        println(parameters.message.get())
        println(parameters.count.get())
        println(parameters.inputFile.get().asFile.name)
    }
}

传递参数

kotlin
workerExecutor.noIsolation().submit(MyWorkAction::class) {
    message.set("Hello")
    count.set(42)
    inputFile.set(file("input.txt"))
}

等待完成

await

kotlin
@TaskAction
fun process() {
    inputFiles.forEach { file ->
        workerExecutor.noIsolation().submit(ProcessWorkAction::class) {
            inputFile.set(file)
        }
    }
    
    // 等待所有工作完成
    workerExecutor.await()
    
    println("All work completed")
}

实战案例

案例1:图片压缩

kotlin
interface ImageParameters : WorkParameters {
    val inputImage: RegularFileProperty
    val outputImage: RegularFileProperty
    val quality: Property<Int>
}

abstract class CompressImageAction : WorkAction<ImageParameters> {
    override fun execute() {
        val input = parameters.inputImage.get().asFile
        val output = parameters.outputImage.get().asFile
        val quality = parameters.quality.get()
        
        // 压缩图片
        ImageIO.read(input).let { image ->
            val writer = ImageIO.getImageWritersByFormatName("jpg").next()
            val param = writer.defaultWriteParam
            param.compressionMode = ImageWriteParam.MODE_EXPLICIT
            param.compressionQuality = quality / 100f
            
            output.outputStream().use { stream ->
                writer.output = ImageIO.createImageOutputStream(stream)
                writer.write(null, IIOImage(image, null, null), param)
            }
        }
    }
}

abstract class CompressImagesTask : DefaultTask() {
    @get:Inject
    abstract val workerExecutor: WorkerExecutor
    
    @get:InputFiles
    abstract val images: ConfigurableFileCollection
    
    @get:OutputDirectory
    abstract val outputDir: DirectoryProperty
    
    @get:Input
    abstract val quality: Property<Int>
    
    @TaskAction
    fun compress() {
        images.forEach { image ->
            workerExecutor.noIsolation().submit(CompressImageAction::class) {
                inputImage.set(image)
                outputImage.set(outputDir.file(image.name))
                quality.set(this@CompressImagesTask.quality)
            }
        }
    }
}

tasks.register<CompressImagesTask>("compressImages") {
    images.from("src/main/res/drawable-xxhdpi")
    outputDir.set(layout.buildDirectory.dir("compressed"))
    quality.set(85)
}

案例2:代码格式化

kotlin
interface FormatParameters : WorkParameters {
    val sourceFile: RegularFileProperty
}

abstract class FormatCodeAction : WorkAction<FormatParameters> {
    override fun execute() {
        val file = parameters.sourceFile.get().asFile
        val code = file.readText()
        
        // 格式化代码(示例)
        val formatted = code
            .replace("  ", "    ")  // 2空格转4空格
            .replace("\r\n", "\n")  // 统一换行符
        
        file.writeText(formatted)
    }
}

abstract class FormatCodeTask : DefaultTask() {
    @get:Inject
    abstract val workerExecutor: WorkerExecutor
    
    @get:InputFiles
    abstract val sources: ConfigurableFileCollection
    
    @TaskAction
    fun format() {
        sources.forEach { source ->
            workerExecutor.noIsolation().submit(FormatCodeAction::class) {
                sourceFile.set(source)
            }
        }
        
        workerExecutor.await()
        println("Formatted ${sources.files.size} files")
    }
}

案例3:并行测试

kotlin
interface TestParameters : WorkParameters {
    val testClass: Property<String>
    val reportFile: RegularFileProperty
}

abstract class RunTestAction : WorkAction<TestParameters> {
    override fun execute() {
        val className = parameters.testClass.get()
        val report = parameters.reportFile.get().asFile
        
        // 运行测试
        val result = runTest(className)
        
        // 写入报告
        report.writeText("Test: $className, Result: $result")
    }
    
    private fun runTest(className: String): String {
        // 实际测试逻辑
        return "PASSED"
    }
}

abstract class ParallelTestTask : DefaultTask() {
    @get:Inject
    abstract val workerExecutor: WorkerExecutor
    
    @get:Input
    abstract val testClasses: ListProperty<String>
    
    @get:OutputDirectory
    abstract val reportsDir: DirectoryProperty
    
    @TaskAction
    fun runTests() {
        testClasses.get().forEach { testClass ->
            workerExecutor.processIsolation().submit(RunTestAction::class) {
                this.testClass.set(testClass)
                reportFile.set(reportsDir.file("$testClass.txt"))
            }
        }
    }
}

性能优化

控制并发数

kotlin
@TaskAction
fun process() {
    val semaphore = Semaphore(Runtime.getRuntime().availableProcessors())
    
    files.forEach { file ->
        semaphore.acquire()
        workerExecutor.noIsolation().submit(ProcessWorkAction::class) {
            inputFile.set(file)
        }
    }
}

批处理

kotlin
@TaskAction
fun process() {
    val batches = files.chunked(10)  // 每批10个文件
    
    batches.forEach { batch ->
        workerExecutor.noIsolation().submit(BatchProcessAction::class) {
            inputFiles.set(batch)
        }
    }
}

最佳实践

选择适当的隔离级别

  • 默认使用 noIsolation
  • 类冲突时使用 classLoaderIsolation
  • 不稳定代码使用 processIsolation

合理分解工作

  • 每个工作单元独立
  • 避免共享可变状态
  • 工作量适中

使用 WorkParameters

  • 不要使用构造函数传参
  • 使用 Property API
  • 支持序列化

等待完成

kotlin
workerExecutor.await()  // 确保所有工作完成

异常处理

kotlin
override fun execute() {
    try {
        // 工作逻辑
    } catch (e: Exception) {
        logger.error("Work failed", e)
        throw e
    }
}

性能监控

kotlin
val start = System.currentTimeMillis()
workerExecutor.await()
val duration = System.currentTimeMillis() - start
println("Completed in ${duration}ms")