【问题标题】:Kotlin Flow and Websockets with clean architecture on Android在 Android 上具有干净架构的 Kotlin Flow 和 Websockets
【发布时间】:2021-12-24 23:14:47
【问题描述】:

最近,我们的团队尝试实现 websockets。我们很容易想到在监听事件时使用 Rx,但我想知道如何不使用它。所以,我们尝试了著名的 Kotlin Flow,但我不知道我们的实现是否正确。

我们的应用架构分为四层:

  • 服务 - 从套接字发出和接收事件,
  • 存储库 - 过滤器、映射、转换等,
  • ViewModel - 填充 LiveDatas
  • Activity - 观察变化并更新 UI。

因此,我们监听接收到 Service 的事件如下:

fun listenMessages(): Flow<List<Message>> = channelFlow {
    socket.on("NewMessage") { args ->
        val message = gson.fromJson(args[0].toString(), ...)
        trySend(message)
    }
    awaitClose()
}

当使用trySend 接收到事件时,我们使用channelFlow 的协程发送给消费者,并且我们使用awaitClose 保持这个Flow 处于活动状态。

Repository 在捕获 Flow 后执行一些逻辑并将其发送回 ViewModel:

fun getMessages(): Flow<List<Message>> {
    return service.listenMessages()
            .filter { ... }
            .map { ... }
}

然后,ViewModel 启动协程并在收集Flow 时更新 LiveData:

fun getMessages() {
    viewModelScope.launch(context = Dispatcher.IO) {
        repository.getMessages()
                .collect {
                    messagesLiveData.postValue(it)
                }
    }
}

这很好用,但是这引发了一些问题:

  • 这是正确的实现吗?
  • 当我们需要不断聆听时,channelFlow 是不是正确的选择?
  • 在这种情况下,我们是否应该使用经典的 Channels 而不是 Flowhot vs cold)?

提前感谢您的建议。

【问题讨论】:

    标签: android kotlin websocket kotlin-coroutines kotlin-flow


    【解决方案1】:

    通道的概念是作为主要在协程1之间进行通信的原语。据我了解它基于channelFlowdocs,它在引擎盖下使用Channel 并将其转换为Flow。通过在后台使用Channel,需要意识到一些重要的事情:

    发送到通道的每个值都会被接收一次。您不能使用通道以允许多个订阅者独立接收和响应它们的方式分发事件或状态更新。 1

    根据您的架构,这可能无关紧要,但这个例子可能说明了一些重要的事情:

    fun ws(): Flow<List<Message>> = channelFlow {
        println("channelFlow block called")
        trySend(listOf(Message(0), Message(1)))
        delay(2_000)
        trySend(listOf(Message(2)))
    }
    
    fun main() = runBlocking {
        val source = ws()
    
        launch {
            source.collect {
                println("First collect got $it")
            }
        }
        launch {
            source.collect {
                println("Second collect got $it")
            }
        }
    }
    

    生成输出:

    channelFlow block called
    channelFlow block called
    First collect got [Message(id=0), Message(id=1)]
    Second collect got [Message(id=0), Message(id=1)]
    First collect got [Message(id=2)]
    Second collect got [Message(id=2)]
    

    由于无法共享Channel,因此每次在source 上调用.collect,都会触发调用channelFlow 主体!更令人惊讶的是,同样的Flowsource 引用,并没有对ws() 进行新的调用。

    仔细查看channelFlow 文档,您会发现

    结果流是冷的,这意味着每次将终端运算符应用于结果流时都会调用块。2

    由于.collect() 是终端操作员,它会触发对channelFlow 主体的新调用。

    现在,这对您的用例重要吗?我不知道;您必须弄清楚您是否在多个位置订阅了listenMessages 生成的流,或者该channelFlow 的主体是否需要多次执行。

    作为更一般的建议,我建议更明确地说明您的服务行为。 listenMessages 是否应该发送给所有订阅的人?它是否应该只发射到第一个可用的(另见fan-out behavior)?如果您更喜欢前者,建议使用SharedFlow,如果后者我会明确地直接公开Channel,这样您就不会让下游消费者对我上面展示的问题感到困惑。

    【讨论】:

    • 阅读来自Roman Elizarov 的所有文章真的很有帮助。我重构了我们的代码以使用例如callbackFlow 而不是channelFlow 并进行了其他改进。感谢您为我指明正确的方向,我很感激。
    猜你喜欢
    • 2020-02-17
    • 1970-01-01
    • 2022-01-09
    • 1970-01-01
    • 2016-06-24
    • 1970-01-01
    • 1970-01-01
    • 2019-03-28
    • 2019-11-01
    相关资源
    最近更新 更多