【问题标题】:Kotlin: Blocking coroutines with non-blocking I/OKotlin:使用非阻塞 I/O 阻塞协程
【发布时间】:2017-04-02 02:01:59
【问题描述】:

我正在尝试使用 Kotlin 协程来处理非阻塞 I/O。场景如下:

  1. 从线程 1 上运行的异步回调接收数据。
  2. 在线程 2 中等待此数据,然后使用它。

我当前的代码如下所示(为简洁起见进行了简化):

private var latch = CountDownLatch(1)
private var data: Any? = null

// Async callback from non-blocking I/O
fun onReceive(data: Any) {
    currentData = data
    latch.countDown()
}

// Wait and consume data
fun getData(): Any? {
    latch.await()
    latch = CountDownLatch(1)
    return currentData
}

fun processData() {
    launch(CommonPool) {
        while (true) {
            val data = getData()
            // Consume data                
        }
    }
}

据我了解,Kotlin 协程应该能够帮助我摆脱 CountDownLatch。在阅读this (awesome) guide 之后,我能想到的是这样的:

// Wait and consume data
fun getData() = async(CommonPool) {
    latch.await()
    latch = CountDownLatch(1)
    currentData
}

fun processData() {
    launch(CommonPool) {
        while (true) {
            runBlocking {
                val data = getData().await()
                // Consume data                
            }
        }
    }
}

我也试过Pipelines,结果相似。我显然不明白如何使用这些功能。

【问题讨论】:

  • 从问题中的代码很难理解您的目标是什么。请澄清什么外部函数返回什么。
  • 在这种情况下,我们需要知道哪些 API 调用返回了 Promise 以及哪些类型。请将此信息添加到问题中
  • 不要使用 CountDownLatch。请改用 ArrayBlockingQueue。让 onReceive() 调用 queue.put();并且 processData() 调用 queue.take() 而不是 getData()。
  • 正确答案取决于您如何启动稍后将调用您的onReceive 回调的异步操作,以及您是否希望回调仅被调用一次或多次。请澄清您的问题以获得正确答案。
  • 因此,如果预计会被多次调用,那么您使用CountDownLatch 的原始代码应该如何工作?例如,想想当onReceive 被第二次调用时会发生什么,而getData 已经完成了它的第一行(使用await)但还没有执行它的第二行。我只是想弄清楚您要实现的目标,这从您的原始代码中绝对不清楚。

标签: multithreading kotlin coroutine


【解决方案1】:

在使用 Android 开发时使用CountDownLatch 有一个非常常见的模式,有时您希望在处理BroadcastReceiversCountDownLatch 时使异步实现同步非常方便。

private suspend fun yourSuspendMethod() {
    val job = GlobalScope.async {    
        val latch = CountDownLatch(1)

        val watcher = object : BroadcastReceiver() {

            override fun onReceive(context: Context?, intent: Intent?) {
                if // your logic
                    latch.countDown()
            }
        }

        try {
            mContext?.registerReceiver(watcher, IntentFilter(...))

            //call a method that will trigger the broadcast receiver

            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw Exception("Failed .... on latch's timeout")
            }
        } finally {
            mContext?.unregisterReceiver(watcher)
        }
    }

    job.await()
}

这里有一件非常重要的事情,不要改变 CoroutineScope 的上下文,否则它们将运行在一个完全不同的地方,按照我上面的方式,它会成为作用域/上下文的子对象。

[编辑] 我决定对这个问题多加思考以避免使用CountDownLatch。闩锁的问题是,当您调用latch.await 时,它会停止当前线程,因此如果这是来自主线程,主线程将等待并且超时,因为它没有给接收者机会叫做。解决这个问题的一种方法是使用我上面使用的示例。

我在上面的例子中忘记的一件事是,如果你想对调用者的上下文进行单元测试和同步,你需要注入上下文。如果您决定这样做,您的实现将变得更加复杂,并且您将无法使用协程的全部功能,因为您将创建额外的线程。

所以,解决方案是使用withTimeout + suspendCancellableCoroutine 的组合,你可以使用这个扩展:

suspend inline fun <T> suspendCoroutineWithTimeout(
    timeout: Long,
    crossinline block: (Continuation<T>) -> Unit
) = withTimeout(timeout) {
    suspendCancellableCoroutine(block = block)
}

你的方法应该是这样的:

private suspend fun yourSuspendMethod() {
    var watcher: BroadcastReceiver? = null

    try {
        suspendCoroutineWithTimeout<Boolean>(TimeUnit.SECONDS.toMillis(5)) {
            watcher = object : BroadcastReceiver() {

                override fun onReceive(context: Context?, intent: Intent?) {
                    if // your logic
                        it.resume(true)
                }
            }

            context?.registerReceiver(watcher, IntentFilter(...))
            //call a method that will trigger the broadcast receiver
        }
    } finally {
        context?.unregisterReceiver(watcher)
    }
}

就是这样。现在协程可以在不停止调用者线程的情况下发挥它的魔力,并且当作业被取消时,超时也会取消。

【讨论】:

    【解决方案2】:

    您没有说在onReceive() 中收到的数据是否可以并行处理。这是主要问题。如果是,您可以在onReceive() 中进行操作。如果不允许这样做,则让对onReceive() 的每个调用在CommonPool 上启动一个任务,而不使用任何协程。如果它们应该按顺序处理,那么最简单的方法是启动一个内部有循环的线程:

    fun onReceive(data: Any) {
       queue.put(data);
    }
    
     ....
    
    // loop in a thread
    while(true) {
       data = queue.take();
       processData(data);
    }
    

    同样,不需要协程。

    一般来说,协程是一种语法糖,用来表示一个异步程序,就好像它是同步的一样。我不认为您的程序可以作为使用协程的案例。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-07-25
      • 1970-01-01
      • 1970-01-01
      • 2018-04-06
      • 2020-05-24
      • 1970-01-01
      • 1970-01-01
      • 2015-02-24
      相关资源
      最近更新 更多