Kotlin/Native 下使用协程的大坑以及替代手段

Posted by rarnu on 10-07,2022

由于项目需要,我使用了 go 的协程,一切都相当的顺利,go 无愧于协程王者的称号。但是作为一名 Kotlin 爱好者,我又希望能在 Kotlin 中使用协程。当然了,这个时候你可能要说了,Kotlin 不也是同样以牛逼的协程出名的么,为什么会有“使用”一说?嗯,这里说的使用,是指在 Kotlin/Native 里使用,这东西和 Kotlin 的差距还是很大的。

废话不多说,直接新建一个 Kotlin/Native 的项目,然后引入协程框架:

kotlin {
    def hostOs = System.getProperty("os.name")
    def hostArch = System.getProperty("os.arch")
    def isMingwX64 = hostOs.startsWith("Windows")
    def nativeTarget
    if (hostOs == "Mac OS X") {
        if (hostArch == "aarch64") {
            nativeTarget = macosArm64('native')
        } else {
            nativeTarget = macosX64('native')
        }
    } else if (hostOs == "Linux") nativeTarget = linuxX64("native")
    else if (isMingwX64) nativeTarget = mingwX64("native")
    else throw new GradleException("Host OS is not supported in Kotlin/Native.")

    nativeTarget.with {
        binaries {
            executable {
                entryPoint = 'main'
            }
        }
    }
    sourceSets {
        nativeMain {
            dependencies {
                implementation ("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3-native-mt") {
                    version {
                        strictly("1.6.3-native-mt")
                    }
                }
            }

        }
        nativeTest {

        }
    }
}

不得不说,Gradle 是很强,但是这一套配置写下来,也够呛,话说自从我用上 go 之后,就各种看 JVM 相关的工具链不爽了,啥时候能有个简单再简单的东西来亮亮我的眼。

然后就是简单的编码了,在进入正式工作前,我习惯于先写一个架子,如下:

runBlocking {
    CoroutineScope(Dispatchers.Default).launch {
        repeat(3) { index ->
            launch {
                repeat(3) {index2 ->
                    launch {
                        println("index = $index, index2 = $index2")
                    }
                }
            }
        }
    }.join()
}

运行起来结果是这样的:

index = 0, index2 = 0
index = 0, index2 = 1
index = 0, index2 = 2
index = 1, index2 = 0
index = 1, index2 = 1
index = 1, index2 = 2
index = 2, index2 = 0
index = 2, index2 = 1
index = 2, index2 = 2

这特么是协程?你跟我说这效果是协程?这显然就是一个嵌套 for 循环,严格按顺序在执行好么!

就是因为这个结果,吓得我赶紧又新建了一个 JVM 的 Kotlin 项目,将这段代码复制了进去,于是得到的结果如下:

index = 0, index2 = 2
index = 1, index2 = 0
index = 1, index2 = 2
index = 2, index2 = 2
index = 0, index2 = 1
index = 2, index2 = 0
index = 2, index2 = 1
index = 1, index2 = 1
index = 0, index2 = 0

嗯,是乱序的,JVM 下的协程没问题。那为什么 Kotlin/Native 下的协程行为就如此诡异呢?经过查询了大量资料,最终我看到这样一句话:

image

好家伙,这玩意就只是开了一个子线程然后就搁那顺序执行呢,所以这协程有个蛋用?

那现在只有两个办法,一个是把 Kotlin/Native 换成 Kotlin JVM,另一个是换用线程,出于对内存的尊重,我还是选择了后者(现在是真的不愿意再带入 JVM 这么个庞然大物了)。

经过反复选型,最终还是选择了采用 pthread 的方案,在这里可能又有人要问了,为啥不用 Kotlin/Native 自带的 Worker 机制呢?对于这个问题我只能说,对于刚上手的人,Worker 可能是一个优雅简洁的方案,但是,但凡仔细研究过的人都不可能再选那玩意,除非你完全不在乎性能。

好了,那下面就是写一段 pthread 了,这对我来说挺简单的,毕竟也算写了那么多年的 C:

fun main() {
    val tids = mutableListOf<pthread_t?>()
    for (i in 0 until threadCount) {
        memScoped {
            val tid = alloc<pthread_tVar>()
            pthread_create(tid.ptr, null, staticCFunction(::thread_read), null)
            tids.add(tid.value)
        }
    }
    
    for (i in 0 until threadCount) {
        pthread_join(tids[i], null)
    }
}

fun thread_read(arg: COpaquePointer?): COpaquePointer? {
    return null
}

是不是挺简单的?但是如果我想有传入传出参数呢?假设我需要传入一个结构体,用 C 写起来是这样:

struct myData {
    int index;
    off_t current;
    off_t limit;
}

struct myData* data = malloc(sizeof(struct myData));
data->index = i;
data->current = offset;
data->limit = limit_size;

pthread_t tid;
pthread_create(&tid, NULL, thread_read, (void *)data);

然而在 Kotlin/Native 下要怎么写呢?这是我们遇到的又一个坑,由于 Kotlin/Native 和 C 的互操作性被 cintrop 封装,而这个封装又是各种指针类型扔来扔去,里面的代码非常难以读懂。另外,由于 Kotlin 采用另一套类型机制,它并不能声明 struct,只能以 data class 来传递数据,因此只能找其他办法了。

