由于项目需要,我使用了 go 的协程,一切都相当的顺利,go 无愧于协程王者的称号。但是作为一名 Kotlin 爱好者,我又希望能在 Kotlin 中使用协程。当然了,这个时候你可能要说了,Kotlin 不也是同样以牛逼的协程出名的么,为什么会有“使用”一说?嗯,这里说的使用,是指在 Kotlin/Native 里使用,这东西和 Kotlin 的差距还是很大的。
废话不多说,直接新建一个 Kotlin/Native 的项目,然后引入协程框架:
kotlin {
def hostOs = System.getProperty("os.name")
def hostArch = System.getProperty("os.arch")
def isMingwX64 = hostOs.startsWith("Windows")
def nativeTarget
if (hostOs == "Mac OS X") {
if (hostArch == "aarch64") {
nativeTarget = macosArm64('native')
} else {
nativeTarget = macosX64('native')
}
} else if (hostOs == "Linux") nativeTarget = linuxX64("native")
else if (isMingwX64) nativeTarget = mingwX64("native")
else throw new GradleException("Host OS is not supported in Kotlin/Native.")
nativeTarget.with {
binaries {
executable {
entryPoint = 'main'
}
}
}
sourceSets {
nativeMain {
dependencies {
implementation ("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.3-native-mt") {
version {
strictly("1.6.3-native-mt")
}
}
}
}
nativeTest {
}
}
}
不得不说,Gradle 是很强,但是这一套配置写下来,也够呛,话说自从我用上 go 之后,就各种看 JVM 相关的工具链不爽了,啥时候能有个简单再简单的东西来亮亮我的眼。
然后就是简单的编码了,在进入正式工作前,我习惯于先写一个架子,如下:
runBlocking {
CoroutineScope(Dispatchers.Default).launch {
repeat(3) { index ->
launch {
repeat(3) {index2 ->
launch {
println("index = $index, index2 = $index2")
}
}
}
}
}.join()
}
运行起来结果是这样的:
index = 0, index2 = 0
index = 0, index2 = 1
index = 0, index2 = 2
index = 1, index2 = 0
index = 1, index2 = 1
index = 1, index2 = 2
index = 2, index2 = 0
index = 2, index2 = 1
index = 2, index2 = 2
这特么是协程?你跟我说这效果是协程?这显然就是一个嵌套 for 循环,严格按顺序在执行好么!
就是因为这个结果,吓得我赶紧又新建了一个 JVM 的 Kotlin 项目,将这段代码复制了进去,于是得到的结果如下:
index = 0, index2 = 2
index = 1, index2 = 0
index = 1, index2 = 2
index = 2, index2 = 2
index = 0, index2 = 1
index = 2, index2 = 0
index = 2, index2 = 1
index = 1, index2 = 1
index = 0, index2 = 0
嗯,是乱序的,JVM 下的协程没问题。那为什么 Kotlin/Native 下的协程行为就如此诡异呢?经过查询了大量资料,最终我看到这样一句话:
好家伙,这玩意就只是开了一个子线程然后就搁那顺序执行呢,所以这协程有个蛋用?
那现在只有两个办法,一个是把 Kotlin/Native 换成 Kotlin JVM,另一个是换用线程,出于对内存的尊重,我还是选择了后者(现在是真的不愿意再带入 JVM 这么个庞然大物了)。
经过反复选型,最终还是选择了采用 pthread 的方案,在这里可能又有人要问了,为啥不用 Kotlin/Native 自带的 Worker 机制呢?对于这个问题我只能说,对于刚上手的人,Worker 可能是一个优雅简洁的方案,但是,但凡仔细研究过的人都不可能再选那玩意,除非你完全不在乎性能。
好了,那下面就是写一段 pthread 了,这对我来说挺简单的,毕竟也算写了那么多年的 C:
fun main() {
val tids = mutableListOf<pthread_t?>()
for (i in 0 until threadCount) {
memScoped {
val tid = alloc<pthread_tVar>()
pthread_create(tid.ptr, null, staticCFunction(::thread_read), null)
tids.add(tid.value)
}
}
for (i in 0 until threadCount) {
pthread_join(tids[i], null)
}
}
fun thread_read(arg: COpaquePointer?): COpaquePointer? {
return null
}
是不是挺简单的?但是如果我想有传入传出参数呢?假设我需要传入一个结构体,用 C 写起来是这样:
struct myData {
int index;
off_t current;
off_t limit;
}
struct myData* data = malloc(sizeof(struct myData));
data->index = i;
data->current = offset;
data->limit = limit_size;
pthread_t tid;
pthread_create(&tid, NULL, thread_read, (void *)data);
然而在 Kotlin/Native 下要怎么写呢?这是我们遇到的又一个坑,由于 Kotlin/Native 和 C 的互操作性被 cintrop 封装,而这个封装又是各种指针类型扔来扔去,里面的代码非常难以读懂。另外,由于 Kotlin 采用另一套类型机制,它并不能声明 struct
,只能以 data class
来传递数据,因此只能找其他办法了。
通过阅读 Kotlin/Native 的文档(点此阅读),我找到了将 Kotlin 的类型包装成 C 指针的方法:
data class MyData(val index: Int, val current: Long, val limit: Long)
val tids = mutableListOf<pthread_t?>()
for (i in 0 until threadCount) {
val ref = MyData(i, offset, limitSize)
val sref = StableRef.create(ref)
val ptr = sref.asCPointer()
memScoped {
val tid = alloc<pthread_tVar>()
pthread_create(tid.ptr, null, staticCFunction(::thread_read), ptr)
tids.add(tid.value)
}
}
这样就能把参数传递过去了,然后在另一边,即 thread_read
函数内,我们可以这样做,来解开这个指针的内容:
fun thread_read(arg: COpaquePointer?): COpaquePointer? {
val sref = arg!!.asStableRef<MyData>()
val ref = sref.get()
... ...
}
诶?看起来没那么麻烦,那就把程序跑起来看看效果吧,这里会直接遇到一个大坑,而且文档里从未提及:
kotlin.native.IncorrectDereferenceException: illegal attempt to access non-shared <object>@83cb47c8 from other thread
这个异常发生在 thread_read
函数的 sref.get()
处。百思不得其解,我只是单纯的传了一个指针过来,怎么又触发了内存共享异常呢?然后就这个异常信息,我搜索了大量资料,基本上得到的信息都是“向子线程传递的参数需要加锁”,这就很奇怪啊,因为在 C 里面使用这样的代码从来没有加锁一说,只需要保证每个子线程拿到的 struct 指针是不同的就可以了,难道在 Kotlin 里面,机制有所改变?
好的,那么回到正常思路上来,首先想到的是 sref.get()
能拿到的东西就是一个 Kotlin 的类,而这句代码报错,无非就是 sref 要锁,或者里面的 Kotlin 类要锁,到底是哪个呢?直接试验就行:
val ref = MyData(i, offset, limitSize).freeze()
val sref = StableRef.create(ref).freeze()
实践证明,对 MyData 加锁是有效的。至于说那个 freeze()
方法怎么来的,参考 Kotlin/Native 的代码:
/**
* Freezes object subgraph reachable from this object. Frozen objects can be freely
* shared between threads/workers.
*
* @throws FreezingException if freezing is not possible
* @return the object itself
* @see ensureNeverFrozen
*/
public fun <T> T.freeze(): T {
freezeInternal(this)
return this
}
所以归根到底还是熟不熟的问题啊,Kotlin/Native 官方团队或许就是在逼我看源码呢(狗头)
然后再往下写吧,还是会遇到子线程向主线程返回数据的场景,有了上面的经验,代码就会好写很多了:
fun main() {
... ...
for (i in 0 until threadCount) {
memScoped {
val p = alloc<COpaquePointerVar>()
pthread_join(tids[i], p.ptr)
val sref = p.value!!.asStableRef<RetData>()
val ref = sref.get()
... ...
sref.dispose()
}
}
}
fun thread_read(arg: COpaquePointer?): COpaquePointer? {
... ...
val retRef = RetData(index).freeze()
val sretRef = StableRef.create(retRef)
return sretRef.asCPointer()
}
在这里需要非常注意,在内存管理上,Kotlin 确实是有自己的一套机制的,比如说在子线程里,不能向全局变量进行写入,我先前为了图方便,直接在子线程里修改静态变量的值,结果被报了异常。
好了,现在就可以继续添砖加瓦了,下面这个代码,也可以算作是 Kotlin 操作 pthread 的基本代码了,但是不管怎么说,Kotlin 官方提供的那个单线程协程着实让我大跌眼镜,真想问一句,你们搞毛呢?
import kotlinx.cinterop.*
import platform.posix.*
import kotlin.native.concurrent.freeze
var threadCount = 10
data class MyData(val idx: Int, val offset: Long, val limit: Long)
data class RetData(val index: Long)
fun main() {
val tids = mutableListOf<pthread_t?>()
var offset = 0L
var limitSize = 0L
for (i in 0 until threadCount) {
val ref = MyData(i, offset, limitSize).freeze()
val sref = StableRef.create(ref)
val ptr = sref.asCPointer()
memScoped {
val tid = alloc<pthread_tVar>()
pthread_create(tid.ptr, null, staticCFunction(::thread_read), ptr)
tids.add(tid.value)
}
offset += 10
limitSize += offset * 2
}
for (i in 0 until threadCount) {
memScoped {
val p = alloc<COpaquePointerVar>()
pthread_join(tids[i], p.ptr)
val sref = p.value!!.asStableRef<RetData>()
val ref = sref.get()
println("thread callback = ${ref.index}")
sref.dispose()
}
}
println("done")
}
fun thread_read(arg: COpaquePointer?): COpaquePointer? {
val sref = arg!!.asStableRef<MyData>()
val ref = sref.get()
val index = ref.idx
println("thread ${ref.idx}, offset = ${ref.offset}, limit = ${ref.limit}")
sref.dispose()
val retRef = RetData(index).freeze()
val sretRef = StableRef.create(retRef)
return sretRef.asCPointer()
}