【发布时间】:2021-10-12 07:12:57
【问题描述】:
我试图在 SharedFlow 处于活动状态(subscriptionCount 大于零)时运行协程,并在计数下降时取消。但不知何故,即使像distinctUntilChanged() 这样简单的东西也不能正常工作,我很困惑。
为此,我正在制作这样的“onActive”扩展:
fun <T : Any> MutableSharedFlow<T>.onActive(
block: suspend CoroutineScope.() -> Unit
): Flow<T> {
val original = this
val isActiveFlow: Flow<Boolean> = subscriptionCount
.map {
println("Class: Count is $it")
it > 0
}
.distinctUntilChanged()
return isActiveFlow.flatMapLatest { isActive ->
println("Class: isActive is $isActive")
// here would be the code that calls `block`
// but just this exactly as is, already triggers the error
original // still emits the original flow,
// that is needed or else subscriptionCount never changes
}
}
这最初似乎可行,但是在其上运行添加多个订阅者的测试时,将连续多次打印“isActive is true”。为什么distinctUntilChanged() 不起作用?这种重复调用会与编辑区域中的其余逻辑发生冲突。
测试是这样的:
@Test
fun `onActive is called only once with multiple subscribers`() = runBlocking {
val flow = MutableSharedFlow<Int>(
replay = 2,
onBufferOverflow = BufferOverflow.DROP_OLDEST
).apply {
repeat(5) { tryEmit(it) }
}.onActive {
}
val jobs = mutableListOf<Job>()
repeat(3) { count ->
jobs.add(flow.onEach {
println("Test: Listener $count received $it")
}.launchIn(this))
}
delay(100)
jobs.forEach { it.cancel() }
jobs.forEach { it.join() }
}
运行这个输出是:
Class: Count is 0
Class: isActive is false
Class: Count is 1
Class: Count is 1
Class: isActive is true
Class: Count is 2
Class: Count is 2
Class: isActive is true
Class: Count is 3
Test: Listener 0 received 3
Test: Listener 0 received 4
Test: Listener 1 received 3
Test: Listener 1 received 4
Test: Listener 2 received 3
Test: Listener 2 received 4
Class: Count is 2
Class: isActive is true
Class: Count is 3
Class: Count is 3
Class: Count is 3
Test: Listener 0 received 3
Test: Listener 0 received 4
那么问题来了,为什么distinctUntilChanged() 不起作用,我该如何解决?
【问题讨论】:
-
我在测试中没有看到任何
distinctUntilChanged或foobar呼叫,这是预期的吗?编辑:好的,您在声明中将其命名为foobar,但实际方法名称为onActive -
分享编辑部分实际上会很好,因为这可能会影响它的工作方式
-
顺便问一下
SharingStarted.WhileSubscribed有没有看过?看起来你想要的东西可以通过使用常规流程更容易地编写出来,然后与shareIn(started = SharingStarted.WhileSubscribed)共享它。 -
@Joffrey 嗨。感谢您检查我的问题。是的,我有几个版本的方法,在某些时候只是 foobar。我将帖子编辑为正确称为
onActive。调用distinctUntilChanged()是在我尝试编写的方法中,而不是在测试中。我想编辑部分可以改变事情,但是,我实际上从这个版本的方法中删除了以隔离问题。目前flatMapLatest只是 println 并返回原始流程。 -
嗨@Joffrey,从我看到的
SharingStarted.WhileSubscribed在调用shareIn时使用,但我需要的是block: suspend CoroutineScope.() -> Unit在isActive 为真时执行并且在isActive 时取消CoroutineScope是假的。我不知道如何将其与SharingStarted.WhileSubscribed联系起来
标签: kotlin kotlin-coroutines kotlin-flow