【问题标题】:Access ApplicationCall in object without propagation在不传播的情况下访问对象中的 ApplicationCall
【发布时间】:2020-09-02 13:27:23
【问题描述】:

Ktor 中是否有线程安全的方法可以静态访问当前的 ApplicationCall?我正在尝试使以下简单示例起作用;

object Main {

    fun start() {
        val server = embeddedServer(Jetty, 8081) {
            intercept(ApplicationCallPipeline.Call) {
                // START: this will be more dynamic in the future, we don't want to pass ApplicationCall
                Addon.processRequest() 
                // END: this will be more dynamic in the future, we don't want to pass ApplicationCall

                call.respondText(output, ContentType.Text.Html, HttpStatusCode.OK)
                return@intercept finish()
            }
        }
        server.start(wait = true)
    }
}

fun main(args: Array<String>) {
    Main.start();
}

object Addon {

    fun processRequest() {
        val call = RequestUtils.getCurrentApplicationCall()
        // processing of call.request.queryParameters
        // ...
    }
}

object RequestUtils {

    fun getCurrentApplicationCall(): ApplicationCall {
        // Here is where I am getting lost..
        return null
    }
}

我希望能够从 RequestUtils 中静态获取当前上下文的 ApplicationCall,以便我可以在任何地方访问有关请求的信息。这当然需要扩展以能够同时处理多个请求。

我对依赖注入和 ThreadLocal 做了一些实验,但没有成功。

