【问题标题】:RxJava2 combine Observables of different typesRxJava2 结合了不同类型的 Observable
【发布时间】:2018-08-08 15:35:30
【问题描述】:

我是一个 RxJava 新手。我已经掌握了窍门,但有一种情况我不太满意。假设我有一对旨在协同工作的不同类型的 Observable。例如:

val list: Observable<List<MyClass>>
val indexIntoList: Observable<Int>

所以,我需要将它们作为一对观察,以便观察者在 either list indexIntoList 获得更新时发出。现在我的解决方案是这样的:

var lastListUsed: List<MyClass> = ArrayList()
var lastIndexUsed = -1
val syncObject = Object()

init {
    list.subscribe(object: Observer<List<MyClass>>{
        .
        .
        .
        override fun onNext(t: List<MyClass>)
        {
             FunctionOnMyClass(t, lastIndexUsed)
        }
    })

    indexIntoList.subscribe(object: Observer<Int>{
        .
        .
        .
        override fun onNext(t: Int)
        {
            FunctionOnMyClass(lastListUsed, t)
        }
    })
}

fun FunctionOnMyClass(list: List<MyClass>, index: Int)
{
    synchronized(syncObject)
    {
        lastListUsed = list
        lastIndexUsed = index
        .
        .
        .
    }
}

我曾想过做这样的事情:

var lastMyClass = DefaultMyClass()

list.doOnNext{listVersion ->
    indexIntoMap.map{ index ->
          if(index in listVersion.indices)
              listVersion[index]
          else
              lastMyClass
     }.subscribe(object: Observer<MyClass>{
        .
        .
        .
        override fun onNext(t: MyClass)
        {
            lastMyClass = t
            .
            .
            .
        }
     })
}

但如果我理解正确,要使用这种方法,每次外部 Observable 更新时,我都必须在内部 Observable 上处理 Disposable。我宁愿不那样做。有处理这种情况的首选方法吗?我应该愿意在每次更新外部 Observable 时处理 Disposable 吗?

【问题讨论】:

  • 嘿,我对 Kotlin 不太熟悉,而且我也是 RxJava 的新手,但我之前确实使用过它来链接多个可观察对象,不确定这是否是您想要的,但您可以链接 observables 的输出是你想要的吗?
  • 如果你想配对它们,你可以使用this这样的zip运算符

标签: android kotlin rx-java2


【解决方案1】:

我认为您正在寻找combineLastest 运营商:

当两个 Observable 中的任何一个发出 item 时,通过指定的函数组合每个 Observable 发出的最新 item,并根据该函数的结果发出 item

这里是一个实现的例子:

val myListObservable = ...
val myIndexObservable = ...

disposable = Observable.combineLatest(myListObservable, myIndexObservable, BiFunction { list: List<YourType>, index: Int ->
    list to index
})

然后您可以直接使用最后一个列表和每个 observable 发出的 las 索引作为Pair&lt;List&lt;YourType&gt;,Int&gt;


如果您需要等待每个可观察对象发出一个元素以相互配对,您应该使用zip

通过指定函数将多个 Observable 的发射组合在一起,并根据该函数的结果为每个组合发射单个项目

正如您在图中看到的那样,第二个 observable 最后发出 CD。但是第一个 observable 需要一些时间来发射3,然后是4。结果对将等到另一个可观察对象发出要与之配对的元素。

实现同上,将combineLastest替换为zip

【讨论】:

  • 非常重要的是要注意,ZIP 将交替发出事件 A,B,A,B,A,B 如果在某个时间点有来自一个源的多个事件而没有来自另一个源的事件,ZIP将等待其他源发出事件。
  • 非常感谢!这很有帮助。
  • 还有一个快速的问题。如果我只想在其中一个值与上次不同时发出,该怎么办?
  • NVM,刚刚找到。 distinctUntilChanged()。明白了。
【解决方案2】:

如果您想收听两个可观察对象的最新事件,请查看combineLatest 运算符:http://rxmarbles.com/#combineLatest

每当其中一个可观察对象发出一个新值时它就会发出,但它会在每个源发出至少一个事件时立即启动。

val result = Observable.combineLatest&lt;String, Int, String&gt;(sourceObservable1, sourceObservable2) { s, integer -&gt; s + ": " + integer }

【讨论】:

  • 感谢您的回答。这很有帮助,但我选择了前一个,因为它的答案更详细、更完整。
猜你喜欢
  • 2018-07-22
  • 2018-12-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-01-06
  • 2018-10-09
  • 1970-01-01
相关资源
最近更新 更多