【问题标题】:Multithreading using Kotlin Coroutines使用 Kotlin 协程的多线程
【发布时间】:2017-12-01 10:47:20
【问题描述】:

我正在试验Kotlin Coroutines 并有以下代码:

fun main(args: Array<String>) = runBlocking {
    val cores = Runtime.getRuntime().availableProcessors()
    println("number of cores: $cores")

    val jobs = List(10) {
        async(CommonPool) {
            delay(100)
            println("async #$it on thread ${Thread.currentThread().name}")
        }
    }
    jobs.forEach { it.join() }
}

这是我的输出:

number of cores: 4
async number:0 on thread ForkJoinPool.commonPool-worker-2
async number:2 on thread ForkJoinPool.commonPool-worker-3
async number:3 on thread ForkJoinPool.commonPool-worker-3
async number:4 on thread ForkJoinPool.commonPool-worker-3
async number:5 on thread ForkJoinPool.commonPool-worker-3
async number:1 on thread ForkJoinPool.commonPool-worker-1
async number:7 on thread ForkJoinPool.commonPool-worker-3
async number:6 on thread ForkJoinPool.commonPool-worker-2
async number:9 on thread ForkJoinPool.commonPool-worker-3
async number:8 on thread ForkJoinPool.commonPool-worker-1

根据 Roman Elizarov 的 answer 到另一个与协程相关的问题:

"启动只是创建新的协程,而 CommonPool 调度 ForkJoinPool.commonPool() 的协程确实使用了多个 线程,因此在本例中在多个 CPU 上执行。”

根据 Java 8 documentation

“对于需要单独或自定义池的应用程序,一个 ForkJoinPool 可以使用给定的目标并行级别构建; 默认情况下,等于可用处理器的数量。”

为什么只使用了 3 个工作线程?即使我将异步任务的数量增加到 1000+,也有相同的 3 个工作线程。

我的配置: Mac/High Sierra,具有双核 cpu(Hyper-threading,因此有 4 个可见内核)、Kotlin 1.2、kotlinx-coroutines-core:0.19.3 和 JVM 1.8

【问题讨论】:

    标签: multithreading jvm kotlin kotlinx.coroutines


    【解决方案1】:

    如果您查看CommonPool 的实现,您会注意到它正在处理java.util.concurrent.ForkJoinPool 或具有以下大小的线程池:

    (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)
    

    使用4 可用的处理器,这将导致3 回答为什么您确实看到3 个工作线程。

    ForkJoinPool-size 可以如下确定(会一样):

    ForkJoinPool.commonPool().parallelism
    

    如果您使用协同程序版本 >= 1.0,请参阅 this answer

    【讨论】:

      【解决方案2】:

      从 Coroutines 1.0 开始,代码看起来会略有不同,因为 CommonPool 现在将被替换为 Dispatchers.Default

      fun main(args: Array<String>) = runBlocking {
          val cores = Runtime.getRuntime().availableProcessors()
          println("number of cores: $cores")
      
          val jobs = List(10) {
              async(Dispatchers.Default) {
                  delay(100)
                  println("async #$it on thread ${Thread.currentThread().name}")
              }
          }
          jobs.forEach { it.join() }
      }
      

      此外,您现在将获得以下信息:

      它由 JVM 上的共享线程池提供支持。默认情况下,此调度程序使用的最大线程数等于 CPU 内核数,但至少为两个。

      【讨论】:

        最近更新 更多