【问题标题】:Kotlin flow sequential asynchronous processingKotlin 流顺序异步处理
【发布时间】:2021-03-30 09:50:55
【问题描述】:

我有一个flowMutableSharedFlow,如果相关的话),我有可能很昂贵的操作,我想异步执行,同时仍然保持顺序。我使用CompletableFuture 实现了我想要的:

private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)

fun process(flow: Flow<String>) = flow
    .map { CompletableFuture.supplyAsync({ expensiveHandle(it) }, threadPool) }
    .buffer(threadPoolSize)
    .map { it.get() } // block current thread
    .flowOn(threadPool.asCoroutineDispatcher())

由于卸载到线程池、固定大小 buffer线程阻塞 CompletableFuture#get 的组合,此代码符合我的预期 - 最多 threadPoolSize 事件被并行处理,并且按接收顺序发送到流。

当我将CompletableFuture#get 替换为kotlinx.coroutines.future 中的扩展函数CompletableFuture#await 并使用flowasync 而不是CompletableFuture#supplyAsync 时,不再并行处理消息:

fun process(flow: Flow<String>) = flow
    .map { 
        runBlocking {
            future { expensiveHandle(it) } // same behaviour with async {...}
        }
    }
    .buffer(threadPoolSize)
    .map { it.await() }
    .flowOn(threadPool.asCoroutineDispatcher())

我可以使用协程/挂起函数做等效代码吗?

【问题讨论】:

  • 我不认为buffer 正在做你描述的事情。它为下面的代码创建了一个单独的协程,但它没有并行运行第一个 map 调用。它运行下面的.map 函数,并与流中后续项目的第一个.map 调用并行。由于第一个 .map 调用是非阻塞的,因此缓冲并没有太大的好处。
  • 啊哈,我明白了,感谢您指出 - 在这种情况下,并行性完全来自将工作卸载到线程池,对 @Tenfour04?
  • 我从来没有将 Java 并发与 Kotlin 协程混合过,所以我不想在没有测试的情况下猜测并告诉你一些错误。但我认为是的。 CompletableFuture 在您创建它们时会自动开始运行,因此它们都是并行运行的。您可以使用 async 以暂停方式获得类似的行为。
  • 除了将参数传递给 buffer 调用外,这对我来说看起来不错。只需要一个 2 的缓冲区,我认为这通常是默认值。
  • 所以在删除缓冲区并在池中使用 2 个线程而不是 5 个线程后,我没有在两个线程上运行昂贵的句柄,而只运行一个。我猜另一个被阻塞了(可能在等待中?),因为在 flowOn 中也使用了池。另外,我认为我找到了进行所需思考的方法-请参阅我的编辑。感谢您的帮助!

标签: kotlin asynchronous future kotlin-coroutines kotlin-flow


【解决方案1】:

asyncfutureCoroutineScope 的扩展函数。所以,你需要一些 CoroutineScope 给他们打电话。

runBlocking 给出了一些CoroutineScope,但它是一个阻塞调用,所以它在suspend 中的使用函数is prohibited

您可以使用GlobalScope.async,但它也是not recommended,执行将由Dispatchers.Default 调度,而不是由threadPool.asCoroutineDispatcher() 调度,如CompletableFuture 的原始示例。

coroutineScopewithContext 函数将提供CoroutineScope,它从外部作用域继承其coroutineContext,因此流处理将暂停并立即执行expensiveHandle(it) 协程。

您需要使用工厂函数创建CoroutineScope,这样协程上下文就不会混合:

fun process(flow: Flow<String>, threadPool: ThreadPoolExecutor): Flow<String> {
    val dispatcher = threadPool.asCoroutineDispatcher()
    return flow
        .map { CoroutineScope(dispatcher).async { expensiveHandle(it) } }
        .buffer(threadPool.poolSize)
        .map { it.await() }
        .flowOn(dispatcher)
}

【讨论】:

    【解决方案2】:

    不要映射作为参数传递的流,而是尝试返回一个使用 callbackFlow 构建器构建的新流并收集其中的流,这样您就可以启动多个协程来调用 expensiveHandle(it) 并尽快发送各自的结果。

    fun process(flow: Flow<String>) = callbackFlow {
            flow.collect {
                launch {
                    send(expensiveHandle(it))
                }
            }
        }.flowOn(threadPool.asCoroutineDispatcher())
    

    【讨论】:

      【解决方案3】:

      所以问题不是future 本身,而是周围的runBlocking。当使用自定义CoroutineScope 和线程池作为底层调度程序时,代码按预期工作(注意将get 更改为await,而且我使用async 而不是future,因为它在核心中协程库):

      private val threadPoolSize = 5
      private val threadPool = Executors.newFixedThreadPool(threadPoolSize)
      private val dispatcher = threadPool.asCoroutineDispatcher()
      private val scope = CoroutineScope(dispatcher)
      
      fun process(flow: Flow<String>) = flow
          .map { scope.async(expensiveHandle(it)) }
          .buffer(threadPoolSize)
          .map { it.await() }
          .flowOn(dispatcher)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2023-03-15
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多