【问题标题】:RxJava 2 - Reduce queue in concatMapRxJava 2 - 在 concatMap 中减少队列
【发布时间】:2017-12-13 14:26:58
【问题描述】:

我有一个发出一些数据的主题。然后我想修改队列中的这个数据(长操作)。但是当另一个主题会发出其他东西时,我想减少我的队列

    PublishSubject<Integer> myBaseSubject = PublishSubject.create();
    PublishSubject<Integer> otherSubject = PublishSubject.create();

    myBaseSubject
            .concatMap(integer -> Observable.timer(5, TimeUnit.SECONDS) // long operation
                    .map(aLong -> integer)
            )
            .subscribe(
                    integer -> Log.i(TAG, "result: " + integer),
                    Functions.emptyConsumer()
            );

    myBaseSubject.onNext(1);
    myBaseSubject.onNext(2);
    myBaseSubject.onNext(3);
    myBaseSubject.onNext(4); 
    myBaseSubject.onNext(5);

    otherSubject.onNext(3);
    otherSubject.onNext(4);

    myBaseSubject.onNext(6);
    myBaseSubject.onNext(7);

如果concatMap 尚未使用这些元素(它们仍在队列中),现在我想修改我的一次性元素以跳过otherSubject(3 和4)发出的元素。

有可能实现吗?

PS:PublishSubject 不是必需的 - 它只是为了简单起见

【问题讨论】:

    标签: java android queue rx-java2 concatmap


    【解决方案1】:

    您可以使用ConcurrentHashMap 来跟踪哪些元素不应由concatMap 处理。我假设不处理某个项目的决定发生在该项目的索引创建之后。

    ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();
    
    myBaseSubject
        .doOnNext(v  -> map.put(v, v))
        .concatMap(integer -> {
            if (map.remove(integer) != null) {
                return Observable.timer(5, TimeUnit.SECONDS) // long operation
                    .map(aLong -> integer)
            }
            return Observable.empty();
        })
        .subscribe(
            integer -> Log.i(TAG, "result: " + integer),
            Functions.emptyConsumer()
        );
    
    myBaseSubject.onNext(1);
    myBaseSubject.onNext(2);
    myBaseSubject.onNext(3);
    myBaseSubject.onNext(4); 
    myBaseSubject.onNext(5);
    
    map.remove(3);
    map.remove(4);
    
    myBaseSubject.onNext(6);
    myBaseSubject.onNext(7);
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-10-19
      • 1970-01-01
      • 2017-12-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多