【发布时间】:2017-04-02 02:01:59
【问题描述】:
我正在尝试使用 Kotlin 协程来处理非阻塞 I/O。场景如下:
- 从线程 1 上运行的异步回调接收数据。
- 在线程 2 中等待此数据,然后使用它。
我当前的代码如下所示(为简洁起见进行了简化):
private var latch = CountDownLatch(1)
private var data: Any? = null
// Async callback from non-blocking I/O
fun onReceive(data: Any) {
currentData = data
latch.countDown()
}
// Wait and consume data
fun getData(): Any? {
latch.await()
latch = CountDownLatch(1)
return currentData
}
fun processData() {
launch(CommonPool) {
while (true) {
val data = getData()
// Consume data
}
}
}
据我了解,Kotlin 协程应该能够帮助我摆脱 CountDownLatch。在阅读this (awesome) guide 之后,我能想到的是这样的:
// Wait and consume data
fun getData() = async(CommonPool) {
latch.await()
latch = CountDownLatch(1)
currentData
}
fun processData() {
launch(CommonPool) {
while (true) {
runBlocking {
val data = getData().await()
// Consume data
}
}
}
}
我也试过Pipelines,结果相似。我显然不明白如何使用这些功能。
【问题讨论】:
-
从问题中的代码很难理解您的目标是什么。请澄清什么外部函数返回什么。
-
在这种情况下,我们需要知道哪些 API 调用返回了 Promise 以及哪些类型。请将此信息添加到问题中
-
不要使用 CountDownLatch。请改用 ArrayBlockingQueue。让 onReceive() 调用 queue.put();并且 processData() 调用 queue.take() 而不是 getData()。
-
正确答案取决于您如何启动稍后将调用您的
onReceive回调的异步操作,以及您是否希望回调仅被调用一次或多次。请澄清您的问题以获得正确答案。 -
因此,如果预计会被多次调用,那么您使用
CountDownLatch的原始代码应该如何工作?例如,想想当onReceive被第二次调用时会发生什么,而getData已经完成了它的第一行(使用await)但还没有执行它的第二行。我只是想弄清楚您要实现的目标,这从您的原始代码中绝对不清楚。
标签: multithreading kotlin coroutine