【发布时间】:2021-03-30 09:50:55
【问题描述】:
我有一个flow(MutableSharedFlow,如果相关的话),我有可能很昂贵的操作,我想异步执行,同时仍然保持顺序。我使用CompletableFuture 实现了我想要的:
private val threadPoolSize = 5
private val threadPool = Executors.newFixedThreadPool(threadPoolSize)
fun process(flow: Flow<String>) = flow
.map { CompletableFuture.supplyAsync({ expensiveHandle(it) }, threadPool) }
.buffer(threadPoolSize)
.map { it.get() } // block current thread
.flowOn(threadPool.asCoroutineDispatcher())
由于卸载到线程池、固定大小 buffer 和 线程阻塞 CompletableFuture#get 的组合,此代码符合我的预期 - 最多 threadPoolSize 事件被并行处理,并且按接收顺序发送到流。
当我将CompletableFuture#get 替换为kotlinx.coroutines.future 中的扩展函数CompletableFuture#await 并使用flow 或async 而不是CompletableFuture#supplyAsync 时,不再并行处理消息:
fun process(flow: Flow<String>) = flow
.map {
runBlocking {
future { expensiveHandle(it) } // same behaviour with async {...}
}
}
.buffer(threadPoolSize)
.map { it.await() }
.flowOn(threadPool.asCoroutineDispatcher())
我可以使用协程/挂起函数做等效代码吗?
【问题讨论】:
-
我不认为
buffer正在做你描述的事情。它为下面的代码创建了一个单独的协程,但它没有并行运行第一个map调用。它运行下面的.map函数,并与流中后续项目的第一个.map调用并行。由于第一个.map调用是非阻塞的,因此缓冲并没有太大的好处。 -
啊哈,我明白了,感谢您指出 - 在这种情况下,并行性完全来自将工作卸载到线程池,对 @Tenfour04?
-
我从来没有将 Java 并发与 Kotlin 协程混合过,所以我不想在没有测试的情况下猜测并告诉你一些错误。但我认为是的。 CompletableFuture 在您创建它们时会自动开始运行,因此它们都是并行运行的。您可以使用
async以暂停方式获得类似的行为。 -
除了将参数传递给
buffer调用外,这对我来说看起来不错。只需要一个 2 的缓冲区,我认为这通常是默认值。 -
所以在删除缓冲区并在池中使用 2 个线程而不是 5 个线程后,我没有在两个线程上运行昂贵的句柄,而只运行一个。我猜另一个被阻塞了(可能在等待中?),因为在 flowOn 中也使用了池。另外,我认为我找到了进行所需思考的方法-请参阅我的编辑。感谢您的帮助!
标签: kotlin asynchronous future kotlin-coroutines kotlin-flow