【问题标题】:How can I achieve this requirement using Rx Java如何使用 Rx Java 实现此要求
【发布时间】:2018-12-13 12:23:23
【问题描述】:

我有一个包含(Good、Non-Critical、Critical)值的 State(Enum)

所以要求是:

  1. 应在状态进入非关键状态时触发。
  2. 应在状态进入临界状态时触发。
  3. 应在状态保持临界状态 15 秒时触发。

输入:

publishSubject.onNext("Good")
publishSubject.onNext("Critcal") 
publishSubject.onNext("Critcal") 
publishSubject.onNext("NonCritical")  
publishSubject.onNext("Critacal") 
publishSubject.onNext("Critical") 
publishSubject.onNext("Good")
and so on...

参考代码结构:

    var publishSubject = PublishSubject.create<State>()
    publishSubject.onNext(stateObject)


    publishSubject
            /* Business Logic Required Here ?? */
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
                AppLogger.printLog("Trigger Success --> ")
            }

请帮忙, 提前致谢,

【问题讨论】:

  • 您能提供一些意见吗?
  • 输入就像这些随机值(Good、Non-Critical、Critical)一样,每秒都会出现。我们正在使用 publishSubject.onNext 传递这些值(每秒钟的随机状态值)。
  • @Michael publishSubject.onNext("Good") publishSubject.onNext("Critcal") publishSubject.onNext("Critcal") publishSubject.onNext("NonCritical") publishSubject.onNext("Critcal") publishSubject.onNext("Critical") publishSubject.onNext("Good") 查看类似的值。
  • @MichaelDodd,完成请检查
  • 好的,现在我将 15 分钟更改为 15 秒 (3) - 如果关键状态在 15 秒内相同,那么它应该触发。如果状态与最后一个已知状态不同,您是否只希望 (1) 和 (2) 触发? - 是的

标签: android rx-java rx-java2 rx-android rx-kotlin


【解决方案1】:

您可以使用distinctUntilChanged() 来抑制不改变状态的事件。使用filter()过滤掉正常事件。

当状态改变时,使用switchMap() 操作符创建一个新的订阅。当状态为“严重”时,使用interval() 运算符等待 15 秒。如果状态在那 15 秒内发生变化,switchMap() 将取消订阅并重新订阅一个新的 observable。

publishSubject
  .distinctUntilChanged()
  .subscribeOn(Schedulers.computation())
  .observeOn(AndroidSchedulers.mainThread())
  .filter( state -> state != State.Normal )
  .switchMap( state -> {
                   if (state == State.Critical) {
                     return Observable.interval(0, 15, TimeUnit.SECONDS) // Note 1
                        .map(v -> State.Critical); // Note 2
                   }
                   return Observable.just( State.Noncritical );
                 })
  .subscribe( ... );
  1. interval() 被赋予初始值0,使其立即发出一个值。在15 秒之后,将发出下一个值,依此类推。
  2. map() 运算符将interval() 发出的Long 变成

【讨论】:

  • 老实说,这是一个比我更好的答案。 TIL 关于distinctUntilChanged()switchMap()
  • :) 学习使用 switchMap() 是我对 RxJava 理解的重大突破。它管理状态、订阅和排序的能力非常强大。
  • 虽然这也会触发subscribe 的内容两次?即一旦状态更改为Critical,然后再过 15 秒?
  • 要求 (2) 说当它进入临界状态时触发,(3) 说当它保持临界状态 15 秒时触发。需求中有几个模棱两可的地方,所以 YMMV。
  • @BobDalgleish,非常好..,完全符合我的要求。谢谢
【解决方案2】:

您的要求的前两部分应合并为一个。您要求在NonCriticalCritical 事件上触发链,因此应该Good 事件触发链。同样,您只需要在状态与先前事件不同时触发事件。对于这两个.filter 事件就足够了:

var lastKnownState: State = null

publishSubject
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .filter(this::checkStateDiffers)       // Check we have a new state
        .filter { state -> state != State.Good }        // Check event is good
        .subscribe {
            AppLogger.printLog("Trigger Success --> ")
        }

...

private fun checkStateDiffers(val state: State): Boolean {
     val isDifferent = state != lastKnownState
     if (isDifferent) lastKnownState = state       // Update known state if changed
     return isDifferent
}

超时要求有点棘手。 RxJava 的timeout() 运算符提供了在一段时间内没有收到任何新消息时发出错误的选项。但是,我假设您即使在收到超时后仍想继续侦听事件。同样,如果我们只是发送另一个Critical 事件,它将被第一个filter 丢弃。因此,在这种情况下,我建议使用第二个一次性设备,它只负责监听此超时。

Disposable timeoutDisp = publishSubject
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .timeout(15, TimeUnit.SECONDS)
        .onErrorResumeNext(State.Timeout)
        .filter { state -> state == State.Timeout }
        .filter { state -> lastKnownState == State.Critical }
        .subscribe {
            AppLogger.printLog("Timeout Success --> ")
        }

还要调整checkStateDiffers(),使其不在第一个链中保存Timeout 状态。

private fun checkStateDiffers(val state: State): Boolean {
     if (state == State.Timeout) return true

     var isDifferent = state != lastKnownState
     if (isDifferent) lastKnownState = state       // Update known state if changed
     return isDifferent
}

【讨论】:

  • 让我检查一下..,.
  • 附带说明,我刚刚意识到您正在 Kotlin 中执行此操作。这个答案是用 Java 编写的,但应该很容易转换。
  • 我已将答案更改为在 Kotlin 中,但是我对 Kotlin 的了解不是很好,因此可能存在一两个语法问题。它们应该很容易理解和纠正。
猜你喜欢
  • 2016-09-12
  • 1970-01-01
  • 1970-01-01
  • 2020-10-14
  • 1970-01-01
  • 2016-05-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多