【问题标题】:RxJava async cache: proper way to dispose replay().autoConnect() ObservableRxJava 异步缓存:处理 replay().autoConnect() Observable 的正确方法
【发布时间】:2017-03-07 21:31:35
【问题描述】:

我必须为 observable 的结果提供一个短暂的缓存。

查看选项,我看到以下内容:

  1. 缓存replay(1).refCount(),当数据准备好时,缓存实际值。 缓存检索将检查实际数据并执行Observable.just 或返回 待处理的 Observable 或发起新的请求。

  2. 缓存 replay(1).autoConnect(1) 并始终返回

后者似乎更直接,但它有一个警告,当缓存必须失效时如何正确处理 observable。

有一个签名:

public Observable<T> autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection)

但很难说我如何跟踪未完成的订阅以及 dispose 是否会是优雅的。

前者将负责资源释放,但您必须产生更复杂的逻辑。

【问题讨论】:

    标签: java rx-java observable


    【解决方案1】:

    为什么不.cache()

    public class CachedObservable<K,V> {
      private Function<K, Observable<V>> actual;
      private CachedObservable(Function<K, Observable<V>> actual){this.actual=actual;}
      private final Map<K, Observable<V>> cacheMap = new ConcurrentHashMap<>();
    
      public Observable<V> get(K key) {
        return cacheMap.computeIfAbsent(key, k -> this.actual.call(k).cache());
      }
      public void invalidate(K key){cacheMap.remove(key);}
    }
    

    【讨论】:

    • 缓存操作符在序列完成之前不会发出。另外我认为您也有同样的问题,您必须在知道活跃订阅数的情况下安全地处理它。
    • 缓存会在它们到来时发出它们 - 请参阅marble diagram,但它会立即订阅源 observable,并且当您有 0 个订阅者时不会取消订阅,因此不适合非 1大小的 Observables。但是你可以让它被垃圾收集。
    • 我测试了最新的 1.x、1.2.7,这就是我所看到的:看起来缓存立即开始发出项目,这不再是问题了。我可以提供大小提示,仍然不能从上游退订,所以根本不适合无限源。它可以正确地进行垃圾收集,它确实支持并发或稀疏订阅,它几乎满足了我的需求
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-09-04
    • 1970-01-01
    • 2011-08-25
    • 1970-01-01
    相关资源
    最近更新 更多