通过阅读 Kotlin/Native 的文档(点此阅读),我找到了将 Kotlin 的类型包装成 C 指针的方法:

data class MyData(val index: Int, val current: Long, val limit: Long)

val tids = mutableListOf<pthread_t?>()
for (i in 0 until threadCount) {
    val ref = MyData(i, offset, limitSize)
    val sref = StableRef.create(ref)
    val ptr = sref.asCPointer()
    memScoped {
        val tid = alloc<pthread_tVar>()
        pthread_create(tid.ptr, null, staticCFunction(::thread_read), ptr)
        tids.add(tid.value)
    }
}

这样就能把参数传递过去了,然后在另一边,即 thread_read 函数内,我们可以这样做,来解开这个指针的内容:

fun thread_read(arg: COpaquePointer?): COpaquePointer? {
    val sref = arg!!.asStableRef<MyData>()
    val ref = sref.get()
    ... ... 
}

诶?看起来没那么麻烦,那就把程序跑起来看看效果吧,这里会直接遇到一个大坑,而且文档里从未提及:

kotlin.native.IncorrectDereferenceException: illegal attempt to access non-shared <object>@83cb47c8 from other thread

这个异常发生在 thread_read 函数的 sref.get() 处。百思不得其解,我只是单纯的传了一个指针过来,怎么又触发了内存共享异常呢?然后就这个异常信息,我搜索了大量资料,基本上得到的信息都是“向子线程传递的参数需要加锁”,这就很奇怪啊,因为在 C 里面使用这样的代码从来没有加锁一说,只需要保证每个子线程拿到的 struct 指针是不同的就可以了,难道在 Kotlin 里面,机制有所改变?

好的,那么回到正常思路上来,首先想到的是 sref.get() 能拿到的东西就是一个 Kotlin 的类,而这句代码报错,无非就是 sref 要锁,或者里面的 Kotlin 类要锁,到底是哪个呢?直接试验就行:

val ref = MyData(i, offset, limitSize).freeze()
val sref = StableRef.create(ref).freeze()

实践证明,对 MyData 加锁是有效的。至于说那个 freeze() 方法怎么来的,参考 Kotlin/Native 的代码:

/**
 * Freezes object subgraph reachable from this object. Frozen objects can be freely
 * shared between threads/workers.
 *
 * @throws FreezingException if freezing is not possible
 * @return the object itself
 * @see ensureNeverFrozen
 */
public fun <T> T.freeze(): T {
    freezeInternal(this)
    return this
}

所以归根到底还是熟不熟的问题啊,Kotlin/Native 官方团队或许就是在逼我看源码呢(狗头)

然后再往下写吧,还是会遇到子线程向主线程返回数据的场景,有了上面的经验,代码就会好写很多了:

fun main() {
    ... ...
    for (i in 0 until threadCount) {
        memScoped {
            val p = alloc<COpaquePointerVar>()
            pthread_join(tids[i], p.ptr)
            val sref = p.value!!.asStableRef<RetData>()
            val ref = sref.get()
            ... ...
            sref.dispose()
        }
    }
}

fun thread_read(arg: COpaquePointer?): COpaquePointer? {
    ... ...
    val retRef = RetData(index).freeze()
    val sretRef = StableRef.create(retRef)
    return sretRef.asCPointer()
}

在这里需要非常注意,在内存管理上,Kotlin 确实是有自己的一套机制的,比如说在子线程里,不能向全局变量进行写入,我先前为了图方便,直接在子线程里修改静态变量的值,结果被报了异常。

好了,现在就可以继续添砖加瓦了,下面这个代码,也可以算作是 Kotlin 操作 pthread 的基本代码了,但是不管怎么说,Kotlin 官方提供的那个单线程协程着实让我大跌眼镜,真想问一句,你们搞毛呢?

import kotlinx.cinterop.*
import platform.posix.*
import kotlin.native.concurrent.freeze

var threadCount = 10

data class MyData(val idx: Int, val offset: Long, val limit: Long)
data class RetData(val index: Long)

fun main() {
    val tids = mutableListOf<pthread_t?>()
    
    var offset = 0L
    var limitSize = 0L
    
    for (i in 0 until threadCount) {
        val ref = MyData(i, offset, limitSize).freeze()
        val sref = StableRef.create(ref)
        val ptr = sref.asCPointer()
        memScoped {
            val tid = alloc<pthread_tVar>()
            pthread_create(tid.ptr, null, staticCFunction(::thread_read), ptr)
            tids.add(tid.value)
        }
        offset += 10
        limitSize += offset * 2
    }
    
    for (i in 0 until threadCount) {
        memScoped {
            val p = alloc<COpaquePointerVar>()
            pthread_join(tids[i], p.ptr)
            val sref = p.value!!.asStableRef<RetData>()
            val ref = sref.get()
            println("thread callback = ${ref.index}")
            sref.dispose()
        }
    }
    println("done")
}

fun thread_read(arg: COpaquePointer?): COpaquePointer? {
    val sref = arg!!.asStableRef<MyData>()
    val ref = sref.get()
    val index = ref.idx
    println("thread ${ref.idx}, offset = ${ref.offset}, limit = ${ref.limit}")
    sref.dispose()
    
    val retRef = RetData(index).freeze()
    val sretRef = StableRef.create(retRef)
    return sretRef.asCPointer()
}