【问题标题】:Long polling with kotlin coroutines使用 kotlin 协程进行长轮询
【发布时间】:2022-01-18 10:22:00
【问题描述】:

我在 GitHub Long Polling Redis 找到了这个存储库

所以在 Spring Boot 中,我们可以使用 延迟请求 将客户端请求保持几秒钟(AppMsgController.java#L72

它会发送回客户端,直到 延迟请求 被结果填充 (AppMsgHandler.java#L74) 或直到超时。

我还注意到,这种机制也可以通过 CompetableFuture 在 java 中使用 completeOnTimeout 来实现。

但我想知道我们可以在 Kotlin Coroutines 中使用类似的东西吗?

【问题讨论】:

    标签: kotlin kotlin-coroutines completable-future long-polling


    【解决方案1】:

    正如@Spitzbueb 所说,您可以使用CompletableDeferred 做类似的事情。

    但是,如果您不需要支持 clear()count() 方法,您也可以通过将 ConcurrentHashMap 替换为从 redis 广播“ping”的简单 MutableSharedFlow<Unit> 来简化。

    onMessage 中,您可以将Unit 发送到可变共享流中以通知订阅者,然后您可以通过等待共享流上的第一个元素并发出readSubset 请求来简单地实现您的请求机制:

    class AppMsgHandler(@Autowired private val appMsgRepo: AppMsgRepo) : MessageListener {
    
        private val events = MutableSharedFlow<Unit>()
    
        suspend fun requestMessages(start: Int, timeoutMillis: Long): List<AppMsg> {
            val currentMsgs = appMsgRepo.readSubset(start)
            if (currentMsgs.isNotEmpty()) {
                return currentMsgs
            }
            val newMessages = withTimeoutOrNull(timeoutMillis) {
                events.first()
                appMsgRepo.readSubset(start)
            }
            return newMessages ?: emptyList()
        }
    
        override fun onMessage(message: Message, pattern: ByteArray?) {
            LOG.info("RedisPub: {} on Channel: {}", String(message.body, UTF8), String(message.channel, UTF8))
            events.tryEmit(Unit)
        }
    
        companion object {
            private val LOG: Logger = LoggerFactory.getLogger(AppMsgHandler::class.java)
            private val UTF8: Charset = StandardCharsets.UTF_8
        }
    }
    

    然后控制器可以简单地调用requestMessages(前提是你让你的控制器在Spring WebFlux中使用suspend函数)。

    【讨论】:

    • 我尝试使用您的代码,但不知何故 events.tryEmit(Unit) 在等待 events.first() 时返回 false
    • 啊,也许你应该在状态流的构造函数中添加一个为 1 的extraBufferCapacity。我还没有查看它在没有暂停的情况下是如何工作的,但这可能是因为消费者协程(等待first())没有机会获取价值,因为事件的生产者没有暂停。因为没有缓冲区,生产者别无选择,只能放弃事件
    • coroutineScope.launch 包装onMessage 是个好主意,这样我就可以使用emit 而不是tryEmit 所以应该是这样的``` override fun onMessage(message: Message, pattern: ByteArray?) { coroutineScope.launch { events.emit(Unit) } } ```
    • 你的意思是我上面的建议没有用吗?你试过MutableSharedFlow&lt;Unit&gt;(extraBufferCapacity = 1)吗?如果这不起作用,那么在那里启动协程可能是个坏主意,因为onMessage 会立即返回,这意味着即使协程没有完成也可以再次调用它。如果您不解决实际问题,这可能会生成许多可能会卡住的协程。另一种方法是从onMessage 调用runBlocking 中的暂停emit(),所以至少你有背压
    【解决方案2】:

    在 Kotlin 协程中有 Deferred 类型,它与 CompletableFuture 类似,因为它表示一个尚不可用但可能会在未来出现的值(如果没有发生错误/抛出异常)。 @Joffrey 指出还有一个CompletableDeferred,它更接近于ComplatableFuture,使用户可以手动调用completeexceptionallyComplete

    可以使用CoroutineScope 上的async 扩展功能轻松创建延迟。如果您想设置超时,Kotlin 已经为您提供了 withTimeout 函数,该函数会在给定时间后取消代码块。

    请注意,withTimeout 应该在 async 内部,而不是相反。

    看看这个例子:https://pl.kotl.in/uYe12ds7g

    【讨论】:

    • 我相信CompletableDeferred 会更类似于CompletableFuture 而不是Deferred,因为您可以通过值或错误手动完成它
    猜你喜欢
    • 2017-05-15
    • 2018-11-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多