【问题标题】:RxJava how to create Observable from a SubscriptionRxJava 如何从订阅创建 Observable
【发布时间】:2016-10-29 23:19:10
【问题描述】:

我正在寻找一种在处理subscribe 中的结果后创建 Observable 的方法。

鉴于我有来自 productRepo.list() 的 Observable,它是 Retrofit 返回 Observable<Response<ProductResponse>>

productRepo
    .list()
    .retry(3)
    .subscribe { response ->
        if (response.isSuccessful) {
            response.body().apply {
                cache.saveProducts(data)
            }
        }
    }

这样做的目的是将结果保存到本地 DB cache。这加上另一个非常相似的调用使用来自 API 的远程数据填充本地数据库。

两个调用完成后,我想从cache 加载数据。

我不想以任何方式将两者结合起来。只是想在之后运行一些任务。

我希望将此处理作为 Rx 调用图中的一个单元,以便它同时执行 Call1 和 Call2,并且一旦 Call1 和 Call2 完成运行 Task3。在这种情况下最好的方法是什么?如果每个呼叫的订阅者是分开的,我真的更喜欢。

flatMap 是这里的最佳选择吗?

【问题讨论】:

    标签: android rx-java kotlin


    【解决方案1】:
    .doOnNext()
    

    是您的答案,因为如果有多个,将返回您的最终响应或每个响应。试一试。

    【讨论】:

      【解决方案2】:

      看看Zip。执行类似 Observable.zip(firstObservable, secondObservable,.....{Task 3}

      【讨论】:

      • Zip 会将它们合并成一个流,这不是我想要的。
      • 我错了 Zip。我最终也使用了它。我和merge混淆了。我的错!
      【解决方案3】:

      正如你所说,

      我真的更喜欢每个通话的订阅者都是分开的。

      假设我们有两个 observables

      val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8))
      
      val call2 = Observable.from(arrayOf(2,4,6,8))
      

      如果我们像下面这样纯粹使用Observable.zip,那么两个Call1和Call2只能有单个订阅者。

      Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber)
      

      如果我们如下使用三个单独的订阅者,Call1 和 Call2 流将被触发两次

      call1.subscribe(call1Subscriber)
      
      call2.subscribe(call2Subscriber)
      
      Observable.zip(call1,call2) {c1, c2 -> Pair(c1,c2) }.subscribe(task3Subscriber)
      

      因此,我们需要使用.share().cacheWithInitialCapacity(1) 来做这些技巧

      val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8))
        .share()
        .cacheWithInitialCapacity(1)
      
      val call2 = Observable.from(arrayOf(2,4,6,8))
        .share()
        .cacheWithInitialCapacity(1)
      
      val task3Signal = Observable.zip(call1,call2){ c1, c2 ->
        c1 + c2
      }
      call1.subscribe(call1Subscriber)
      call2.subscribe(call2Subscriber)
      task3Signal.subscribe(task3Subscriber)
      

      您还可以通过一个简单的测试用例来证明/测试您对 Rx 图的概念。

      class SimpleJUnitTest {
      
        @Test
        fun test(){
      
          val call1 = Observable.from(arrayOf(1,2,3,4,5,6,7,8))
            .doOnNext { println("call1 doOnNext $it") }
            .share()
            .cacheWithInitialCapacity(1)
      
          val call2 = Observable.from(arrayOf(2,4,6,8))
            .doOnNext { println("call2 doOnNext $it") }
            .share()
            .cacheWithInitialCapacity(1)
      
          val task3Signal = Observable.zip(call1,call2){ c1, c2 ->
            println("task3Signal c1:$c1, c2: $c2")
            c1 + c2
          }
      
          val testSubscriber1 = TestSubscriber<Int>()
          val testSubscriber2 = TestSubscriber<Int>()
          val testSubscriber3 = TestSubscriber<Int>()
          call1.subscribe(testSubscriber1)
          call2.subscribe(testSubscriber2)
          task3Signal.subscribe(testSubscriber3)
      
          testSubscriber1.assertReceivedOnNext(listOf(1,2,3,4,5,6,7,8))
          testSubscriber2.assertReceivedOnNext(listOf(2,4,6,8))
          testSubscriber3.assertReceivedOnNext(listOf(3,6,9,12))
          testSubscriber1.assertValueCount(8)
          testSubscriber2.assertValueCount(4)
          testSubscriber3.assertValueCount(4)
      
      
        }
      }
      

      输出:

      call1 doOnNext 1
      call1 doOnNext 2
      call1 doOnNext 3
      call1 doOnNext 4
      call1 doOnNext 5
      call1 doOnNext 6
      call1 doOnNext 7
      call1 doOnNext 8
      call2 doOnNext 2
      call2 doOnNext 4
      call2 doOnNext 6
      call2 doOnNext 8
      task3Signal c1:1, c2: 2
      task3Signal c1:2, c2: 4
      task3Signal c1:3, c2: 6
      task3Signal c1:4, c2: 8
      

      【讨论】:

      • 感谢您提供详尽的示例。感谢您的帮助。我最终使用doOnNext 进行缓存,然后将两个 Observable 压缩为一个。这使方法签名变得冗长,但它会做到。
      猜你喜欢
      • 1970-01-01
      • 2019-03-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多