【发布时间】:2021-12-24 23:14:47
【问题描述】:
最近,我们的团队尝试实现 websockets。我们很容易想到在监听事件时使用 Rx,但我想知道如何不使用它。所以,我们尝试了著名的 Kotlin Flow,但我不知道我们的实现是否正确。
我们的应用架构分为四层:
- 服务 - 从套接字发出和接收事件,
- 存储库 - 过滤器、映射、转换等,
- ViewModel - 填充 LiveDatas
- Activity - 观察变化并更新 UI。
因此,我们监听接收到 Service 的事件如下:
fun listenMessages(): Flow<List<Message>> = channelFlow {
socket.on("NewMessage") { args ->
val message = gson.fromJson(args[0].toString(), ...)
trySend(message)
}
awaitClose()
}
当使用trySend 接收到事件时,我们使用channelFlow 的协程发送给消费者,并且我们使用awaitClose 保持这个Flow 处于活动状态。
Repository 在捕获 Flow 后执行一些逻辑并将其发送回 ViewModel:
fun getMessages(): Flow<List<Message>> {
return service.listenMessages()
.filter { ... }
.map { ... }
}
然后,ViewModel 启动协程并在收集Flow 时更新 LiveData:
fun getMessages() {
viewModelScope.launch(context = Dispatcher.IO) {
repository.getMessages()
.collect {
messagesLiveData.postValue(it)
}
}
}
这很好用,但是这引发了一些问题:
- 这是正确的实现吗?
- 当我们需要不断聆听时,
channelFlow是不是正确的选择? - 在这种情况下,我们是否应该使用经典的
Channels 而不是Flow(hot vs cold)?
提前感谢您的建议。
【问题讨论】:
标签: android kotlin websocket kotlin-coroutines kotlin-flow