[Kotlin/Native] 多线程怎么玩?

Posted by rarnu on 05-25,2019

提到多线程,写惯了 Kotlin/JVM 的可能第一个反应就是 thread { ... },毕竟 Kotlin 已经为我们设计好了很多东西,然而在 Kotlin/Native 上,却是不存在这样的东西的。通常情况下, C 程序员会很熟悉 pthread,并且我们也可以在 Kotlin/Native 上实现类似的功能。

由于 cinterop 的存在,使得我们可以直接调用 C 的标准函数库,写出来的 pthread 代码是这样的:

memScoped {
    val thread = alloc<pthread_tVar>()
    pthread_create(thread.ptr, null, staticCFunction { argc ->
        initRuntimeIfNeeded()
        ... ...
        null // as COpaquePointer?
    }, null)
    pthread_join(thread.value, null)
}

可能从上一篇文章起,大家就对类似于 pthread_tVar 或是 IntVar 这类写法表示疑问,这些类型是怎么来的呢,其实在 cinterop 拥有一种左值约定,我贴个原文大家看一下。

Also, any C type has the Kotlin type representing the lvalue of this type, i.e., the value located in memory rather than a simple immutable self-contained value. Think C++ references, as a similar concept. For structs (and typedefs to structs) this representation is the main one and has the same name as the struct itself, for Kotlin enums it is named ${type}Var, for CPointer<T> it is CPointerVar<T>, and for most other types it is ${type}Var.

总的来说,就是 C 类型后面加 Var 就是 Kotlin 内的类型了。

一般来说,只要是 C 可以实现的,可以用非常平滑的方法迁移到 Kotlin/Native。


下面说一下 Kotlin/Native 原生实现的线程模型,相比于 pthread 的 API,原生实现的 Worker 更符合 Kotlin 的代码习惯,也拥有更好的可读性。

如以下例子:

val str = "hello"
Worker.start().execute(TransferMode.SAFE, { str }) { it }.consume { println(it) }

这段代码的意思很简单,在 execute() 方法传入参数并且启动生产,该生产过程是异步的,完成后调用 consume() 进行消费,消费的过程是同步的。

这里会有一个需要非常注意的地方,不能偷懒,比如说以下代码:

val str = "hello"
Worker.start().execute(TransferMode.SAFE, { }) { str }.consume { println(str) }

是不是看着没问题? 但是实际编译会报错,异常信息如下:

Worker.execute must take an unbound, non-capturing function or lambda

在这里需要注意的是,execute 方法的定义:

public final fun <T1, T2> execute(mode: TransferMode, producer: () -> T1, @VolatileLambda job: (T1) -> T2): Future<T2>

job 参数前有一个 @VolatileLambda 的注解,这就表明了 job 所对应的函数不允许有 绑定捕获 的行为,而直接传入 str 变量显然就是捕获了。

所以必须在 producer 参数中予以传参,知道这一点就可以做很多事情了,比如说执行一个外部的命令:

fun main(args: Array<String>) {
    if (args.isEmpty()) return
    val cmd = args[0]
    Worker.start().execute(TransferMode.SAFE, { cmd }) {
        runCommand(it)
    }.consume { 
        println("output => ${it.output}\nerror => ${it.error}") 
    }
}

其中 runCommand() 的代码如下:

import kotlinx.cinterop.*
import platform.posix.*
data class CommandResult(val output: String, val error: String)
fun runCommand(cmd: String) = memScoped {
    var ret = ""
    var err = ""
    val size = 1024
    val buf = allocArray<ByteVar>(size)
    val fst = popen(cmd, "r")
    if (fst != null) {
        while (fgets(buf, size, fst) != null) {
            ret += buf.toKString()
        }
    } else {
        err = strerror(errno)?.toKString() ?: ""
    }
    pclose(fst)
    CommandResult(ret, err)
}

知道了这些之后,就可以愉快的玩转多线程了。顺便,Kotlin 还主打协程,在这里也一起提一下。

要使用协程,必须在 build.gradle 内引用协程相关的库:

dependencies {
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core-common:1.1.1'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core-native:1.1.1'
}

这里要注意版本号,如果使用 Kotlin 1.3.21,那么协程库版本号对应为 1.1.1,如果使用 Kotlin 1.3.31,则对应为 1.2.1。如果版本号不匹配,会引起编译异常。

然后我们需要自己定义一个 CoroutineScope,这里只是做演示用,在这个 Scope 内不做其他事,可以写成这样:

private class MyScope: CoroutineScope {
    private val dispatcher = object : CoroutineDispatcher() {
        override fun dispatch(context: CoroutineContext, block: Runnable) {
            block.run()
        }
    }
    private val job = Job()
    override val coroutineContext: CoroutineContext get() = dispatcher + job
}

然后上面的 runCommand() 代码就可以改成这样:

MyScope().launch {
    val ret = runCommand(cmd)
    println("output => ${ret.output}\nerror => ${ret.error}")
}

顺便一提,原先定义的 runCommand 是一个常规方法,在协程里用也可以将其改为 suspend 方法:

suspend fun runCommand(cmd: String): CommandResult { ... }