Skip to content

POSIX API 使用

源:Platform Libraries

Kotlin/Native 提供了 POSIX API 的完整绑定,支持底层系统编程,包括文件I/O、进程管理、网络Socket、信号处理等。

平台库导入

platform.posix 包

kotlin
@OptIn(ExperimentalForeignApi::class)
import kotlinx.cinterop.*
import platform.posix.*

// POSIX API 覆盖范围:
// - 文件系统:fopen, fread, fwrite, fclose
// - 进程:fork, exec, wait, kill
// - 网络:socket, bind, connect, send, recv
// - 时间:time, sleep, nanosleep
// - 线程:pthread_create, pthread_join
// - 信号:signal, kill, raise

所有 POSIX 函数均通过 platform.posix.* 提供。

文件 I/O

文本文件读写

kotlin
@OptIn(ExperimentalForeignApi::class)
fun textFileOperations() {
    // 写入文件
    val writeFile = fopen("data.txt", "w")
    
    if (writeFile != null) {
        fprintf(writeFile, "Line 1: Hello, POSIX!\n")
        fputs("Line 2: Kotlin Native\n", writeFile)
        
        // 格式化写入
        val number = 42
        fprintf(writeFile, "Line 3: Number = %d\n", number)
        
        fclose(writeFile)
    } else {
        println("❌ Failed to open file for writing")
    }
    
    // 读取文件
    val readFile = fopen("data.txt", "r")
    
    if (readFile != null) {
        memScoped {
            val buffer = allocArray<ByteVar>(256)
            
            while (fgets(buffer, 256, readFile) != null) {
                val line = buffer.toKString()
                print(line) // 已包含换行符
            }
        }
        
        fclose(readFile)
    } else {
        println("❌ Failed to open file for reading")
    }
}

二进制文件操作

kotlin
@OptIn(ExperimentalForeignApi::class)
data class Header(val magic: UInt, val version: UInt, val size: UInt)

fun binaryFileOperations() {
    val filename = "data.bin"
    
    // 写入二进制数据
    val writeFile = fopen(filename, "wb")
    if (writeFile != null) {
        memScoped {
            // 写入结构体
            val header = alloc<Header>().apply {
                magic = 0x12345678u
                version = 1u
                size = 100u
            }
            
            fwrite(header.ptr, sizeOf<Header>().toULong(), 1u, writeFile)
            
            // 写入数组
            val data = intArrayOf(10, 20, 30, 40, 50)
            data.usePinned { pinned ->
                fwrite(
                    pinned.addressOf(0),
                    sizeOf<IntVar>().toULong(),
                    data.size.toULong(),
                    writeFile
                )
            }
        }
        
        fclose(writeFile)
        println("✅ Binary data written")
    }
    
    // 读取二进制数据
    val readFile = fopen(filename, "rb")
    if (readFile != null) {
        memScoped {
            // 读取结构体
            val header = alloc<Header>()
            fread(header.ptr, sizeOf<Header>().toULong(), 1u, readFile)
            
            println("Header: magic=0x${header.magic.toString(16)}, version=${header.version}")
            
            // 读取数组
            val data = allocArray<IntVar>(5)
            fread(data, sizeOf<IntVar>().toULong(), 5u, readFile)
            
            print("Data: ")
            for (i in 0..4) {
                print("${data[i]} ")
            }
            println()
        }
        
        fclose(readFile)
    }
}

文件指针操作

kotlin
@OptIn(ExperimentalForeignApi::class)
fun filePointerOperations() {
    val file = fopen("test.dat", "w+b") // 读写模式
    
    if (file != null) {
        // 写入50个字节
        val data = ByteArray(50) { it.toByte() }
        data.usePinned {
            fwrite(it.addressOf(0), 1u, 50u, file)
        }
        
        // 移动到文件开头
        fseek(file, 0, SEEK_SET)
        
        // 读取前10个字节
        memScoped {
            val buffer = allocArray<ByteVar>(10)
            fread(buffer, 1u, 10u, file)
            
            print("First 10 bytes: ")
            for (i in 0..9) {
                print("${buffer[i]} ")
            }
            println()
        }
        
        // 移动到文件末尾
        fseek(file, 0, SEEK_END)
        val fileSize = ftell(file)
        println("File size: $fileSize bytes")
        
        fclose(file)
    }
}

进程管理

执行外部命令

kotlin
@OptIn(ExperimentalForeignApi::class)
fun runExternalCommand(command: String) {
    // 使用 popen 执行命令并读取输出
    val pipe = popen(command, "r")
    
    if (pipe != null) {
        memScoped {
            val buffer = allocArray<ByteVar>(4096)
            val output = StringBuilder()
            
            while (fgets(buffer, 4096, pipe) != null) {
                output.append(buffer.toKString())
            }
            
            val exitCode = pclose(pipe)
            println("Command: $command")
            println("Output:\n$output")
            println("Exit code: $exitCode")
        }
    } else {
        println("❌ Failed to execute command: $command")
    }
}

