【问题标题】:RxJava2 flatMap create duplicate eventsRxJava2 flatMap 创建重复事件
【发布时间】:2017-02-17 18:19:41
【问题描述】:

我在 RxJava2 上相对较新,我遇到了一些奇怪的行为,所以我很可能以错误的方式使用该工具。

这是一个相当大的项目,但我已将下面的 sn-p 分离为最小可重现代码:

Observable
  .interval(333, TimeUnit.MILLISECONDS)
  .flatMap(new Function<Long, ObservableSource<Integer>>() {
    private Subject<Integer> s = PublishSubject.create();
    private int val = 0;

    @Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
      val++;
      s.onNext(val);
      return s;
      }
    })
  .subscribe(new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception {
      Log.w("value: %s", integer);
     }
  });

此代码使用.interval 模拟来自我的rx 流的事件,flatMap 接收这些事件“进行一些处理”并使用Subject 将结果推送到流中。

流是一个持续的过程,将有几个事件。

这个最小代码很愚蠢,因为我只推送 apply 回调,但在实际情况下,有几个可能的时刻可能会发生推送,并且在 apply 期间接收到的事件数量不是将通过主题发送的相同金额。

我希望通过这段代码看到的是:

value: 2  // 1 got skipped because onNext is called before there's a subscriber.
value: 3
value: 4
value: 5
value: 6 ... etc

我实际得到的是:

value: 2
value: 3
value: 3 // 3 twice
value: 4
value: 4
value: 4 // 4 repeated 3 times
value: 5
value: 5
value: 5
value: 5 // 5 repeated 4 times
value: 6
value: 6
value: 6
value: 6
value: 6 // 6 repeated 5 times
 ... etc

我也试过有一个Observable&lt;Integer&gt; o = s.share(); 并返回它,或者直接返回s.share(); 得到相同的结果。

我有点理解为什么会这样。 ObservableSource 再次被订阅 n 再次 n ,因此每个循环都有更多事件。

问题:

如何实现我的预期行为?

(如果我的预期行为不清楚,请在 cmets 上询问更多信息)

【问题讨论】:

  • 如果我在你身边,我会尝试将 private Subject&lt;Integer&gt; s = PublishSubject.create(); 移动到不同的范围
  • @Blackbelt 正如我所说。这是最小的可重现代码。在完整的代码中,Function 有自己的类。
  • 我可以判断我看到的内容,我猜不出你写的内容。

标签: java android rx-java rx-java2


【解决方案1】:

您的PublishSubject 订阅了多次,每个项目从interval() 订阅一次。

编辑:您每次都需要传入一个新的PublishSubject(如果您想保留第一个/最后一个发射,请切换到BehaviorSubject);将其传递给长时间运行的进程,并确保在长时间运行的进程完成时正确调用其onComplete

【讨论】:

  • 我已经说过了。我的问题是如何实现我需要的行为?
  • 已编辑的答案和建议。
  • 另外,我认为使用 PublishSubject 有点麻烦。考虑将您的长期运行流程与 Rx 更紧密地集成。
  • 您考虑过使用switchMap 运算符吗?或者只是拥有一个可在整个应用程序中访问的单例 Subject
  • 谢谢。 switchMap 为我指明了正确的方向。检查我的答案以了解我使用的完整解决方案。
【解决方案2】:

编辑

在最近的 cmets 之后,我可以想出这种解决方案:

class MyBluetoothClient {
  private PublishSubject<BTLEEvent> statusPublishSubject = PublishSubject.create()

  public Observable<BTLEEvent> getEventObservable() {
    return statusPublishSubject
  }

  private void publishEvent(BTLEEvent event) {
    statusPublishSubject.onNext(event)
  }

  public void doStuff1() {
    // do something that returns:
    publishEvent(BTLEEvent.someEvent1)
  }

  public void doStuff2() {
    // do something else that eventually yields
    publishEvent(BTLEEvent.someEvent2)
  }
}

而你以这种方式使用它:

