/**
 * A [Process] whose only work is to wait for a fixed amount of time and then log that it finished.
 *
 * @param delay the value handed to the `delay` function when [run] is called
 *   (presumably milliseconds, as in kotlinx.coroutines `delay` — confirm against the imports).
 */
class DefaultProcess(private val delay: Long) : Process {

    /** Waits for the configured time, then logs a completion message that includes that time. */
    override suspend fun run() {
        // `this.delay` is the constructor property; the call below targets the function of the same name.
        val pause = this.delay
        delay(pause)
        log("DefaultProcess($delay) done.")
    }
}
/**
 * Starts [parallelProcess] in a new coroutine launched in the receiver scope.
 *
 * @param parallelProcess the process whose `run` is invoked inside the launched coroutine.
 * @return the [Job] returned by `launch` for that coroutine.
 */
private fun CoroutineScope.runProcesses(parallelProcess: ParallelProcess): Job {
    return launch { parallelProcess.run() }
}
/** Prints [msg] to standard output, prefixed with the name of the calling thread in square brackets. */
private fun log(msg: String) {
    val line = "[${Thread.currentThread().name}] $msg"
    println(line)
}
/**
 * A [Process] that runs its children concurrently and returns from [run] once none of them is active.
 *
 * Children can be supplied up front through the constructor or added while [run] is in progress
 * via [addChild]. All started children can be cancelled with [cancel].
 *
 * @param initialChildren the processes launched as soon as [run] starts.
 */
class ParallelProcess(private val initialChildren: List<Process>) : Process {

    // Jobs of every child launched so far. The list is appended to by the coroutines inside `run`
    // and iterated by `addChild`/`cancel`, which may be called from other threads, so a
    // copy-on-write list is used: iteration works on a snapshot and never observes a concurrent add.
    private val jobs: MutableList<Job> = java.util.concurrent.CopyOnWriteArrayList<Job>()

    // Hand-off point for children added at runtime. With a single extra buffer slot and DROP_OLDEST,
    // a child that has not been collected yet is replaced by the next one emitted.
    private val children = MutableSharedFlow<Process>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    /**
     * Launches every initial child, keeps launching children received through [addChild], and
     * returns when no recorded job is active any more. The total elapsed time is logged at the end.
     */
    override suspend fun run() {
        val time = measureTimeMillis {
            coroutineScope {
                initialChildren.forEach { child ->
                    jobs.add(launch { child.run() })
                }
                // Collector that turns each child emitted by `addChild` into a new job in this scope.
                val flowJob = children
                    .onEach { child ->
                        log("added: DefaultProcess")
                        jobs.add(launch { child.run() })
                    }
                    .launchIn(this)
                // Poll until every recorded job has finished or been cancelled.
                while (jobs.isActive) delay(10)
                // The collector never completes on its own; cancel it so coroutineScope can return.
                flowJob.cancel()
            }
        }
        log("Total running time: $time ms.")
    }

    /**
     * Offers [child] for execution while at least one recorded job is still active.
     *
     * @return `false` when no job is active (nothing is emitted); otherwise the result of `tryEmit`.
     *   Because of the DROP_OLDEST buffer, `true` does not guarantee that the child will be run:
     *   it can be replaced by a later child before the collector picks it up.
     */
    fun addChild(child: Process): Boolean {
        return if (jobs.isActive) children.tryEmit(child) else false
    }

    /** Cancels every child job recorded so far. */
    fun cancel() {
        jobs.forEach { job -> job.cancel() }
    }

    // True while at least one job in the list is still active.
    private val List<Job>.isActive: Boolean
        get() = any { job -> job.isActive }
}
示例(这里使用 runBlocking 和 job.join() 阻塞主线程直到任务完成,以便能够看到输出;在生产环境中不要用 runBlocking 阻塞主线程!):
// Initial children, created with the values 100, 50, 300 and 500.
val processes = listOf(100L, 50L, 300L, 500L).map(::DefaultProcess)

runBlocking {
    val parallelProcess = ParallelProcess(processes)
    val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
    val job = scope.runProcesses(parallelProcess)

    /*
    // Uncomment this block to check that adding new children works.
    delay(300)
    // This child is accepted but dropped right away: another child is added below before this
    // one is processed, and the flow uses onBufferOverflow = BufferOverflow.DROP_OLDEST.
    var added = parallelProcess.addChild(DefaultProcess(300))
    log("added1: $added")
    // This one will be added.
    added = parallelProcess.addChild(DefaultProcess(400))
    log("added2: $added")
    delay(600) // run is already done, so the child won't be added
    added = parallelProcess.addChild(DefaultProcess(200))
    log("added3: $added")
    */

    /*
    // Uncomment this block to check that cancel also works.
    // The delay of 350 lets the first three jobs finish. If you want to start adding children
    // without such a delay, you still need to delay for at least 1 ms to give the jobs time to
    // become active; otherwise isActive returns false and the child will not be added.
    delay(350)
    val added = parallelProcess.addChild(DefaultProcess(4000))
    log("added: $added")
    delay(2000)
    parallelProcess.cancel() // Compare times with and without cancelling
    */

    // Wait for the launched process so its output is visible before runBlocking returns.
    job.join()
}
这种实现无法并发地添加子项(同时添加多个时,只有最后一个会被保留)。如果您不想丢失任何值,请将 addChild 改为挂起函数,用 emit 代替 tryEmit,并使用不带任何参数(即全部取默认值)创建的 MutableSharedFlow。这样子项仍然是按顺序添加的,但如果在上一个子项尚未加入 jobs 时又添加新的子项,addChild 会挂起,直到上一个子项被加入 jobs 为止。要实现并发添加,则必须使用通道(Channel):由 addChild 生产值,而在 run 内部启动多个协程(数量取决于您想要的并发消费者数量)来消费这些值。但同样,由于所有这些消费者都会修改 jobs,您必须使用某种机制对它进行同步。