// 使用示例
fun useExternalCommands() {
    runExternalCommand("ls -l")        // Linux/macOS
    runExternalCommand("dir")          // Windows
    runExternalCommand("echo 'Hello'")
}

环境变量

kotlin
@OptIn(ExperimentalForeignApi::class)
fun environmentVariables() {
    // 读取环境变量
    val path = getenv("PATH")?.toKString()
    println("PATH: $path")
    
    val home = getenv("HOME")?.toKString()
    println("HOME: $home")
    
    // 设置环境变量
    setenv("MY_APP_VAR", "custom_value", 1) // 1 = overwrite existing
    
    val myVar = getenv("MY_APP_VAR")?.toKString()
    println("MY_APP_VAR: $myVar")
    
    // 删除环境变量
    unsetenv("MY_APP_VAR")
    
    val deleted = getenv("MY_APP_VAR")
    println("After unset: $deleted") // null
}

进程信息

kotlin
@OptIn(ExperimentalForeignApi::class)
fun processInfo() {
    // 获取进程ID
    val pid = getpid()
    println("Current PID: $pid")
    
    // 获取父进程ID
    val ppid = getppid()
    println("Parent PID: $ppid")
    
    // 获取用户ID
    val uid = getuid()
    println("User ID: $uid")
    
    // 获取当前工作目录
    memScoped {
        val buffer = allocArray<ByteVar>(1024)
        val cwd = getcwd(buffer, 1024u)
        if (cwd != null) {
            println("Current directory: ${cwd.toKString()}")
        }
    }
}

目录操作

遍历目录

kotlin
@OptIn(ExperimentalForeignApi::class)
fun listDirectory(path: String) {
    val dir = opendir(path)
    
    if (dir != null) {
        println("Contents of $path:")
        
        var entry = readdir(dir)
        while (entry != null) {
            val name = entry.pointed.d_name.toKString()
            
            // 跳过 . 和 ..
            if (name != "." && name != "..") {
                println("  $name")
            }
            
            entry = readdir(dir)
        }
        
        closedir(dir)
    } else {
        println("❌ Failed to open directory: $path")
    }
}

文件状态信息

kotlin
@OptIn(ExperimentalForeignApi::class)
fun fileInfo(filepath: String) {
    memScoped {
        val statBuf = alloc<stat>()
        
        if (stat(filepath, statBuf.ptr) == 0) {
            println("File: $filepath")
            println("  Size: ${statBuf.st_size} bytes")
            println("  Mode: 0${statBuf.st_mode.toString(8)}")
            println("  UID: ${statBuf.st_uid}")
            println("  GID: ${statBuf.st_gid}")
            
            // 判断文件类型
            val mode = statBuf.st_mode.toUInt()
            when {
                (mode and S_IFMT) == S_IFREG -> println("  Type: Regular file")
                (mode and S_IFMT) == S_IFDIR -> println("  Type: Directory")
                else -> println("  Type: Other")
            }
        } else {
            println("❌ stat() failed for: $filepath")
        }
    }
}

网络编程

TCP 客户端

kotlin
@OptIn(ExperimentalForeignApi::class)
fun tcpClient(host: String, port: Int) {
    memScoped {
        // 创建 socket
        val sockfd = socket(AF_INET, SOCK_STREAM, 0)
        if (sockfd < 0) {
            println("❌ Failed to create socket")
            return
        }
        
        // 配置服务器地址
        val serverAddr = alloc<sockaddr_in>()
        serverAddr.sin_family = AF_INET.convert()
        serverAddr.sin_port = htons(port.toUShort())
        
        // 转换 IP 地址
        inet_pton(AF_INET, host, serverAddr.sin_addr.ptr)
        
        // 连接
        val connectResult = connect(
            sockfd,
            serverAddr.ptr.reinterpret(),
            sizeOf<sockaddr_in>().convert()
        )
        
        if (connectResult < 0) {
            println("❌ Failed to connect")
            close(sockfd)
            return
        }
        
        println("✅ Connected to $host:$port")
        
        // 发送数据
        val message = "GET / HTTP/1.1\r\nHost: $host\r\n\r\n"
        message.cstr.use { cstr ->
            send(sockfd, cstr, message.length.convert(), 0)
        }
        
        // 接收响应
        val buffer = allocArray<ByteVar>(4096)
        val bytesRead = recv(sockfd, buffer, 4095u, 0)
        
        if (bytesRead > 0) {
            buffer[bytesRead.toInt()] = 0 // null terminate
            println("Response:\n${buffer.toKString()}")
        }
        
        close(sockfd)
    }
}

