【发布时间】:2021-01-24 21:55:18
【问题描述】:
我正在尝试混合使用 Spring Kafka (2.5.6.RELEASE) 侦听器和 Kotlin 协程。详细点我有suspend fun:
suspend fun updatePrice(command: StockPriceUpdateCommand): Boolean
然后,我有一个 Spring Kafka Listener,每次从分区读取新消息时都必须调用该函数:
@KafkaListener(
id = "priceListener",
topics = [ "prices" ],
groupId = "prices",
properties = [
"key.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer"
]
)
fun listenToPrices(
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY) stock: String,
@Payload price: Double) {
useCase.updatePrice(StockPriceUpdateUseCase.StockPriceUpdateCommand(stock, price))
}
显然,编译器不允许我调用 updatePrice,因为错误“暂停函数 'updatePrice' 应该只能从协程或另一个暂停函数中调用”。 p>
在这种情况下哪种方法是正确的?
谢谢。
【问题讨论】:
-
请参考这个stackoverflow.com/questions/53928668/…。基本上,你需要启动一个协程并在其中调用
updatePrice。 -
谢谢你,@Sergey。但是,我在后端上下文中,而不是在 Android 应用程序中。有 Spring 代理该方法。我不知道声明协程可能会产生哪些副作用:)
-
是的,我注意到了,但是协程的概念是一样的:创建一个范围(或使用现有的);启动协程;需要时取消协程。
-
如果我在
suspend fun中使用@KafkaListener注释更改我的函数签名会怎样? -
我不熟悉 Kafka Listeners,但我猜你会收到与
listenToPrices函数相同的错误。
标签: kotlin spring-kafka kotlin-coroutines