【发布时间】:2020-10-24 06:40:03
【问题描述】:
我第一次使用 Kotlin Coroutines 从事一个爱好项目。我已经阅读并观看了有关它的视频,我有点了解这个概念。但我遇到了一个问题。让我给你看看我的代码。
package com.dev.tuber.ingestion.snapshots
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import org.joda.time.LocalTime
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import kotlin.concurrent.fixedRateTimer
object SnapshotsBuffer {
private val buffer = ConcurrentHashMap<Int, MutableMap<Int, Queue<Snapshot>>>()
init {
for (minute in 0..59) {
buffer[minute] = mutableMapOf()
}
}
suspend fun start(snapshotsChannel: Channel<Snapshot>, composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) {
startComposing(composeSnapshots)
for (snapshot in snapshotsChannel) {
val currentMinute = getCurrentMinute()
if (!buffer[currentMinute]!!.containsKey(snapshot.pair.id)) {
buffer[currentMinute]!![snapshot.pair.id] = LinkedList()
}
buffer[currentMinute]!![snapshot.pair.id]!!.add(snapshot)
println(buffer)
}
}
private fun startComposing(composeSnapshots: Channel<MutableMap<Int, Queue<Snapshot>>>) {
val oneMinute = (1000 * 60).toLong()
fixedRateTimer("consuming", true, oneMinute, oneMinute) {
val previousMinute = getPreviousMinute()
composeSnapshots.send(buffer[previousMinute]!!) <---- cannot do this
buffer[getPreviousMinute()] = mutableMapOf()
}
}
private fun getCurrentMinute(): Int {
return LocalTime().minuteOfHour
}
private fun getPreviousMinute(): Int {
val currentMinute = getCurrentMinute()
if(currentMinute == 0) return 59
return currentMinute - 1
}
}
所以。我有两个频道。第一个频道是snapshotsChannel,这是Snapshot 将到达的地方。我想缓冲Snapshot 并且每当一分钟过去我想将缓冲区发送到composeSnapshots 通道以进行进一步处理。
基本上我得到了很多Snapshot,我不想直接将它们发送给进一步处理。所以这就是为什么我想每对每分钟缓冲一次。
问题出现在startComposing 函数中。 fixedRateTimer 不是可挂起的函数,所以我不能在这里使用发送函数。我现在有点卡住了,因为我找不到解决方案。我研究了 TickerChannel 和 Kotlin Flow,但这似乎不是解决我问题的正确方法。
你知道解决办法吗?
【问题讨论】:
标签: kotlin kotlin-coroutines kotlin-coroutine-channel