【问题讨论】:

    标签: kotlin kotlin-coroutines ktor


    【解决方案1】:

    好吧,应用程序调用被传递给协程,因此尝试“静态”获取它真的很危险,因为所有请求都在并发上下文中处理。

    Kotlin 官方文档谈到了Thread-local in the context of coroutine executions。它使用 CoroutineContext 的概念来恢复特定/自定义协程上下文中的 Thread-Local 值。

    但是,如果您能够设计一个完全异步的 API,您将能够通过直接创建自定义 CoroutineContext 并嵌入请求调用来绕过线程局部变量。

    编辑:我更新了示例代码以测试 2 种风格:

    • async 端点:完全基于协程上下文和挂起函数的解决方案
    • 阻塞端点:使用线程本地来存储应用程序调用,如kotlin doc中所述。
    import io.ktor.server.engine.embeddedServer
    import io.ktor.server.jetty.Jetty
    import io.ktor.application.*
    import io.ktor.http.ContentType
    import io.ktor.http.HttpStatusCode
    import io.ktor.response.respondText
    import io.ktor.routing.get
    import io.ktor.routing.routing
    import kotlinx.coroutines.asContextElement
    import kotlinx.coroutines.launch
    import kotlin.coroutines.AbstractCoroutineContextElement
    import kotlin.coroutines.CoroutineContext
    import kotlin.coroutines.coroutineContext
    
    /**
     * Thread local in which you'll inject application call.
     */
    private val localCall : ThreadLocal<ApplicationCall> = ThreadLocal();
    
    object Main {
    
        fun start() {
            val server = embeddedServer(Jetty, 8081) {
                routing {
                    // Solution requiring full coroutine/ supendable execution.
                    get("/async") {
                        // Ktor will launch this block of code in a coroutine, so you can create a subroutine with
                        // an overloaded context providing needed information.
                        launch(coroutineContext + ApplicationCallContext(call)) {
                            PrintQuery.processAsync()
                        }
                    }
    
                    // Solution based on Thread-Local, not requiring suspending functions
                    get("/blocking") {
                        launch (coroutineContext + localCall.asContextElement(value = call)) {
                            PrintQuery.processBlocking()
                        }
                    }
                }
    
                intercept(ApplicationCallPipeline.ApplicationPhase.Call) {
                    call.respondText("Hé ho", ContentType.Text.Plain, HttpStatusCode.OK)
                }
            }
            server.start(wait = true)
        }
    }
    
    fun main() {
        Main.start();
    }
    
    interface AsyncAddon {
        /**
         * Asynchronicity propagates in order to properly access coroutine execution information
         */
        suspend fun processAsync();
    }
    
    interface BlockingAddon {
        fun processBlocking();
    }
    
    object PrintQuery : AsyncAddon, BlockingAddon {
        override suspend fun processAsync() = processRequest("async", fetchCurrentCallFromCoroutineContext())
    
        override fun processBlocking() = processRequest("blocking", fetchCurrentCallFromThreadLocal())
    
        private fun processRequest(prefix : String, call : ApplicationCall?) {
            println("$prefix -> Query parameter: ${call?.parameters?.get("q") ?: "NONE"}")
        }
    }
    
    /**
     * Custom coroutine context allow to provide information about request execution.
     */
    private class ApplicationCallContext(val call : ApplicationCall) : AbstractCoroutineContextElement(Key) {
        companion object Key : CoroutineContext.Key<ApplicationCallContext>
    }
    
    /**
     * This is your RequestUtils rewritten as a first-order function. It defines as asynchronous.
     * If not, you won't be able to access coroutineContext.
     */
    suspend fun fetchCurrentCallFromCoroutineContext(): ApplicationCall? {
        // Here is where I am getting lost..
        return coroutineContext.get(ApplicationCallContext.Key)?.call
    }
    
    fun fetchCurrentCallFromThreadLocal() : ApplicationCall? {
        return localCall.get()
    }
    

    您可以在导航器中对其进行测试:

    http://localhost:8081/blocking?q=test1

    http://localhost:8081/blocking?q=test2

    http://localhost:8081/async?q=test3

    服务器日志输出:

    blocking -> Query parameter: test1
    blocking -> Query parameter: test2
    async -> Query parameter: test3
    

    【讨论】:

      【解决方案2】:

      您要为此使用的关键机制是CoroutineContext。这是您可以设置键值对以用于任何子协程或挂起函数调用的地方。

      我会试着举一个例子。

      首先,让我们定义一个CoroutineContextElement,让我们将ApplicationCall 添加到CoroutineContext

      class ApplicationCallElement(var call: ApplicationCall?) : AbstractCoroutineContextElement(ApplicationCallElement) {
          companion object Key : CoroutineContext.Key<ApplicationCallElement>
      }
      

      现在我们可以定义一些助手,将ApplicationCall 添加到我们的一条路由上。 (这可以作为某种 Ktor 插件来监听管道,但我不想在这里增加太多噪音)。

      suspend fun PipelineContext<Unit, ApplicationCall>.withCall(
          bodyOfCall: suspend PipelineContext<Unit, ApplicationCall>.() -> Unit
      ) {
          val pipeline = this
          val appCallContext = buildAppCallContext(this.call)
          withContext(appCallContext) {
              pipeline.bodyOfCall()
          }
      }
      
      internal suspend fun buildAppCallContext(call: ApplicationCall): CoroutineContext {
          var context = coroutineContext
          val callElement = ApplicationCallElement(call)
          context = context.plus(callElement)
          return context
      }
      

      然后我们可以一起使用它,就像下面这个测试用例一样,我们能够从嵌套的挂起函数中获取调用:

      suspend fun getSomethingFromCall(): String {
          val call = coroutineContext[ApplicationCallElement.Key]?.call ?: throw Exception("Element not set")
          return call.parameters["key"] ?: throw Exception("Parameter not set")
      }
      
      
      fun Application.myApp() {
      
          routing {
              route("/foo") {
                  get {
                      withCall {
                          call.respondText(getSomethingFromCall())
                      }
                  }
              }
          }
      }
      
      class ApplicationCallTest {
      
          @Test
          fun `we can get the application call in a nested function`() {
              withTestApplication({ myApp() }) {
                  with(handleRequest(HttpMethod.Get, "/foo?key=bar")) {
                      assertEquals(HttpStatusCode.OK, response.status())
                      assertEquals("bar", response.content)
                  }
              }
          }
      
      }
      

      【讨论】:

      • 啊,大妈-我看到我的答案与@amanin 非常相似。哦,好吧,我把它留在这里,因为辅助函数可能会添加一些实现思路。
      猜你喜欢
      • 2018-11-28
      • 1970-01-01
      • 2023-03-31
      • 2018-01-12
      • 2015-01-18
      • 1970-01-01
      • 2019-01-19
      • 1970-01-01
      • 2011-07-04
      相关资源
      最近更新 更多