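【Question title】: How to cap Kotlin coroutines' maximum concurrency
【Posted】: 2018-05-21 01:06:48
【Question】:

I have a Sequence (from File.walkTopDown) and I need to run a long-running operation on each element. I'd like to use Kotlin best practices/coroutines, but I either get no parallelism at all, or far too much parallelism and hit a "too many open files" IO error.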

File("/Users/me/Pictures/").walkTopDown()
    .onFail { file, ex -> println("ERROR: $file caused $ex") }
    .filter { ... only big images... }
    .map { file ->
        async { // I *think* I want async and not "launch"...
            ImageProcessor.fromFile(file)
        }
    }

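This doesn't seem to run in parallel, and my multi-core CPU never goes above one core's worth of load. Is there a way with coroutines to run "NumberOfCores parallel operations" worth of deferred jobs?

I looked at Multithreading using Kotlin Coroutines, which first creates all the jobs and then joins them, but that means finishing the whole Sequence/file tree walk before the heavy processing join step, and that seems... iffy! Splitting it into a collect step and a process step means the collection could run far ahead of the processing.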

val jobs = ... the Sequence above...
    .toSet()
println("Found ${jobs.size}")
jobs.forEach { it.await() }

【Comments】:

    Tags: multithreading parallel-processing kotlin kotlinx.coroutines


    【Solution 1】:

    Why not use the asFlow() operator and then flatMapMerge?

    someCoroutineScope.launch(Dispatchers.Default) {
        File("/Users/me/Pictures/").walkTopDown()
            .asFlow()
            .filter { ... only big images... }
            .flatMapMerge(concurrencyLimit) { file ->
                flow {
                    emit(runInterruptible { ImageProcessor.fromFile(file) })
                }
            }.catch { ... }
            .collect()
    }
    

    This lets you cap the number of files open at the same time while still processing them concurrently.
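
    A minimal sketch of how the concurrency argument of flatMapMerge bounds the work in flight. The names fakeProcess, processAll, inFlight and maxObserved are hypothetical stand-ins for ImageProcessor.fromFile and are not from the question. Depending on the kotlinx.coroutines version, flatMapMerge is marked as a preview or experimental API, so the compiler may emit an opt-in warning.

    ```kotlin
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.asFlow
    import kotlinx.coroutines.flow.flatMapMerge
    import kotlinx.coroutines.flow.flow
    import kotlinx.coroutines.flow.toList
    import kotlinx.coroutines.runBlocking
    import java.util.concurrent.atomic.AtomicInteger

    // Counters used only to observe how many calls overlap.
    val inFlight = AtomicInteger(0)
    val maxObserved = AtomicInteger(0)

    // Hypothetical stand-in for the real per-file work.
    suspend fun fakeProcess(id: Int): Int {
        val now = inFlight.incrementAndGet()
        maxObserved.updateAndGet { maxOf(it, now) }
        delay(50) // simulated work
        inFlight.decrementAndGet()
        return id * 2
    }

    // Each element becomes a one-item inner flow; at most `concurrencyLimit`
    // inner flows are collected at the same time.
    fun processAll(ids: List<Int>, concurrencyLimit: Int): List<Int> = runBlocking {
        ids.asFlow()
            .flatMapMerge(concurrencyLimit) { id -> flow { emit(fakeProcess(id)) } }
            .toList()
    }

    fun main() {
        val results = processAll((1..10).toList(), concurrencyLimit = 3)
        // Result order is not guaranteed, so sort before printing.
        println("results=${results.sorted()} maxObserved=${maxObserved.get()}")
    }
    ```

    The upstream sequence is only pulled as fast as inner flows finish, so the file walk should not run far ahead of the processing.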

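    【Comments】:

    • Nice! I didn't think this would work, because I read "controls the number of in-flight flows" as "how many flows it can merge" (in my case I'm working off just one), but you now make me think it might mean "how many emissions it can be chewing on at once".
    【Solution 2】:

    This isn't specific to your problem, but it does answer the question of "how to cap Kotlin coroutines' maximum concurrency".

    EDIT: As of kotlinx.coroutines 1.6.0 (https://github.com/Kotlin/kotlinx.coroutines/issues/2919), you can use limitedParallelism, e.g. Dispatchers.IO.limitedParallelism(123).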
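
    A small sketch of what that looks like in use, assuming kotlinx.coroutines 1.6.0 or later. blockingWork, runLimited and the two counters are hypothetical names made up for this illustration; the blocking sleep stands in for file I/O.

    ```kotlin
    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.async
    import kotlinx.coroutines.awaitAll
    import kotlinx.coroutines.runBlocking
    import java.util.concurrent.atomic.AtomicInteger

    // Counters used only to observe how many tasks run at the same time.
    val running = AtomicInteger(0)
    val peak = AtomicInteger(0)

    // Hypothetical blocking task standing in for file I/O.
    fun blockingWork(id: Int): Int {
        val now = running.incrementAndGet()
        peak.updateAndGet { maxOf(it, now) }
        Thread.sleep(20)
        running.decrementAndGet()
        return id
    }

    // Starts `count` coroutines, but the dispatcher view lets at most
    // `parallelism` of them execute at any moment.
    fun runLimited(count: Int, parallelism: Int): List<Int> = runBlocking {
        val limited = Dispatchers.IO.limitedParallelism(parallelism)
        (1..count).map { id -> async(limited) { blockingWork(id) } }.awaitAll()
    }

    fun main() {
        val results = runLimited(count = 8, parallelism = 2)
        println("results=$results peak=${peak.get()}")
    }
    ```

    Note that this caps how many coroutines execute on threads at once. Coroutines that are suspended do not count against the limit, so for suspending (non-blocking) work a Semaphore is the more direct tool.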

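    Old solution: At first I thought of using newFixedThreadPoolContext, but 1) it's deprecated and 2) it would use threads, which I don't think is necessary or desirable (the same goes for Executors.newFixedThreadPool().asCoroutineDispatcher()). This solution may have flaws in its use of Semaphore that I'm not aware of, but it's very simple: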

    import kotlinx.coroutines.async
    import kotlinx.coroutines.awaitAll
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.sync.Semaphore
    import kotlinx.coroutines.sync.withPermit
    
    /**
     * Maps the inputs using [transform] at most [maxConcurrency] at a time until all Jobs are done.
     */
    suspend fun <TInput, TOutput> Iterable<TInput>.mapConcurrently(
        maxConcurrency: Int,
        transform: suspend (TInput) -> TOutput,
    ) = coroutineScope {
        val gate = Semaphore(maxConcurrency)
        this@mapConcurrently.map {
            async {
                gate.withPermit {
                    transform(it)
                }
            }
        }.awaitAll()
    }
    

    Tests (apologies, they use Spek, hamcrest, and kotlin test):

    import kotlinx.coroutines.ExperimentalCoroutinesApi
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.launch
    import kotlinx.coroutines.runBlocking
    import kotlinx.coroutines.test.TestCoroutineDispatcher
    import org.hamcrest.MatcherAssert.assertThat
    import org.hamcrest.Matchers.greaterThanOrEqualTo
    import org.hamcrest.Matchers.lessThanOrEqualTo
    import org.spekframework.spek2.Spek
    import org.spekframework.spek2.style.specification.describe
    import java.util.concurrent.atomic.AtomicInteger
    import kotlin.test.assertEquals
    
    @OptIn(ExperimentalCoroutinesApi::class)
    object AsyncHelpersKtTest : Spek({
        val actionDelay: Long = 1_000 // arbitrary; obvious if non-test dispatcher is used on accident
        val testDispatcher = TestCoroutineDispatcher()
    
        afterEachTest {
            // Clean up the TestCoroutineDispatcher to make sure no other work is running.
            testDispatcher.cleanupTestCoroutines()
        }
    
        describe("mapConcurrently") {
            it("should run all inputs concurrently if maxConcurrency >= size") {
                val concurrentJobCounter = AtomicInteger(0)
                val inputs = IntRange(1, 2).toList()
                val maxConcurrency = inputs.size
    
                // https://github.com/Kotlin/kotlinx.coroutines/issues/1266 has useful info & examples
                runBlocking(testDispatcher) {
                    print("start runBlocking $coroutineContext\n")
    
                    // We have to run this async so that the code afterwards can advance the virtual clock
                    val job = launch {
                        testDispatcher.pauseDispatcher {
                            val result = inputs.mapConcurrently(maxConcurrency) {
                                print("action $it $coroutineContext\n")
    
                                // Sanity check that we never run more in parallel than max
                                assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))
    
                                // Allow for virtual clock adjustment
                                delay(actionDelay)
    
                                // Sanity check that we never run more in parallel than max
                                assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
                                print("action $it after delay $coroutineContext\n")
    
                                it
                            }
    
                            // Order is not guaranteed, thus a Set
                            assertEquals(inputs.toSet(), result.toSet())
                            print("end mapConcurrently $coroutineContext\n")
                        }
                    }
                    print("before advanceTime $coroutineContext\n")
    
                    // Start the coroutines
                    testDispatcher.advanceTimeBy(0)
                    assertEquals(inputs.size, concurrentJobCounter.get(), "All jobs should have been started")
    
                    testDispatcher.advanceTimeBy(actionDelay)
                    print("after advanceTime $coroutineContext\n")
                    assertEquals(0, concurrentJobCounter.get(), "All jobs should have finished")
                    job.join()
                }
            }
    
            it("should run one at a time if maxConcurrency = 1") {
                val concurrentJobCounter = AtomicInteger(0)
                val inputs = IntRange(1, 2).toList()
                val maxConcurrency = 1
    
                runBlocking(testDispatcher) {
                    val job = launch {
                        testDispatcher.pauseDispatcher {
                            inputs.mapConcurrently(maxConcurrency) {
                                assertThat(concurrentJobCounter.addAndGet(1), lessThanOrEqualTo(maxConcurrency))
                                delay(actionDelay)
                                assertThat(concurrentJobCounter.getAndAdd(-1), lessThanOrEqualTo(maxConcurrency))
                                it
                            }
                        }
                    }
    
                    testDispatcher.advanceTimeBy(0)
                    assertEquals(1, concurrentJobCounter.get(), "Only one job should have started")
    
                    val elapsedTime = testDispatcher.advanceUntilIdle()
                    print("elapsedTime=$elapsedTime")
                    assertThat(
                        "Virtual time should be at least as long as if all jobs ran sequentially",
                        elapsedTime,
                        greaterThanOrEqualTo(actionDelay * inputs.size)
                    )
                    job.join()
                }
            }
    
            it("should handle cancellation") {
                val jobCounter = AtomicInteger(0)
                val inputs = IntRange(1, 2).toList()
                val maxConcurrency = 1
    
                runBlocking(testDispatcher) {
                    val job = launch {
                        testDispatcher.pauseDispatcher {
                            inputs.mapConcurrently(maxConcurrency) {
                                jobCounter.addAndGet(1)
                                delay(actionDelay)
                                it
                            }
                        }
                    }
    
                    testDispatcher.advanceTimeBy(0)
                    assertEquals(1, jobCounter.get(), "Only one job should have started")
    
                    job.cancel()
                    testDispatcher.advanceUntilIdle()
                    assertEquals(1, jobCounter.get(), "Only one job should have run")
                    job.join()
                }
            }
        }
    })
    

    According to https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/09_Testing, you may also need to adjust the compiler arguments to get the tests to run:

    compileTestKotlin {
        kotlinOptions {
            // Needed for runBlocking test coroutine dispatcher?
            freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
            freeCompilerArgs += "-Xopt-in=kotlin.RequiresOptIn"
        }
    }
    testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.4.1'
    

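    【Comments】:

    • Saved my day! Thanks for sharing, and special thanks for the tests. I learned a lot from them.
    【Solution 3】:

    This doesn't preserve the order of the projection, but otherwise it limits the throughput to at most maxDegreeOfParallelism. Expand and extend as you see fit.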

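    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.async
    import kotlinx.coroutines.channels.Channel
    import kotlinx.coroutines.coroutineScope
    import kotlinx.coroutines.launch
    import java.util.concurrent.atomic.AtomicInteger

    suspend fun <TInput, TOutput> (Collection<TInput>).inParallel(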
            maxDegreeOfParallelism: Int,
            action: suspend CoroutineScope.(input: TInput) -> TOutput
    ): Iterable<TOutput> = coroutineScope {
    
        val list = this@inParallel
    
        if (list.isEmpty())
            return@coroutineScope listOf<TOutput>()
    
        val brake = Channel<Unit>(maxDegreeOfParallelism)
        val output = Channel<TOutput>()
        val counter = AtomicInteger(0)
    
        this.launch {
    
            repeat(maxDegreeOfParallelism) {
                brake.send(Unit)
            }
    
            for (input in list) {

                // Take a permit before starting the task, so that no more than
                // maxDegreeOfParallelism tasks are ever in flight.
                brake.receive()

                val task = this.async {
                    action(input)
                }

                this.launch {
                    val result = task.await()
                    output.send(result)
                    val completed = counter.incrementAndGet()
                    if (completed == list.size) {
                        output.close()
                    } else brake.send(Unit)
                }
            }
        }
    
        val results = mutableListOf<TOutput>()
        for (item in output) {
            results.add(item)
        }
    
        return@coroutineScope results
    }
    

    Example usage:

    val output = listOf(1, 2, 3).inParallel(2) {
        it + 1
    } // Note that output may not be in same order as list.
    

    【Comments】:

      【Solution 4】:

      This caps the coroutines to a fixed number of workers. I recommend watching https://www.youtube.com/watch?v=3WGM-_MnPQA

      package com.example.workers
      
      import kotlinx.coroutines.*
      import kotlinx.coroutines.channels.ReceiveChannel
      import kotlinx.coroutines.channels.produce
      import kotlin.system.measureTimeMillis
      
      class ChannellibgradleApplication
      
      fun main(args: Array<String>) {
          var myList = mutableListOf<Int>(3000,1200,1400,3000,1200,1400,3000)
          runBlocking {
              var myChannel = produce(CoroutineName("MyInts")) {
                  myList.forEach { send(it) }
              }
      
              println("Starting coroutineScope  ")
              var time = measureTimeMillis {
                  coroutineScope {
                      var workers = 2
                      repeat(workers)
                      {
                          launch(CoroutineName("Sleep 1")) { theHardWork(myChannel) }
                      }
                  }
              }
              println("Ending coroutineScope  $time ms")
          }
      }
      
      suspend fun theHardWork(channel : ReceiveChannel<Int>) 
      {
          for(m in channel) {
              println("Starting Sleep $m")
              delay(m.toLong())
              println("Ending Sleep $m")
          }
      }
      

      【Comments】:

        【Solution 5】:

        I got it working with a Channel. But maybe I'm being redundant compared with your approach?

        val pipe = ArrayChannel<Deferred<ImageFile>>(20)
        launch {
            while (!(pipe.isEmpty && pipe.isClosedForSend)) {
                imageFiles.add(pipe.receive().await())
            }
            println("pipe closed")
        }
        File("/Users/me/").walkTopDown()
                .onFail { file, ex -> println("ERROR: $file caused $ex") }
                .forEach { pipe.send(async { ImageFile.fromFile(it) }) }
        pipe.close()
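
        For readers on current kotlinx.coroutines releases, here is a hedged, self-contained sketch of the same idea using the Channel(capacity) factory function, since ArrayChannel is, as far as I know, not part of the public API in recent versions. load and loadAll are hypothetical names standing in for ImageFile.fromFile and the surrounding code.

        ```kotlin
        import kotlinx.coroutines.Deferred
        import kotlinx.coroutines.async
        import kotlinx.coroutines.channels.Channel
        import kotlinx.coroutines.delay
        import kotlinx.coroutines.launch
        import kotlinx.coroutines.runBlocking

        // Hypothetical stand-in for ImageFile.fromFile.
        suspend fun load(id: Int): String {
            delay(10) // simulated work
            return "image-$id"
        }

        fun loadAll(ids: Sequence<Int>, capacity: Int): List<String> = runBlocking {
            // Bounded buffer of in-progress jobs; send() suspends while it is full,
            // which keeps the producer from racing ahead of the consumer.
            val pipe = Channel<Deferred<String>>(capacity)
            val results = mutableListOf<String>()

            // Consumer: iterating the channel ends once it is closed and drained.
            val consumer = launch {
                for (deferred in pipe) results.add(deferred.await())
            }

            // Producer: the sequence is walked lazily, one element per send.
            for (id in ids) pipe.send(async { load(id) })
            pipe.close()

            consumer.join()
            results
        }

        fun main() {
            println(loadAll(sequenceOf(1, 2, 3), capacity = 2))
        }
        ```

        The number of jobs in flight is only roughly bounded by the buffer size: besides the buffered jobs, one is being awaited by the consumer and one may be waiting in send.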
        

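        【Comments】:

          【Solution 6】:

          The problem with your first snippet is that it doesn't run at all. Remember, Sequence is lazy, and you have to use a terminal operation such as toSet() or forEach(). Additionally, you need to limit the number of threads that can be used for the task by constructing a newFixedThreadPoolContext context and using it in async: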

          val pictureContext = newFixedThreadPoolContext(nThreads = 10, name = "reading pictures in parallel")
          
          File("/Users/me/Pictures/").walkTopDown()
              .onFail { file, ex -> println("ERROR: $file caused $ex") }
              .filter { ... only big images... }
              .map { file ->
                  async(pictureContext) {
                      ImageProcessor.fromFile(file)
                  }
              }
              .toList()
              .forEach { it.await() }
          

          EDIT: You have to use a terminal operator (toList) before awaiting the results
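
          A small plain-Kotlin sketch (no coroutines needed) of the two Sequence behaviours this relies on: nothing runs without a terminal operation, and elements flow through map and forEach one at a time, which is why awaiting inside forEach directly on the Sequence would start and finish each job before the next one is created. The function names are made up for this illustration.

          ```kotlin
          // Records which elements the map step visited when no terminal operation is used.
          fun visitedWithoutTerminal(): List<Int> {
              val visited = mutableListOf<Int>()
              sequenceOf(1, 2, 3).map { visited.add(it); it * 2 } // never iterated
              return visited
          }

          // Same pipeline, but toList() drives the sequence.
          fun visitedWithTerminal(): List<Int> {
              val visited = mutableListOf<Int>()
              sequenceOf(1, 2, 3).map { visited.add(it); it * 2 }.toList()
              return visited
          }

          // Shows that each element passes through map and forEach before the next one starts.
          fun interleaving(): List<String> {
              val log = mutableListOf<String>()
              sequenceOf(1, 2)
                  .map { log.add("map $it"); it }
                  .forEach { log.add("each $it") }
              return log
          }

          fun main() {
              println(visitedWithoutTerminal()) // []
              println(visitedWithTerminal())    // [1, 2, 3]
              println(interleaving())           // [map 1, each 1, map 2, each 2]
          }
          ```

          Calling toList() first forces every map step (and so every async) to run before the first await.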

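          【Comments】:

          • I thought that would work, but it still seems to process things sequentially in the final forEach. E.g. .map { file -> async(CommonPool) { println("start") val img = ImageFile.fromFile(file) println("end") img } } .forEach { imageFiles.add(it.await()) if ( Math.random() > 0.999) { imageFiles.save() } }
          • Oh, snap, you're right. Now I think there's no way to do it with Sequences. Edited the answer
          • It's worth noting that using a limited thread pool limits parallelism but not concurrency, meaning that if ImageProcessor.fromFile is a suspending function (one that doesn't block), you can still end up processing many files at once, which may not be what you want.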
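
          To make the parallelism/concurrency distinction in the last comment concrete, here is a minimal sketch, assuming kotlinx.coroutines 1.6.0 or later for limitedParallelism. peakInFlight and its parameters are hypothetical names. It runs suspending tasks on a dispatcher limited to a single thread and reports how many were in flight at once, with and without a Semaphore.

          ```kotlin
          import kotlinx.coroutines.Dispatchers
          import kotlinx.coroutines.async
          import kotlinx.coroutines.awaitAll
          import kotlinx.coroutines.delay
          import kotlinx.coroutines.runBlocking
          import kotlinx.coroutines.sync.Semaphore
          import kotlinx.coroutines.sync.withPermit
          import java.util.concurrent.atomic.AtomicInteger

          // Runs `count` suspending tasks on a one-thread dispatcher and returns the
          // highest number of tasks that were in flight at the same time.
          // Pass permits = null to run without a semaphore.
          fun peakInFlight(count: Int, permits: Int?): Int = runBlocking {
              val oneThread = Dispatchers.Default.limitedParallelism(1)
              val gate = permits?.let { Semaphore(it) }
              val inFlight = AtomicInteger(0)
              val peak = AtomicInteger(0)

              suspend fun task() {
                  val now = inFlight.incrementAndGet()
                  peak.updateAndGet { maxOf(it, now) }
                  delay(100) // suspends, freeing the thread for the next task
                  inFlight.decrementAndGet()
              }

              (1..count).map {
                  async(oneThread) {
                      if (gate != null) gate.withPermit { task() } else task()
                  }
              }.awaitAll()
              peak.get()
          }

          fun main() {
              println("one thread, no semaphore: peak=${peakInFlight(5, null)}")
              println("one thread, 2 permits:    peak=${peakInFlight(5, 2)}")
          }
          ```

          Without the semaphore, several tasks are typically in flight even though only one thread is available; with it, the peak cannot exceed the number of permits.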