【问题标题】:Parallel execution of jobs using coroutines使用协程并行执行作业
【发布时间】:2019-09-19 12:24:57
【问题描述】:

我想使用协程并行执行多个作业。这是我想出的一段代码。
我有 2 个疑问:

  • 如何确保在调用者线程中发生完成回调?

  • 代码变得更像我以前使用的回调模式 正常线程。请建议更改设计以实现 协程可读性优势。

class ParallelExecutor {

    suspend fun <OUTPUT> execute(
        jobs: List<suspend () -> OUTPUT>,
        onTimeout: (jobIndex: Int) -> OUTPUT,
        onFailure: (jobIndex: Int, exception: Throwable) -> OUTPUT,
        onCompletion: suspend (jobIndex: Int, result: OUTPUT) -> Unit,
        timeout: Long,
        onFullCompletion: suspend () -> Unit = {},
        invokeDispatcher: CoroutineDispatcher = Dispatchers.Default
    ) {
        withContext(invokeDispatcher) {
            var counter = 0
            val listenJobs = mutableListOf<Deferred<OUTPUT>>()

            jobs.forEachIndexed { index, job ->
                val listenJob = async {

                    try {
                        job()
                    } catch (e: Exception) {
                        onFailure(index, e)
                    }
                }
                listenJobs.add(listenJob)
            }

            listenJobs.forEachIndexed { index, job ->
                launch {
                    val output = try {
                        withTimeout(timeout) {
                            job.await()
                        }
                    } catch (e: TimeoutCancellationException) {
                        onTimeout(index)
                    }
                    onCompletion(index, output)
                    if (++counter == listenJobs.size) {
                        onFullCompletion()
                    }
                }
            }
        }
    }
}

【问题讨论】:

  • 这是一种非常...不同的协程使用方式,您确定要使用协程来实现吗?我不认为这需要如此通用。
  • 这个函数是如何在你的代码中使用的?这样可以更轻松地提出建议。
  • 您希望在“调用者线程”或“调用者调度程序”中调用完成回调? (线程可能是不可能的)。
  • @DominicFischer 我想同时开始多个相同工作的地方很少。这段代码在所有这些地方都是多余的。所以我把它移到了外面。这就是我达到这个目的的方式。

标签: kotlin kotlin-coroutines


【解决方案1】:

在我看来,您可以大大简化您的代码。您不需要先启动所有async 作业然后启动更多作业以等待它们的两步习惯用法。您可以只 launch 作业并委托给同一块内的回调。这样,回调自然会在调用者的调度程序上被调用,并且只有作业本身可以在更改后的上下文中使用invokeDispatcher 调用。

onFullCompletion 看起来像一段属于调用方的代码,位于execute 调用下方。由于execute 不会抛出任何异常,因此您不需要任何try-finally 来获取它。

suspend fun <OUTPUT> execute(
    jobs: List<suspend () -> OUTPUT>,
    onTimeout: (jobIndex: Int) -> OUTPUT,
    onFailure: (jobIndex: Int, exception: Throwable) -> OUTPUT,
    onCompletion: suspend (jobIndex: Int, result: OUTPUT) -> Unit,
    timeout: Long,
    invokeDispatcher: CoroutineDispatcher = Dispatchers.Default
) {
    coroutineScope {
        jobs.mapIndexed { index, job ->
            launch {
                val output = try {
                    withTimeout(timeout) {
                        withContext(invokeDispatcher) {
                            job()
                        }
                    }
                } catch (e: TimeoutCancellationException) {
                    onTimeout(index)
                } catch (e: Exception) {
                    onFailure(index, e)
                }
                onCompletion(index, output)
            }
        }
    }
}

【讨论】:

    【解决方案2】:

    进行了一些改进以回答您的疑问。

    
    class ParallelExecutor {
    
        suspend fun <OUTPUT> execute(
            jobs: List<suspend () -> OUTPUT>,
            onTimeout: (jobIndex: Int) -> OUTPUT,
            onFailure: (jobIndex: Int, exception: Throwable) -> OUTPUT,
            onCompletion: suspend (jobIndex: Int, result: OUTPUT) -> Unit,
            timeout: Long,
            invokeDispatcher: CoroutineDispatcher = Dispatchers.Default
        ) {
            supervisorScope {
                val listenJobs = jobs.map { job ->
                    async(invokeDispatcher) {
                        withTimeout(timeout) {
                            job()
                        }
                    }
                }
    
                listenJobs.forEachIndexed { index, job ->
                    launch {
                        val output = try {
                            job.await()
                        } catch (e: TimeoutCancellationException) {
                            onTimeout(index)
                        } catch (e: Exception) {
                            onFailure(index, e)
                        }
                        onCompletion(index, output)
                    }
                }
            }
        }
    }
    
    
    • 现在在超时时取消作业。
    • 现在在调用者的 dispatcher 中调用完成回调。
    • 已修复决定何时调用 onFullCompletion 时的竞争条件。
    • 删除了一些您并不真正需要的方法。

    如果您觉得这更像是回调模式,那么您根本不应该使用回调。协程的设计使您可以在使用站点以最少的样板编写此类代码,因此这样的函数不是必需的,而且看起来很奇怪(恕我直言)。

    【讨论】:

    • 谢谢。我实际上想要 onFullCompletion 回调。因此,消费者不必决定何时完成所有工作。他们可以将他们的清理逻辑作为参数传递。我担心比赛条件,并在我发布问题后试图用锁守卫柜台。
    • 一开始我把onFullCompletion放在函数的末尾,但我认为这没有意义,因为可以把它放在函数之外并保存分配。
    • 您可以将清理逻辑放在函数调用之后,协程作用域会等待其中的所有协程完成后再返回。 (除非你真的只想要回调风格的代码)。
    • 我不希望消费者从他们的代码中处理异常。我还想保留 onFailure 以便抛出消费者未处理的异常,并且这个 onFailure 将强制他们通过强制处理这种情况。他们既可以返回 OUTPUT 的默认值,也可以将其输出建模为 Either
    • 更新为使用onFailure
    猜你喜欢
    • 2013-10-21
    • 2020-03-07
    • 1970-01-01
    • 1970-01-01
    • 2019-05-24
    • 1970-01-01
    • 1970-01-01
    • 2017-06-19
    • 1970-01-01
    相关资源
    最近更新 更多