【问题标题】:Async/await kotlin coroutines from blocking code来自阻塞代码的异步/等待 kotlin 协程
【发布时间】:2020-12-13 03:16:44
【问题描述】:

我正在使用没有响应式 Web 的 Spring Boot。

我尝试使用 Kotlin 协程运行一些异步请求

    @GetMapping
    fun test(): Message {
        val restTemplate = RestTemplate()
        return runBlocking {
            val hello = async { hello(restTemplate) }
            val world = async { world(restTemplate) }
            Message("${hello.await()} ${world.await()}!")
        }
    }

    private suspend fun world(restTemplate: RestTemplate): String {
        logger.info("Getting WORLD")
        return restTemplate.getForEntity("http://localhost:9090/world", World::class.java).body!!.payload
    }

    private suspend fun hello(restTemplate: RestTemplate): String {
        logger.info("Getting HELLO")
        return restTemplate.getForEntity("http://localhost:9090/hello", Hello::class.java).body!!.payload
    }

但是这段代码是按顺序运行的。

我该如何解决?

【问题讨论】:

  • restTemplate.getForEntity 是挂起函数吗?
  • 没有。这不是暂停乐趣
  • 仅仅将一个函数标记为挂起并不能使它成为可挂起或异步的,另一个副作用是runBlocking是单线程的,所以单线程将首先启动然后被阻塞,然后只有第二个请求将发生。您必须使用 withContext(Dispatchers.IO) { /* Blocking Call */ } 包装阻塞调用。

标签: spring spring-boot kotlin async-await kotlin-coroutines


【解决方案1】:

TL;DRasync 与用于卸载阻塞IO 的自定义调度程序(例如Dispatchers.IO)一起使用。

val hello = async(Dispatchers.IO) { hello(restTemplate) }
val world = async(Dispatchers.IO) { world(restTemplate) }

更新: 我在Kotlin coroutines slack channel 中被告知,我可以使用async(Dispatchers.IO) 而不是使用async + withContext(Dispatchers.IO)

我使用了@Sergey Nikulitsa 代码并创建了一个扩展函数,它接受一个带有接收器的 lambda(类似于 async)来组合 asyncwithContext(Dispatches.IO)

import kotlinx.coroutines.*

fun <T> CoroutineScope.myRestTemplateAsync(
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {

    return async(Dispatchers.IO, start) {
        block() 
    }
}

然后它可以像这样在你的代码中使用:


@GetMapping
fun test(): Message {
    val restTemplate = RestTemplate()
    return runBlocking {
        val hello = myRestTemplateAsync { hello(restTemplate) }
        val world = myRestTemplateAsync { world(restTemplate) }
        Message("${hello.await()} ${world.await()}!")
    }
}

private suspend fun world(restTemplate: RestTemplate): String {
    logger.info("Getting WORLD")
    return restTemplate.getForEntity("http://localhost:9090/world", World::class.java).body!!.payload
}

private suspend fun hello(restTemplate: RestTemplate): String {
    logger.info("Getting HELLO")
    return restTemplate.getForEntity("http://localhost:9090/hello", Hello::class.java).body!!.payload
} 

初步结果

此时,我只是在试验这种方法,我只使用 Spring WebMVC 和 RestTemplate 进行 5 次以上的调用。

myRestTemplateAsync 扩展函数与同步对应函数相比,始终将执行时间缩短了 30% 到 50%

为什么只使用 async { } 就可以解决问题?

特别是对于 RestTemplate,在 coroutineScope 中使用 async {...} 似乎没有什么不同,并且执行时间与同步代码相当。

此外,查看分析器中的线程,在单独使用 async 时没有创建“Dispatcher Workers”。这让我相信 RestTemplate 的每请求线程模型阻塞了整个线程。

async 中指定了新的调度程序时,它会将协程(和函数block)的执行转移到Dispatchers.IO 线程池中的新线程。

在这种情况下,代码块应包含 RestTemplate 调用(单个调用)。据我所知,这可以防止 RestTemplate 阻塞原始上下文。

您为什么要使用这种方法?

如果您一直在大型项目中使用 RestTemplate(每个请求线程模型),那么仅将其替换为 WebClient 等非阻塞客户端可能是一项艰巨的任务。有了这个,您可以继续使用您的大部分代码,只需在您可以异步进行多次调用的代码区域添加myRestTemplateAsync

如果您要开始一个新项目,请不要使用RestTemplate。相反最好使用WebFlux with coroutines in Kotlin as explained in this article

这是个好主意吗?

目前,我没有足够的信息来说明一种或另一种方式。我希望进行更广泛的测试和评估:

  • 负载下的内存消耗
  • 负载下线程池可能耗尽
  • 异常是如何传播和处理的

如果您对为什么这可能是一个好主意或可能不是一个好主意有任何意见,请在下面发布。

【讨论】:

    【解决方案2】:

    该代码是并行工作的:

        @GetMapping
        fun test(): Message {
            val restTemplate = RestTemplate()
            return runBlocking {
                val hello = async { hello(restTemplate) }
                val world = async { world(restTemplate) }
                Message("${hello.await()} ${world.await()}!")
            }
        }
    
        private suspend fun world(restTemplate: RestTemplate): String {
            logger.info("Getting WORLD")
            return withContext(Dispatchers.IO) {
                restTemplate.getForEntity("http://localhost:9090/world", World::class.java).body!!.payload
            }
        }
    
        private suspend fun hello(restTemplate: RestTemplate): String {
            logger.info("Getting HELLO")
            return withContext(Dispatchers.IO) {
                restTemplate.getForEntity("http://localhost:9090/hello", Hello::class.java).body!!.payload
            }
        }
    

    【讨论】:

    • 该代码并行运行。但我发现我无法让 restTemplate 同时获得两个响应。
    【解决方案3】:

    也许根本原因是:

    • restTemplate 使用 java.io(不是 java.nio)
    • restTemplate 阻止当前线程,直到它得到 HTTP 响应
    • 协程魔法在这种情况下不起作用

    解决方案:

    • 使用使用 java.nio 的 http-client

    【讨论】:

      【解决方案4】:
      • runBlocking:它旨在将常规阻塞代码连接到以挂起样式编写的库,用于主要功能和测试。

      • 这里我们使用coroutineScope 方法创建一个CoroutineScope。这个函数是为并行分解工作而设计的。当此范围内的任何子协程失败时,此范围将失败,并且所有其余子协程都将被取消。

      • 因为coroutineScope是挂起函数,所以我们将fun test()标记为suspend fun(挂起函数只允许从协程或其他挂起函数中调用)。通过使用CoroutineScope对象,我们可以调用asynclaunch来启动协程

        @GetMapping
        suspend fun test(): Message {
              val restTemplate = RestTemplate()
              return coroutineScope {
                  val hello = async { hello(restTemplate) }
                  val world = async { world(restTemplate) }
                  Message("${hello.await()} ${world.await()}!")
              }
          }
      

      【讨论】:

      • 也许你可以添加一些文字来解释为什么这样有效?有代码固然很好,但对于可能还不知道的人来说,这并不是不言而喻的。
      • Spring 框架不允许对休息控制器使用挂起功能(开箱即用)
      • 实现(“org.springframework.boot:spring-boot-starter-webflux”)
      • 基本上 spring-boot-starter-webflux 带来了对挂起函数的支持。
      猜你喜欢
      • 2022-01-18
      • 1970-01-01
      • 2019-02-21
      • 2022-01-04
      • 2016-03-13
      • 2021-12-12
      • 1970-01-01
      • 2011-04-03
      • 1970-01-01
      相关资源
      最近更新 更多