【问题标题】:PublishSubject with Kotlin coroutines (Flow)PublishSubject 与 Kotlin 协程(流程)
【发布时间】:2019-09-30 08:26:37
【问题描述】:

我使用了一个 PublishSubject,我正在向它发送消息,同时我也在监听结果。它完美无缺,但现在我不确定如何使用 Kotlin 的协程(流或通道)做同样的事情。

private val subject = PublishProcessor.create<Boolean>>()

...

fun someMethod(b: Boolean) {
    subject.onNext(b)
}

fun observe() {
    subject.debounce(500, TimeUnit.MILLISECONDS)
           .subscribe { /* value received */ }
}

由于我需要 debounce 运算符,我真的很想对流做同样的事情,所以我创建了一个通道,然后我尝试从该通道创建一个流并监听变化,但我没有得到任何结果。

private val channel = Channel<Boolean>()

...

fun someMethod(b: Boolean) {
    channel.send(b)
}

fun observe() {
    flow {
         channel.consumeEach { value ->
            emit(value)
         }
    }.debounce(500, TimeUnit.MILLISECONDS)
    .onEach {
        // value received
    }
}

怎么了?

【问题讨论】:

    标签: android kotlin rx-java kotlin-coroutines


    【解决方案1】:

    Flow 是冷异步流,就像Observable

    流上的所有转换,例如mapfilter 都不会触发流收集或执行,只有终端操作符(例如single)会触发它。

    onEach 方法只是一种转换。因此,您应该将其替换为终端流运算符collect。您也可以使用BroadcastChannel 来获得更简洁的代码:

    private val channel = BroadcastChannel<Boolean>(1)
    
    suspend fun someMethod(b: Boolean) {
        channel.send(b)
    }
    
    suspend fun observe() {
      channel
        .asFlow()
        .debounce(500)
        .collect {
            // value received
        }
    }
    

    更新:在提出问题时,debounce 有两个参数(如问题中所示)的过载。没有了。但是现在有一个以毫秒为单位的参数(长)。

    【讨论】:

    • 你能分享解决这个问题吗?
    • @CaferMertCeyhan mayyyo 无法订阅Flow。这是通过collect 而不是onEach 完成的。
    • @tynn 我更新了您关于 API 更改的答案。请修改以防您不喜欢它。 :)
    • 值得一提的是,BroadcastChannel 仍处于试验阶段。
    • BroadcastChannel 在 1.5 中已被弃用,取而代之的是 SharedFlowStateFlow
    【解决方案2】:

    对于PublishProcessor/PublishRelay,应该是SharedFlow/MutableSharedFlow

    private val _myFlow = MutableSharedFlow<Boolean>(
                              replay = 0,
                              extraBufferCapacity = 1, // you can increase      
                              BufferOverflow.DROP_OLDEST
    )
    val myFlow = _myFlow.asSharedFlow()
    
    
    // ...
    fun someMethod(b: Boolean) {
        _myFlow.tryEmit(b)
    }
    
    fun observe() {
        myFlow.debounce(500)
              .onEach {  }
              // flowOn(), catch{}
              .launchIn(coroutineScope)
    
    }
    

    还有StateFlow/MutableStateFlow 代表BehaviorProcessor/BehaviorRelay

    private val _myFlow = MutableStateFlow<Boolean>(false)
    val myFlow = _myFlow.asStateFlow()
    
    // ...
    fun someMethod(b: Boolean) {
        _myFlow.value = b // same as _myFlow.emit(v), myFlow.tryEmit(b)
    }
    
    fun observe() {
        myFlow.debounce(500)
              .onEach {  }
              // flowOn(), catch{}
              .launchIn(coroutineScope)
    
    }
    

    StateFlow 必须有初始值,如果你不想这样,这是解决方法:

    private val _myFlow = MutableStateFlow<Boolean?>(null)
    val myFlow = _myFlow.asStateFlow()
                        .filterNotNull()
    

    MutableStateFlow 在设置新值时使用.equals 比较,因此它不会一次又一次地发出相同的值(相对于使用引用比较的distinctUntilChanged)。

    所以MutableStateFlowBehaviorProcessor.distinctUntilChanged()。如果你想要精确的BehaviorProcessor 行为,那么你可以使用这个:

    private val _myFlow = MutableSharedFlow<Boolean>(
                                  replay = 1, 
                                  extraBufferCapacity = 0,
                                  BufferOverflow.DROP_OLDEST
    )
    
    

    【讨论】:

      【解决方案3】:

      Kotlin 协程中的 ArrayBroadcastChannel 与 PublishSubject 最相似。

      1. 和 PublishSubject 一样,一个 ArrayBroadcastChannel 可以有多个 订阅者和所有活动订阅者都会立即收到通知。
      2. 与 PublishSubject 一样,如果目前没有活动订阅者,推送到此频道的事件会丢失。

      与 PublishSubject 不同,背压内置于协程通道中,这就是缓冲容量的来源。这个数字实际上取决于通道用于哪个用例。对于大多数正常用例,我只选择 10 个,这应该绰绰有余。如果您将事件推送到此通道的速度比接收它的接收器快,您可以提供更多容量。

      【讨论】:

      • 如何使用它?你能提供一些初始化代码吗? Android Studio 无法解析 ArrayBroadcastChannel。
      • 仅针对所有发现此评论的人:ArrayBroadcastChannel 是一个内部类,但根据 javadoc:“此频道由 BroadcastChannel(capacity) 工厂函数调用创建。” - 所以你需要做的就是创建一个普通的 BroadcastChannel 并将容量传递给它的构造函数,比如 - BroadcastChannel(1)。
      • 感谢您的澄清。实际上,与普通的Channel 不同,在没有订阅者的情况下发送到BroadcastChannel 不会触发缓冲/背压。在文档中找不到这个。
      • 注意:此 API 已过时。当它变得稳定时,它将被弃用并被 SharedFlow 取代。 kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/…
      【解决方案4】:

      实际上 BroadcastChannel 已经过时了,Jetbrains 改变了他们的方法,改为使用 SharedFlows。这更清洁,更易于实施并解决了很多痛点。

      基本上,你可以实现同样的目标。

      class BroadcastEventBus {
          private val _events = MutableSharedFlow<Event>()
          val events = _events.asSharedFlow() // read-only public view
      
          suspend fun postEvent(event: Event) {
              _events.emit(event) // suspends until subscribers receive it
          }
      }
      

      要了解更多信息,请查看 Roman 的 Medium 文章。

      "Shared flows, broadcast channels" by Roman Elizarov

      【讨论】:

        猜你喜欢
        • 2021-06-18
        • 2020-04-12
        • 2021-01-04
        • 2019-08-19
        • 1970-01-01
        • 2021-01-18
        • 2019-07-17
        • 2019-06-22
        • 1970-01-01
        相关资源
        最近更新 更多