【问题标题】:RxJava; How to emit observables synchronouslyRxJava;如何同步发射 observables
【发布时间】:2016-01-28 13:11:33
【问题描述】:

我想同步发射两个 Observable 对象(它们是异步的),一个接一个地返回 first 发射的 Observable 对象。如果第一个失败,它不应该发出第二个。

假设我们有一个 Observable 让用户登录,另一个 Observable 在登录后自动选择用户的帐户。

这是我尝试过的:

public Observable<AccessToken> signInAndSelectAccount(String username, String password)
{

    Observable<AccessToken> ob1 = ...; // Sign in.
    Observable<Account> ob2 = ...; // Select account.


    return Observable.zip(
            ob1,
            ob2,
            new Func2<AccessToken, Account, AccessToken>() {
                @Override
                public AccessToken call(AccessToken accessToken, Account account)
                {
                     return accessToken;
                }
            });
}

不幸的是,这不适用于我的用例。它将并行发出/调用两个可观察对象,从 'ob1' 开始。

有人遇到过类似的用例吗?或者有一个想法如何让 observables 以同步的方式互相等待,第一个发出的可以返回?

提前致谢。

【问题讨论】:

    标签: java rx-java


    【解决方案1】:

    你也可以使用 rx.observables.BlockingObservable 例如:

    BlockingObservable.from(/**/).single();
    

    【讨论】:

    • BlockingSingle.from(/**/) 也是一个选项,专门用于Single
    • 但是如何处理onError?
    【解决方案2】:

    您可以使用Single.blockingGet进行同步调用

    // example 
    signIn(name,password).blockingGet() 
    

    【讨论】:

      【解决方案3】:

      反应式编程中没有“等待”这样的术语。您需要考虑创建一个数据流,其中一个Observable 可以由另一个触发。在您收到token 后,您需要收到account。它可能看起来像这样:

      Observable<Account> accountObservable = Observable.create(new Observable.OnSubscribe<AccessToken>() {
          @Override public void call(Subscriber<? super AccessToken> subscriber) {
              subscriber.onNext(new AccessToken());
              subscriber.onCompleted();
          }
      }).flatMap(accessToken -> Observable.create(new Observable.OnSubscribe<Account>() {
          @Override public void call(Subscriber<? super Account> subscriber) {
              subscriber.onNext(new Account(accessToken));
              subscriber.onCompleted();
          }
      }));
      

      【讨论】:

      • 这是不正确的。虽然在大多数情况下确实应该避免阻塞调用,但 RxJava 1 提供了 BlockingObservable,而 RxJava 2 提供了几个 .blocking*() 方法来与非响应式遗留代码交互。
      【解决方案4】:

      我不懂Java,但Scala中的解决方案可能是这样的,希望你能读懂

      import rx.lang.scala.Observable
      
      class AccessToken
      class Account
      
      case class TokenAndAccount(token: AccessToken, account: Account)
      
      val accessTokenSource = Observable.just(new AccessToken)
      val accountSource = Observable.just(new Account)
      
      accessTokenSource
         .flatMap(token ⇒ accountSource.map(account ⇒ TokenAndAccount(token, account)))
         .subscribe(tokenAndAccount ⇒ println(tokenAndAccount))
      

      基本上flatMap 将确保accountSource.map... 仅在来自accessTokenSource 的令牌发出后使用。在accountSource.map内部,我们将获取到的token和account结合在一起,供以后在subscribe使用。

      flatMap 是最有用的运算符之一,请务必阅读它的文档,也许还有一些教程。

      【讨论】:

        【解决方案5】:

        另外,我们可以在Observable 中使用.toBlocking() 方法,它会变成BlockingObservable。即,

        Observable.just().toBlocking();

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2019-08-13
          • 1970-01-01
          • 1970-01-01
          • 2017-09-07
          • 1970-01-01
          • 2018-10-16
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多