【问题标题】:Cache handling with RXJava使用 RXJava 处理缓存
【发布时间】:2015-04-28 16:01:23
【问题描述】:

我正在尝试使用 rxJava 实现此工作流程,但我确定我是否在滥用或做错事情。

  • 用户要求登录
  • 如果 loginResult 在缓存中可用,则“发出”缓存的 LoginResult
  • 否则,如果一切都成功,则实际执行对 Web 服务的请求并缓存结果
  • 如果出现错误最多重试 3 次,如果有第 4 次则清除缓存。

这是我完整的 sn-p 代码。

public class LoginTask extends BaseBackground<LoginResult> {
  private static CachedLoginResult cachedLoginResult = new CachedLoginResult();
  private XMLRPCClient xmlrpcClient;
  private UserCredentialsHolder userCredentialsHolder;

  @Inject
  public LoginTask(XMLRPCClient client, UserCredentialsHolder userCredentialsHolder) {
    this.xmlrpcClient = client;
    this.userCredentialsHolder = userCredentialsHolder;
  }

  @Override
  public LoginResult performRequest() throws Exception {
    return UserApi.login(
        xmlrpcClient,
        userCredentialsHolder.getUserName(),
        userCredentialsHolder.getPlainPassword());


  }

  @Override
  public Observable<LoginResult> getObservable() {
    return cachedLoginResult.getObservable()
        .onErrorResumeNext(
            Observable.create(
                ((Observable.OnSubscribe<LoginResult>) subscriber -> {
                  try {
                    if (!subscriber.isUnsubscribed()) {
                      subscriber.onNext(performRequest()); // actually performRequest
                    }
                    subscriber.onCompleted();
                  } catch (Exception e) {
                    subscriber.onError(e);
                  }
                })
            )
                .doOnNext(cachedLoginResult::setLoginResult)
                .retry((attempts, t) -> attempts < 3)
                .doOnError(throwable -> cachedLoginResult.purgeCache())
        );
  }


  private static class CachedLoginResult {
    private LoginResult lr = null;
    private long when = 0;

    private CachedLoginResult() {
    }

    public boolean hasCache() {
      return lr != null && when + TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES) > System.currentTimeMillis();
    }

    public void setLoginResult(LoginResult lr) {
      if (lr != null) {
          this.lr = lr;
          this.when = System.currentTimeMillis();
      }
    }

    public void purgeCache() {
      this.lr = null;
      this.when = 0;
    }

    public Observable<LoginResult> getObservable() {
      return Observable.create(new Observable.OnSubscribe<LoginResult>() {
        @Override
        public void call(Subscriber<? super LoginResult> subscriber) {
          if (!subscriber.isUnsubscribed()) {
            if (hasCache()) {
              subscriber.onNext(lr);
              subscriber.onCompleted();
            } else {
              subscriber.onError(new RuntimeException("No cache"));
            }
          }
        }
      });
    }
  }
}

由于我无法找到任何类似的示例,并且我在 1 天前开始“玩” rxjava,因此我不确定我的实现。

感谢您的宝贵时间。

【问题讨论】:

    标签: java android design-patterns reactive-programming rx-java


    【解决方案1】:

    我认为这段代码没问题,干得好:)

    您在LoginTask 中使用Observable.create 是正确的,否则调用的结果可能会在内部缓存,然后retry 不会有太大帮助...

    我认为这对于CachedLoginResultObservable 来说是不必要的。在这里,您可以使用 Observable.justObservable.error 实用方法来简化代码,例如:

    public Observable<LoginResult> getObservable() {
      if (hasCache()) {
          return Observable.just(lr);
      } else {
          return Observable.error(new RuntimeException("No cache"));
      }
    }
    

    注意:just 存储您告诉它在内部发出的值,因此重新订阅将始终产生该值。这就是我上面所暗示的,例如,你不应该这样做 Observable.just(performRequest()).retry(3),因为 performRequest 只会被调用一次。

    【讨论】:

    • 你好西蒙,如果'错了,请纠正,但使用你的 .just 和 .error 会导致在可观察创建时发出值。如果我创建主 observable 并在缓存过期后使用它会发生什么?我想它只会返回 30 分钟的旧缓存,现在应该过期了吧?
    • 你是对的,只是会在你调用 getObservable() 的那一刻捕获缓存的值,所以 Observable.createObservable.defer 实际上是有意义的。也看看 Akarnokd 的回答 ;)
    【解决方案2】:

    如果我理解正确,您想执行一次登录并以反应方式缓存结果吗?如果是这样,这里有一个示例,我将如何做到这一点:

    import java.util.concurrent.ThreadLocalRandom;
    
    import rx.*;
    import rx.schedulers.Schedulers;
    import rx.subjects.AsyncSubject;
    
    
    public class CachingLogin {
        static class LoginResult {
    
        }
        /** Guarded by this. */
        AsyncSubject<LoginResult> cache;
        public Observable<LoginResult> login(String username, String password) {
            AsyncSubject<LoginResult> c;
            boolean doLogin = false;
            synchronized (this) {
                if (cache == null || cache.hasThrowable()) {
                    cache = AsyncSubject.create();
                    doLogin = true;
                }
                c = cache;
            }
            if (doLogin) {
                Observable.just(1).subscribeOn(Schedulers.io())
                .map(v -> loginAPI(username, password))
                .retry(3).subscribe(c);
            }
            return c;
        }
        public void purgeCache() {
            synchronized (this) {
                cache = null;
            }
        }
        static LoginResult loginAPI(String username, String password) {
            if (ThreadLocalRandom.current().nextDouble() < 0.3) {
                throw new RuntimeException("Failed");
            }
            return new LoginResult();
        }
    }
    

    【讨论】:

    • 您好 akarnokd,谢谢您的回答。它看起来像是在 rxjava 中做事的好方法 :) 如果我在调用“登录”方法时理解正确,它将立即触发 loginAPI()(如果不存在缓存)。虽然这在大多数情况下都是有意义的,但我想获得可触发真正 http 调用的 observable,以便我可以根据用例使用 observeOn 和 subscribeOn。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-03-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-27
    相关资源
    最近更新 更多