【问题标题】:Using Kotlin Coroutines Together with Spring Kafka Listeners将 Kotlin 协程与 Spring Kafka 侦听器一起使用
【发布时间】:2021-01-24 21:55:18
【问题描述】:

我正在尝试混合使用 Spring Kafka (2.5.6.RELEASE) 侦听器和 Kotlin 协程。详细点我有suspend fun

suspend fun updatePrice(command: StockPriceUpdateCommand): Boolean

然后,我有一个 Spring Kafka Listener,每次从分区读取新消息时都必须调用该函数:

@KafkaListener(
    id = "priceListener",
    topics = [ "prices" ],
    groupId = "prices",
    properties = [
        "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer"
    ]
 ) 
 fun listenToPrices(
    @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY) stock: String,
    @Payload price: Double) {

    useCase.updatePrice(StockPriceUpdateUseCase.StockPriceUpdateCommand(stock, price))
 }

显然,编译器不允许我调用 updatePrice,因为错误“暂停函数 'updatePrice' 应该只能从协程或另一个暂停函数中调用”。 p>

在这种情况下哪种方法是正确的?

谢谢。

【问题讨论】:

  • 请参考这个stackoverflow.com/questions/53928668/…。基本上,你需要启动一个协程并在其中调用updatePrice
  • 谢谢你,@Sergey。但是,我在后端上下文中,而不是在 Android 应用程序中。有 Spring 代理该方法。我不知道声明协程可能会产生哪些副作用:)
  • 是的,我注意到了,但是协程的概念是一样的:创建一个范围(或使用现有的);启动协程;需要时取消协程。
  • 如果我在suspend fun 中使用@KafkaListener 注释更改我的函数签名会怎样?
  • 我不熟悉 Kafka Listeners,但我猜你会收到与 listenToPrices 函数相同的错误。

标签: kotlin spring-kafka kotlin-coroutines


【解决方案1】:

查看关于@RabbitListener 的类似问题here

不清楚你在这里想要达到什么目的。

我的理解是挂起函数只能从协程中调用;由于 @RabbitListener 方法是由框架调用的,而不是用户代码,我们必须在框架和侦听器之间添加一个 shim - 但究竟如何执行任何有用的功能?

【讨论】:

  • 谢谢,加里。所以,到现在为止,我必须将侦听器调用的代码包含在runBlocking 函数中。谢谢。我会等待原生支持到suspend fun :)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-02
  • 2019-08-25
  • 1970-01-01
  • 2018-03-07
  • 2017-08-17
  • 1970-01-01
相关资源
最近更新 更多