MyBluetoothClient client = MyBluetoothClient()
client
  .getEventObservable()
  .subscribe( /* */ )

///

client.doStuff1()

/// 

client.doStuff2

原答案

这样可以吗?

Observable
  .interval(333, TimeUnit.MILLISECONDS)
  .flatMap(new Function<Long, ObservableSource<Integer>>() {
    private int val = 0;

    @Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
      val++;
      return Observable.just(val);
      }
    })
  .subscribe(new Consumer<Integer>() {
    @Override public void accept(Integer integer) throws Exception {
      Log.w("value: %s", integer);
     }
  });

【讨论】:

  • 感谢您的回答。对于这个非常小的示例代码,我会编写它。但是如果您阅读完整的问题,主题可以根据其他条件随时触发新事件。我早上回到办公室的第一件事就是尝试 Tassos 建议使用 switchMap 来代替
  • 我明白了。我很难像你这样给出一个更好的答案。请注意,您可以返回任何类型的 Observable,例如 Observable.range(0, 5).map(...).take(4) ... 等等。再给我一些东西,我会尽力想出更好的答案;)
  • 我有一个更具体的用例(来自其他几个):监听蓝牙 LE 扫描,解析广告帧(地图),过滤预期帧,以及(在这个问题上)将读数分成“ enter" 事件,"update rssi" 事件并使用超时来生成"exit" 事件。因此,该类在每个事件到达 Map 时保持它们的时间戳,并每秒运行一次检查以超时“退出”设备。
  • 这意味着这个Observable 将从任何可能的扫描事件中从任意数量的可能 BLE 设备生成 3 种偶数类型(枚举进入、更新、退出)。明天在办公室的第一件事,我将尝试使用.switchMap
  • 我更新了答案,希望这更接近真相
【解决方案3】:

这就是我想出的答案。我会将@Tassos 的答案标记为正确,因为他指出我在正确的道路上。

首先我需要一个CachedSubject(一个在没有观察者时缓存项目并在观察者连接后立即调度它们的主题),这是确保来自apply 内部的排放真正通过的必要条件。该类主要包含PublishSubject

class CachedSubject<T> extends Subject<T> {

        private PublishSubject<T> publishSubject = PublishSubject.create();
        private Queue<T> cache = new ConcurrentLinkedQueue<>();

        @Override public boolean hasObservers() {
            return publishSubject.hasObservers();
        }

        @Override public boolean hasThrowable() {
            return publishSubject.hasThrowable();
        }

        @Override public boolean hasComplete() {
            return publishSubject.hasComplete();
        }

        @Override public Throwable getThrowable() {
            return publishSubject.getThrowable();
        }

        @Override protected void subscribeActual(Observer<? super T> observer) {
            while (cache.size() > 0) {
                observer.onNext(cache.remove());
            }
            publishSubject.subscribeActual(observer);
        }

        @Override public void onSubscribe(Disposable d) {
            publishSubject.onSubscribe(d);
        }

        @Override public void onNext(T t) {
            if (hasObservers()) {
                publishSubject.onNext(t);
            } else {
                cache.add(t);
            }
        }

        @Override public void onError(Throwable e) {
            publishSubject.onError(e);
        }

        @Override public void onComplete() {
            publishSubject.onComplete();
        }
    }

然后我将这个类与switchMap 一起使用:

Observable
   .interval(1000, TimeUnit.MILLISECONDS)
   .switchMap(new Function<Long, ObservableSource<Integer>>() {

      private Subject<Integer> s = new CachedSubject<>();
      private int val = 0;

      @Override public ObservableSource<Integer> apply(Long aLong) throws Exception {
         val++;
         s.onNext(val);
         return s;
      }
   })
   .subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
         Log.w("value: %s", integer);
      }
   });

这有效地允许我在 apply&lt;T t&gt; 方法上接收任意数量的事件,并且只有 1 个 Consumer 订阅它,从它接收所有事件。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-11-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-24
    • 1970-01-01
    • 2014-06-01
    相关资源
    最近更新 更多