TCP 服务器

kotlin
@OptIn(ExperimentalForeignApi::class)
fun tcpServer(port: Int) {
    memScoped {
        // 创建 socket
        val serverfd = socket(AF_INET, SOCK_STREAM, 0)
        if (serverfd < 0) {
            println("❌ Failed to create socket")
            return
        }
        
        // 允许地址重用
        val optval = alloc<IntVar>()
        optval.value = 1
        setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, optval.ptr, sizeOf<IntVar>().convert())
        
        // 绑定地址
        val serverAddr = alloc<sockaddr_in>()
        serverAddr.sin_family = AF_INET.convert()
        serverAddr.sin_port = htons(port.toUShort())
        serverAddr.sin_addr.s_addr = INADDR_ANY
        
        if (bind(serverfd, serverAddr.ptr.reinterpret(), sizeOf<sockaddr_in>().convert()) < 0) {
            println("❌ Bind failed")
            close(serverfd)
            return
        }
        
        // 监听
        if (listen(serverfd, 5) < 0) {
            println("❌ Listen failed")
            close(serverfd)
            return
        }
        
        println("✅ Server listening on port $port")
        
        // 接受连接
        val clientAddr = alloc<sockaddr_in>()
        val addrlen = alloc<socklen_tVar>()
        addrlen.value = sizeOf<sockaddr_in>().convert()
        
        val clientfd = accept(serverfd, clientAddr.ptr.reinterpret(), addrlen.ptr)
        
        if (clientfd < 0) {
            println("❌ Accept failed")
        } else {
            println("✅ Client connected")
            
            // 接收数据
            val buffer = allocArray<ByteVar>(1024)
            val bytesRead = recv(clientfd, buffer, 1023u, 0)
            
            if (bytesRead > 0) {
                buffer[bytesRead.toInt()] = 0
                println("Received: ${buffer.toKString()}")
                
                // 发送响应
                val response = "HTTP/1.1 200 OK\r\n\r\nHello from Kotlin/Native!\r\n"
                response.cstr.use { cstr ->
                    send(clientfd, cstr, response.length.convert(), 0)
                }
            }
            
            close(clientfd)
        }
        
        close(serverfd)
    }
}

时间与睡眠

时间获取

kotlin
@OptIn(ExperimentalForeignApi::class)
fun timeOperations() {
    // 获取当前时间戳
    val timestamp = time(null)
    println("Current timestamp: $timestamp")
    
    // 格式化时间
    memScoped {
        val tm = alloc<tm>()
        localtime_r(timestamp.ptr, tm.ptr)
        
        println("Year: ${1900 + tm.tm_year}")
        println("Month: ${tm.tm_mon + 1}")
        println("Day: ${tm.tm_mday}")
        println("Hour: ${tm.tm_hour}")
        println("Minute: ${tm.tm_min}")
        println("Second: ${tm.tm_sec}")
        
        // 使用 strftime 格式化
        val buffer = allocArray<ByteVar>(80)
        strftime(buffer, 80u, "%Y-%m-%d %H:%M:%S", tm.ptr)
        println("Formatted: ${buffer.toKString()}")
    }
}

睡眠与延迟

kotlin
@OptIn(ExperimentalForeignApi::class)
fun sleepOperations() {
    println("Sleeping for 2 seconds...")
    sleep(2u)
    println("Awake!")
    
    // 高精度睡眠 (nanosleep)
    println("Sleeping for 500ms...")
    memScoped {
        val req = alloc<timespec>()
        req.tv_sec = 0
        req.tv_nsec = 500_000_000 // 500ms
        
        nanosleep(req.ptr, null)
    }
    println("Awake!")
}

信号处理

注册信号处理器

kotlin
@OptIn(ExperimentalForeignApi::class)
val signalHandler = staticCFunction<Int, Unit> { sig ->
    println("\n⚠️ Received signal: $sig")
    
    when (sig) {
        SIGINT -> println("SIGINT (Ctrl+C)")
        SIGTERM -> println("SIGTERM")
        else -> println("Unknown signal")
    }
}

fun signalHandling() {
    // 注册 SIGINT 处理器
    signal(SIGINT, signalHandler)
    
    println("Press Ctrl+C to trigger signal...")
    println("Waiting...")
    
    // 等待信号
    sleep(30u)
}

POSIX API 提供了底层系统访问能力,适用于系统级编程和性能关键场景。对于跨平台代码,优先使用 Kotlin 标准库;需要平台特定功能或最大性能时使用 POSIX。