【问题标题】:kotlin coroutines - use main thread in run blockingkotlin coroutines - 在运行阻塞中使用主线程
【发布时间】:2018-12-29 06:08:17
【问题描述】:

我正在尝试执行以下代码:

 val jobs = listOf(...)
 return runBlocking(CommonPool) {
    val executed = jobs.map {
        async { it.execute() }
    }.toTypedArray()
    awaitAll(*executed)
 }

其中jobs 是一些Suppliers 的列表 - 在同步世界中,这应该只是创建例如整数列表。 一切正常,但问题是没有使用主线程。 YourKit 的截图如下:

那么,问题是 - 我如何也可以利用主线程?

我想runBlocking 是这里的问题,但是还有其他方法可以得到相同的结果吗?使用 Java 并行流看起来要好得多,但主线程仍未完全利用(任务完全独立)。

更新

好吧,也许我告诉你的东西太少了。 在观看 Vankant Subramaniam 演示文稿后不久,我的问题出现了:https://youtu.be/0hQvWIdwnw4。 我需要最高性能,没有 IO,没有 Ui 等。只有计算。只有请求,我需要使用我所有可用的资源。

我的一个想法是将 paralleizm 设置为线程数 + 1,但我认为这很愚蠢。

【问题讨论】:

    标签: kotlin kotlin-coroutines


    【解决方案1】:

    我使用 Java 8 并行流测试了该解决方案:

    jobs.parallelStream().forEach { it.execute() }
    

    我发现 CPU 利用率可靠地达到 100%。作为参考,我使用了这个计算工作:

    class MyJob {
        fun execute(): Double {
            val rnd = ThreadLocalRandom.current()
            var d = 1.0
            (1..rnd.nextInt(1_000_000)).forEach { _ ->
                d *= 1 + rnd.nextDouble(0.0000001)
            }
            return d
        }
    }
    

    请注意,它的持续时间随机变化,从零到执行 100,000,000 次 FP 乘法所需的时间。

    出于好奇,我还研究了您添加到问题中的代码作为适合您的解决方案。我发现它有很多问题,例如:

    • 将所有结果累积到一个列表中,而不是在它们可用时对其进行处理
    • 提交最后一个作业后立即关闭结果通道,而不是等待所有结果

    我自己编写了一些代码,并添加了一些代码来对 Stream API 单线进行基准测试。这里是:

    const val NUM_JOBS = 1000
    val jobs = (0 until NUM_JOBS).map { MyJob() }
    
    
    fun parallelStream(): Double =
            jobs.parallelStream().map { it.execute() }.collect(summingDouble { it })
    
    fun channels(): Double {
        val resultChannel = Channel<Double>(UNLIMITED)
    
        val mainComputeChannel = Channel<MyJob>()
        val poolComputeChannels = (1..commonPool().parallelism).map { _ ->
            GlobalScope.actor<MyJob>(Dispatchers.Default) {
                for (job in channel) {
                    job.execute().also { resultChannel.send(it) }
                }
            }
        }
        val allComputeChannels = poolComputeChannels + mainComputeChannel
    
        // Launch a coroutine that submits the jobs
        GlobalScope.launch {
            jobs.forEach { job ->
                select {
                    allComputeChannels.forEach { chan ->
                        chan.onSend(job) {}
                    }
                }
            }
        }
    
        // Run the main loop which takes turns between running a job
        // submitted to the main thread channel and receiving a result
        return runBlocking {
            var completedCount = 0
            var sum = 0.0
            while (completedCount < NUM_JOBS) {
                select<Unit> {
                    mainComputeChannel.onReceive { job ->
                        job.execute().also { resultChannel.send(it) }
                    }
                    resultChannel.onReceive { result ->
                        sum += result
                        completedCount++
                    }
                }
            }
            sum
        }
    }
    
    fun main(args: Array<String>) {
        measure("Parallel Stream", ::parallelStream)
        measure("Channels", ::channels)
        measure("Parallel Stream", ::parallelStream)
        measure("Channels", ::channels)
    }
    
    fun measure(task: String, measuredCode: () -> Double) {
        val block = { print(measuredCode().toString().substringBefore('.')) }
        println("Warming up $task")
        (1..20).forEach { _ -> block() }
        println("\nMeasuring $task")
        val average = (1..20).map { measureTimeMillis(block) }.average()
        println("\n$task took $average ms")
    }
    

    这是我的典型结果:

    Parallel Stream took 396.85 ms
    Channels took 398.1 ms
    

    结果差不多,但一行代码仍然胜过 50 行代码:)

    【讨论】:

    • 现在想象一下,我有 10k 个工作,而且他们的时间不相等。我会,但我正在寻找比执行者更有效的解决方案。为什么? blog.takipi.com/…
    • 由于 Java 引入 Common Pool 的原因是为了支持 Stream API 中的自动并行化,因此没有什么能比在单行中简单地使用它更好。我更新了答案以反映这一点。
    • 是的,谢谢,但即使在问题中我也提到了这一点 :) '使用 Java 并行流,它看起来要好得多',所以你可以猜到我在寻找替代方案。
    • 我刚刚验证了利用率,单线是 100%。主线程与所有公共池线程具有完全相同的 CPU 时间。我知道您可能只是在寻找一些有趣的挑战,但就实际效率而言,您不会击败 Java 流。
    • 好的。我已经使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=8 将公共池大小增加到 8,并且 YourKit 仍然(实际上 - 再次)在主线程上主要显示橙色条,与任务管理器相同 - CPU 利用率处于 90-95% 的水平。我只是在讨论。
    【解决方案2】:

    仅仅因为没有工作在这个显式线程上运行并不意味着设备没有在同一个核心上运行其他线程。

    实际上最好让您的MainThread 空闲,这将使您的用户界面更具响应性。

    【讨论】:

    • 应用程序有 ui 吗?
    【解决方案3】:

    首先,我想感同身受的是,利用主线程通常没有任何实际用途。

    如果您的应用程序是完全异步的,那么您将只有一个(主)线程被阻塞。此线程确实会消耗一些内存并给调度程序增加一点额外压力,但对性能的额外影响可以忽略不计,甚至无法衡量。

    在实际的 Java 世界中,几乎不可能在 JVM 中拥有固定数量的线程。有系统线程(gc),有nio线程等。

    一个线程没有什么不同。只要您的应用程序中的线程数不会随着负载的增加而不受限制地增长,您就可以了。


    回到原来的问题。

    我认为在这种并行处理任务中没有一种简洁的方式来利用主线程。

    例如,您可以执行以下操作:

    data class Job(val res: Int) {
        fun execute(): Int {
            Thread.sleep(100)
            println("execute $res in ${Thread.currentThread().name}")
            return res
        }
    }
    
    fun main() {
        val jobs = (1..100).map { Job(it) }
        val resultChannel = Channel<Int>(Channel.UNLIMITED)
        val mainInputChannel = Channel<Job>()
    
        val workers = (1..10).map {
            actor<Job>(CommonPool) {
                for (j in channel) {
                    resultChannel.send(j.execute())
                }
            }
        }
    
        val res: Deferred<List<Int>> = async(CommonPool) {
            val allChannels = (listOf(mainInputChannel) + workers)
    
            jobs.forEach { job ->
                select {
                    allChannels.forEach {
                        it.onSend(job) {}
                    }
                }
            }
    
            allChannels.forEach { it.close() }
            (1..jobs.size).map { resultChannel.receive() }
        }
    
        runBlocking {
            for (j in mainInputChannel) {
                resultChannel.send(j.execute())
            }
        }
    
        runBlocking {
            res.await().forEach { println(it) }
        }
    }
    

    基本上,这是一个简单的生产者/消费者实现,其中主线程充当消费者之一。但这会导致大量样板文件。

    输出:

    execute 1 in main @coroutine#12
    execute 5 in ForkJoinPool.commonPool-worker-1 @coroutine#4
    execute 6 in ForkJoinPool.commonPool-worker-2 @coroutine#5
    execute 7 in ForkJoinPool.commonPool-worker-7 @coroutine#6
    execute 2 in ForkJoinPool.commonPool-worker-6 @coroutine#1
    execute 8 in ForkJoinPool.commonPool-worker-4 @coroutine#7
    execute 4 in ForkJoinPool.commonPool-worker-5 @coroutine#3
    execute 3 in ForkJoinPool.commonPool-worker-3 @coroutine#2
    execute 12 in main @coroutine#12
    execute 10 in ForkJoinPool.commonPool-worker-7 @coroutine#9
    execute 15 in ForkJoinPool.commonPool-worker-5 @coroutine#6
    execute 11 in ForkJoinPool.commonPool-worker-3 @coroutine#10
    execute 16 in ForkJoinPool.commonPool-worker-6 @coroutine#1
    execute 9 in ForkJoinPool.commonPool-worker-1 @coroutine#8
    execute 14 in ForkJoinPool.commonPool-worker-4 @coroutine#5
    execute 13 in ForkJoinPool.commonPool-worker-2 @coroutine#4
    execute 20 in main @coroutine#12
    execute 17 in ForkJoinPool.commonPool-worker-5 @coroutine#2
    execute 18 in ForkJoinPool.commonPool-worker-3 @coroutine#3
    execute 24 in ForkJoinPool.commonPool-worker-1 @coroutine#6
    execute 23 in ForkJoinPool.commonPool-worker-4 @coroutine#5
    execute 22 in ForkJoinPool.commonPool-worker-2 @coroutine#4
    execute 19 in ForkJoinPool.commonPool-worker-7 @coroutine#7
    execute 21 in ForkJoinPool.commonPool-worker-6 @coroutine#1
    execute 25 in ForkJoinPool.commonPool-worker-5 @coroutine#8
    execute 28 in main @coroutine#12
    execute 29 in ForkJoinPool.commonPool-worker-2 @coroutine#2
    execute 30 in ForkJoinPool.commonPool-worker-7 @coroutine#3
    execute 27 in ForkJoinPool.commonPool-worker-4 @coroutine#10
    execute 26 in ForkJoinPool.commonPool-worker-1 @coroutine#9
    execute 32 in ForkJoinPool.commonPool-worker-3 @coroutine#4
    execute 31 in ForkJoinPool.commonPool-worker-6 @coroutine#1
    execute 36 in ForkJoinPool.commonPool-worker-5 @coroutine#8
    execute 35 in ForkJoinPool.commonPool-worker-4 @coroutine#7
    execute 33 in ForkJoinPool.commonPool-worker-2 @coroutine#5
    execute 38 in ForkJoinPool.commonPool-worker-3 @coroutine#2
    execute 37 in main @coroutine#12
    execute 34 in ForkJoinPool.commonPool-worker-7 @coroutine#6
    execute 39 in ForkJoinPool.commonPool-worker-6 @coroutine#3
    execute 40 in ForkJoinPool.commonPool-worker-1 @coroutine#1
    execute 44 in ForkJoinPool.commonPool-worker-5 @coroutine#8
    execute 41 in ForkJoinPool.commonPool-worker-4 @coroutine#4
    execute 46 in ForkJoinPool.commonPool-worker-1 @coroutine#2
    execute 47 in ForkJoinPool.commonPool-worker-6 @coroutine#1
    execute 45 in main @coroutine#12
    execute 42 in ForkJoinPool.commonPool-worker-2 @coroutine#9
    execute 43 in ForkJoinPool.commonPool-worker-7 @coroutine#10
    execute 48 in ForkJoinPool.commonPool-worker-3 @coroutine#3
    execute 52 in ForkJoinPool.commonPool-worker-5 @coroutine#8
    execute 49 in ForkJoinPool.commonPool-worker-1 @coroutine#5
    execute 54 in ForkJoinPool.commonPool-worker-2 @coroutine#1
    execute 53 in main @coroutine#12
    execute 50 in ForkJoinPool.commonPool-worker-4 @coroutine#6
    execute 51 in ForkJoinPool.commonPool-worker-6 @coroutine#7
    execute 56 in ForkJoinPool.commonPool-worker-3 @coroutine#3
    execute 55 in ForkJoinPool.commonPool-worker-7 @coroutine#2
    execute 60 in ForkJoinPool.commonPool-worker-5 @coroutine#8
    execute 61 in ForkJoinPool.commonPool-worker-1 @coroutine#5
    execute 57 in ForkJoinPool.commonPool-worker-4 @coroutine#4
    execute 59 in ForkJoinPool.commonPool-worker-3 @coroutine#10
    execute 64 in ForkJoinPool.commonPool-worker-7 @coroutine#2
    execute 58 in ForkJoinPool.commonPool-worker-6 @coroutine#9
    execute 62 in ForkJoinPool.commonPool-worker-2 @coroutine#1
    execute 63 in main @coroutine#12
    execute 68 in ForkJoinPool.commonPool-worker-5 @coroutine#8
    execute 65 in ForkJoinPool.commonPool-worker-1 @coroutine#3
    execute 66 in ForkJoinPool.commonPool-worker-4 @coroutine#6
    execute 67 in ForkJoinPool.commonPool-worker-7 @coroutine#7
    execute 69 in ForkJoinPool.commonPool-worker-6 @coroutine#4
    execute 70 in ForkJoinPool.commonPool-worker-3 @coroutine#2
    execute 74 in ForkJoinPool.commonPool-worker-2 @coroutine#1
    execute 75 in main @coroutine#12
    execute 71 in ForkJoinPool.commonPool-worker-5 @coroutine#5
    execute 76 in ForkJoinPool.commonPool-worker-7 @coroutine#3
    execute 73 in ForkJoinPool.commonPool-worker-6 @coroutine#10
    execute 78 in ForkJoinPool.commonPool-worker-4 @coroutine#6
    execute 72 in ForkJoinPool.commonPool-worker-1 @coroutine#9
    execute 77 in ForkJoinPool.commonPool-worker-3 @coroutine#8
    execute 79 in ForkJoinPool.commonPool-worker-2 @coroutine#1
    execute 83 in main @coroutine#12
    execute 84 in ForkJoinPool.commonPool-worker-4 @coroutine#3
    execute 85 in ForkJoinPool.commonPool-worker-5 @coroutine#5
    execute 82 in ForkJoinPool.commonPool-worker-1 @coroutine#7
    execute 81 in ForkJoinPool.commonPool-worker-6 @coroutine#4
    execute 80 in ForkJoinPool.commonPool-worker-7 @coroutine#2
    execute 89 in ForkJoinPool.commonPool-worker-3 @coroutine#8
    execute 90 in ForkJoinPool.commonPool-worker-2 @coroutine#1
    execute 91 in main @coroutine#12
    execute 86 in ForkJoinPool.commonPool-worker-5 @coroutine#6
    execute 88 in ForkJoinPool.commonPool-worker-6 @coroutine#10
    execute 87 in ForkJoinPool.commonPool-worker-1 @coroutine#9
    execute 92 in ForkJoinPool.commonPool-worker-7 @coroutine#2
    execute 93 in ForkJoinPool.commonPool-worker-4 @coroutine#3
    execute 99 in main @coroutine#12
    execute 97 in ForkJoinPool.commonPool-worker-3 @coroutine#8
    execute 98 in ForkJoinPool.commonPool-worker-2 @coroutine#1
    execute 95 in ForkJoinPool.commonPool-worker-1 @coroutine#5
    execute 100 in ForkJoinPool.commonPool-worker-4 @coroutine#6
    execute 94 in ForkJoinPool.commonPool-worker-5 @coroutine#4
    execute 96 in ForkJoinPool.commonPool-worker-7 @coroutine#7
    1
    5
    6
    7
    2
    8
    4
    3
    12
    10
    15
    11
    16
    9
    14
    13
    20
    17
    18
    24
    23
    22
    19
    21
    25
    28
    29
    30
    27
    26
    32
    31
    36
    35
    33
    38
    37
    34
    39
    40
    44
    41
    46
    47
    45
    42
    43
    48
    52
    49
    54
    53
    50
    51
    56
    55
    60
    61
    57
    59
    64
    58
    62
    63
    68
    65
    66
    67
    69
    70
    74
    75
    71
    76
    73
    78
    72
    77
    79
    83
    84
    85
    82
    81
    80
    89
    90
    91
    86
    88
    87
    92
    93
    99
    97
    98
    95
    100
    94
    96
    

    【讨论】:

    • 曾想过使用频道,但由于提到的样板和很多复杂性,我会避免使用它。不过,我会试一试并提供反馈,谢谢。
    • @WitoldKupś,我刚刚在我的代码中发现了一个可怕的竞争条件。我为此道歉。请查看更新版本,特别是(1..jobs.size).map { resultChannel.receive() } 部分。问题是结果通道在所有工作完成之前就关闭了。此外,我真的鼓励您使用 java 的并行流与通道来检查应用程序的性能。我敢打赌,由于同步开销,您不会注意到差异,或者通道可能会变慢。通常更简单的解决方案更好。祝你好运!
    • 他们是,我终于使用了流,但总是 - 这是一个值得考虑的替代方案 IMO。
    【解决方案4】:

    async() 使用 DefaultDispatcher 不带任何参数,将从父池中获取池,因此所有异步调用都在 CommonPool 中执行。如果您想要不同的线程集来运行您的代码,请创建您自己的池。 虽然不将主线程用于计算通常是一种好习惯,但这取决于您的用例。

    【讨论】:

    • 如果有必要,那么这是一个很好的做法:) 公共池是一个分叉连接池,所以我认为主线程应该有可能在等待时参与部分工作。最后,据我所知(看附件中的更新问题),parallel stream有这种可能性。
    • 默认 ForkjoinPool 实现将在您的主线程之外创建工作人员,并且不会将您的主线程作为工作人员之一使用。所以你可以做的就是创建自己的 CoroutineDispatcher,自己实现负载均衡。您可以使用并行流或任何您想在其中消耗工作的东西,并在不同的地方重用它。我认为这个功能现在不是标准库的一部分。
    猜你喜欢
    • 2019-04-10
    • 1970-01-01
    • 2020-05-21
    • 1970-01-01
    • 2021-05-17
    • 1970-01-01
    • 1970-01-01
    • 2019-12-26
    • 1970-01-01
    相关资源
    最近更新 更多