【问题标题】:Flow<T>.distinctUntilChanged() is not workingFlow<T>.distinctUntilChanged() 不起作用
【发布时间】:2021-10-12 07:12:57
【问题描述】:

试图在 SharedFlow 处于活动状态(subscriptionCount 大于零)时运行协程,并在计数下降时取消。但不知何故,即使像distinctUntilChanged() 这样简单的东西也不能正常工作,我很困惑。

为此,我正在制作这样的“onActive”扩展:

fun <T : Any> MutableSharedFlow<T>.onActive(
    block: suspend CoroutineScope.() -> Unit
): Flow<T> {

    val original = this

    val isActiveFlow: Flow<Boolean> = subscriptionCount
        .map {
            println("Class: Count is $it")
            it > 0
        }
        .distinctUntilChanged()

    return isActiveFlow.flatMapLatest { isActive ->
        println("Class: isActive is $isActive")
        // here would be the code that calls `block`
        // but just this exactly as is, already triggers the error

        original // still emits the original flow, 
                 // that is needed or else subscriptionCount never changes
    }
}

这最初似乎可行,但是在其上运行添加多个订阅者的测试时,将连续多次打印“isActive is true”。为什么distinctUntilChanged() 不起作用?这种重复调用会与编辑区域中的其余逻辑发生冲突。

测试是这样的:

    @Test
    fun `onActive is called only once with multiple subscribers`() = runBlocking {

        val flow = MutableSharedFlow<Int>(
            replay = 2,
            onBufferOverflow = BufferOverflow.DROP_OLDEST
        ).apply {
            repeat(5) { tryEmit(it) }
        }.onActive {

        }

        val jobs = mutableListOf<Job>()
        repeat(3) { count ->
            jobs.add(flow.onEach {
                println("Test:  Listener $count received $it")
            }.launchIn(this))
        }
        delay(100)
        jobs.forEach { it.cancel() }
        jobs.forEach { it.join() }
    }

运行这个输出是:

Class: Count is 0
Class: isActive is false
Class: Count is 1
Class: Count is 1
Class: isActive is true
Class: Count is 2
Class: Count is 2
Class: isActive is true
Class: Count is 3
Test:  Listener 0 received 3
Test:  Listener 0 received 4
Test:  Listener 1 received 3
Test:  Listener 1 received 4
Test:  Listener 2 received 3
Test:  Listener 2 received 4
Class: Count is 2
Class: isActive is true
Class: Count is 3
Class: Count is 3
Class: Count is 3
Test:  Listener 0 received 3
Test:  Listener 0 received 4

那么问题来了,为什么distinctUntilChanged() 不起作用,我该如何解决?

【问题讨论】:

  • 我在测试中没有看到任何distinctUntilChangedfoobar 呼叫,这是预期的吗?编辑:好的,您在声明中将其命名为 foobar,但实际方法名称为 onActive
  • 分享编辑部分实际上会很好,因为这可能会影响它的工作方式
  • 顺便问一下SharingStarted.WhileSubscribed 有没有看过?看起来你想要的东西可以通过使用常规流程更容易地编写出来,然后与shareIn(started = SharingStarted.WhileSubscribed)共享它。
  • @Joffrey 嗨。感谢您检查我的问题。是的,我有几个版本的方法,在某些时候只是 foobar。我将帖子编辑为正确称为onActive。调用distinctUntilChanged() 是在我尝试编写的方法中,而不是在测试中。我想编辑部分可以改变事情,但是,我实际上从这个版本的方法中删除了以隔离问题。目前flatMapLatest 只是 println 并返回原始流程。
  • 嗨@Joffrey,从我看到的SharingStarted.WhileSubscribed 在调用shareIn 时使用,但我需要的是block: suspend CoroutineScope.() -&gt; Unit 在isActive 为真时执行并且在isActive 时取消CoroutineScope是假的。我不知道如何将其与SharingStarted.WhileSubscribed 联系起来

标签: kotlin kotlin-coroutines kotlin-flow


【解决方案1】:

distinctUntilChanged 而言,您看到的行为似乎实际上是正确的:

  • 第一个注册订阅者收集原始 2 个重放元素,起始值为 isActive=false
  • 然后isActive 因为第一次订阅而变为真,因此第一个订阅者由于flatMapLatest 重新收集原始流,从而再次获得重放的元素
  • 其他 2 个订阅者在 subscriptionCount 已经为非 0 时到达,因此 isActive 对他们保持真实,直到他们被取消

如果您“在有订阅者时”启动的协程是为了在SharedFlow 中生成元素,我宁愿最初定义像channelFlow/callbackFlow 这样的流程,然后使用shareIn 和@ 987654331@ 有这种“有订阅者时运行”的行为。

如果它“只是在旁边”,您可能需要一个外部作用域并单独启动一个协程来监听 sharedFlow.subscribersCount 并启动/停止“sidecar”协程。

【讨论】:

  • 根据您的解释,map/distinctUntilChanged/flatMapLatest 正在发生。对于每个订阅,而不仅仅是在调用 onActive 方法时一次。我确信对此有一个很好的解释,但肯定是有些令人惊讶的行为。
  • 仅供参考,这里的最终最终目标是拥有一个while(true) { emit(refresh()); delay(duration) },以便在有订阅者时不断刷新价值。我尝试的路径(在这里失败)是有一个 isActiveFlow,用一个新的 CoroutineScope 调用 block(),当isActive==false 时我可以取消它。如果有办法在while循环中检查hasSubscribers,可能可以使用标准流工厂flow { 做一些事情。我会回到绘图板。谢谢。
  • @Budius 关于您的第一条评论,这确实是Flow 的一般工作方式。唯一的“共享”部分是SharedFlow 本身(或者当您以这种方式创建SharedFlow 时,直到shareIn 运算符的所有内容)。否则,所有运算符都独立应用于所有收集器。
  • @Budius 如果通过“刷新值”您指的是共享流中的值,那么为什么不使用此内置刷新循环将其构建为 channelFlowcallbackFlow然后使用shareIn?我认为侧协程不是向流发送值
  • 可能可以使用标准 Flow 工厂流程{如果有办法在 while 循环中检查 hasSubscribers - 这正是 shareIn(started = SharingStarted.WhileSubscribed) 会做的事情。你不用担心手动检查hasSubscribers,但是初始的flow { }内容在下降到0时会被取消并重新启动
猜你喜欢
  • 2018-12-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-29
  • 1970-01-01
  • 2021-10-31
  • 1970-01-01
  • 2016-11-04
相关资源
最近更新 更多