【问题标题】:Running child coroutines in parallel with cancellation与取消并行运行子协程
【发布时间】:2021-10-17 03:36:31
【问题描述】:

假设我有以下代码:

interface Process {
    suspend fun run()
}

class ParallelProcess(initialChildren: List<Process>): Process {
    
    val children = mutableListOf(initialChildren)
    
    override suspend fun run() {
        if(!isActive()) throw IllegalStateException()
        
        // Run all the children here and wait for them
        // including the one that may have been added 
        // while this is running.
        
        markInactive()
    }
    
    fun addChild(child: Process) {
        // Add the child and start running it.
    }
    
    fun cancel() {
        // Cancel all the children and this process as well.
    }
}

所以想法是ParalleProcess 类将并行运行所有子进程 并且可以在它仍在运行时添加更多的孩子。

子进程是独立的,不会相互影响,所以如果一个失败,其他进程和父进程都不会受到影响。我认为这里SuprevisorJob 可能适合使用,但我不确定在这种情况下如何使用。

取消父进程也应该取消所有子进程。

实现上述功能的最佳方法是什么。

【问题讨论】:

  • 您可以使用并发列表,但您可能会在调用addChildcancel 之间遇到竞争条件。这可能会导致添加但从未启动或取消的作业。

标签: kotlin kotlin-coroutines coroutinescope


【解决方案1】:

你可以这样使用:

class ParallelProcess(initialChildren: List<Process>): Process {
    
    val children = initialChildren.toMutableList()
    val parentScope = CoroutineScope(SupervisorJob())
    
    override suspend fun run(){ 
        parentScope.launch {
            if(!isActive) throw IllegalStateException()

            // Run all the children here and wait for them
            // including the one that may have been added 
            // while this is running.
            children.map { launch { it.run() } }.joinAll()
            //  markInactive()
        }
    }
    
    fun addChild(child: Process) {
        // Add the child and start running it.
        children += child
        // either wait for all children to finish or call cancel()
        // Or you can make sure to run only processes that are not started
        parentScope.launch {
            run()
        }
    }
    
    fun cancel() {
        // Cancel all the children and this process as well.
        parentScope.cancel()
    }
}

虽然添加一个孩子会产生另一个并发问题,但您应该根据您想要的行为来解决这个问题。我尝试添加一些 cmets 来讨论一些可能性

【讨论】:

  • 感谢您的回复。我希望run() 在所有孩子都完成之前不会返回。在您的版本中,run() 将立即返回。
  • 另外addChild()在添加和启动子节点后不能等待并立即返回。 run() 必须等待这个新添加的孩子。
【解决方案2】:
class DefaultProcess(private val delay: Long) : Process {
    override suspend fun run() {
        delay(delay)
        log("DefaultProcess($delay) done.")
    }
}

private fun CoroutineScope.runProcesses(parallelProcess: ParallelProcess) = launch {
    parallelProcess.run()
}

private fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

class ParallelProcess(private val initialChildren: List<Process>) : Process {
    private var jobs = mutableListOf<Job>()
    private val children = MutableSharedFlow<Process>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    override suspend fun run() {
        val time = measureTimeMillis {
            coroutineScope {
                initialChildren.forEach { child ->
                    jobs.add(launch { child.run() })
                }

                val flowJob = children
                    .onEach { child ->
                        log("added: DefaultProcess")
                        jobs.add(launch { child.run() })
                    }
                    .launchIn(this)

                while (jobs.isActive) delay(10) // Keep checking if jobs are done
                flowJob.cancel() // When done cancel the flowJob so that "run" can finally return
            }
        }
        log("Total running time: $time ms.")
    }

    fun addChild(child: Process): Boolean {
        return if (jobs.isActive) children.tryEmit(child) else false
    }

    fun cancel() {
        jobs.forEach { job -> job.cancel() }
    }

    private val List<Job>.isActive: Boolean
        get() = any { job -> job.isActive }
}

一个例子(使用 runBlocking 和 job.join() 阻塞主线程直到它完成,以便可以看到输出,不要在生产中使用 runBlocking 阻塞主线程!!! ):

val processes = listOf(
        DefaultProcess(100),
        DefaultProcess(50),
        DefaultProcess(300),
        DefaultProcess(500)
    )

runBlocking {
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
    val parallelProcess = ParallelProcess(processes)
    val job = scope.runProcesses(parallelProcess)

    /*// Comment out to check that adding new jobs works
    delay(300)

    // It will be added, but immediately dropped, because we're adding a new value down below, before this
    // one is processed, and we have a strategy onBufferOverflow = BufferOverflow.DROP_OLDEST
    var added = parallelProcess.addChild(DefaultProcess(300))
    log("added1: $added")

    // This one will be added
    added = parallelProcess.addChild(DefaultProcess(400))
    log("added2: $added")

    delay(600) // run already done, child won't be added
    added = parallelProcess.addChild(DefaultProcess(200))
    log("added3: $added")*/

    /*// Comment out to check that cancel also works
    // Delayed for 350, so the first three jobs are done, but if want to start adding without delaying, you need
    to delay for at least 1ms, to give some time to jobs to turn its state into active, otherwise isActive will
    return false and child will not be added.
    delay(350)
    val added = parallelProcess.addChild(DefaultProcess(4000))
    log("added: $added")
    delay(2000)
    parallelProcess.cancel() // Compare times with and without cancelling*/
    job.join()
}

您将无法同时添加子项(只会添加最后一个子项),如果您不想丢失任何值,请将 addChild 挂起,而不是 tryEmit 使用 emit 并制作 @ 987654326@ 未提供任何参数(默认值)。这仍将按顺序添加子项,但如果正在添加新子项,它将在最后一个添加到作业之前暂停。对于并发,您必须使用通道,其中addChild 将产生值,而在run 内部,您将有多个(取决于您想要拥有的并发消费者数量)协程使用这些值。但同样,由于所有这些消费者都会使 jobs 静音,因此您必须使用一些东西来同步它。

【讨论】:

    猜你喜欢
    • 2020-06-16
    • 1970-01-01
    • 2019-12-18
    • 2020-09-16
    • 1970-01-01
    • 2021-12-25
    • 2020-01-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多