【问题标题】:RxJava asynchronous observer misses elementsRxJava 异步观察者遗漏元素
【发布时间】:2015-05-17 17:16:57
【问题描述】:

我正在使用 observeOn 来观察另一个线程中的 observable:

Observable.just("Hello", "world!").observeOn(Schedulers.io()).subscribe(System.out::println);

但是,此代码不会始终输出“Hello world!”。 PublishSubject 也是如此:

PublishSubject<String> subject = PublishSubject.create();
subject.observeOn(Schedulers.io()).subscribe(System.out::println);
subject.onNext("Hello");
subject.onNext("world!");

为什么这段代码并不总是打印“Hello world!” ?我认为至少在第二个示例中,订阅会收到两条消息,因为它在 onNext 调用之前订阅。有没有办法接收这两条消息?

【问题讨论】:

  • 你真的要在这里使用 Schedulers.io() 吗?根据文档,它应该仅用于 IO 操作,例如文件读取或网络。
  • 从根本问题的角度来看,使用 io() 是无关紧要的。它用于阻塞操作,通常是 IO,但没有什么能阻止你对它们进行计算。

标签: rx-java


【解决方案1】:

RxJava 调度程序底层的线程都是守护线程,如果所有其他非守护线程都已完成,它们会被 JVM 停止。如果您从static void main() 运行您的示例,您的main() 方法很可能会在其他线程有机会运行之前终止,因此不会执行打印代码。

根据您想要观察序列的方式,您可以在 Observable 链上使用 toBlocking() 或在您的 onCompleted() 方法实现中使用 CountDownLatchcountDown()

【讨论】:

  • 谢谢,这确实是问题所在!我使用了 CountDownLatch 来防止这个问题。但是我不确定我是否理解 toBlocking 的语义。这是否会使调用线程阻塞,直到调用 onCompleted ?
  • BlockingObservable 上的方法在当前线程上执行并且本质上是阻塞的。一些穷举方法会一直阻塞,直到上游以 onComplete 或 onError 终止。
【解决方案2】:

IO 调度程序需要一些时间才能准备好。

当您使用 onNext 发出项目时,它还没有准备好,所以您错过了它。

【讨论】:

  • 这个答案有点误导:如果有足够的时间,他们将处理 onNext 而不会丢失数据,但示例程序在处理发生之前退出。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-06-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-04-28
  • 1970-01-01
相关资源
最近更新 更多