【问题标题】:Rx (Reactive Extensions for Java) Zip operator with time intervalRx (Reactive Extensions for Java) 带时间间隔的 Zip 运算符
【发布时间】:2015-05-01 19:47:16
【问题描述】:

我对 RxJava 比较陌生,我已经和操作符玩了一段时间了。

我看到了这个在短时间间隔(1s)后发出项目的小例子:

Observable<String> data = Observable.just("one", "two", "three", "four", "five");
    Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
        return d + " " + t;
    }).toBlocking().forEach(System.out::println);

这可行,但是当我删除将源变为 BlockingObservable 的 toBlocking() 时,程序会执行并以无输出结束。

我通常看弹珠图来正确理解事物: http://reactivex.io/documentation/operators/zip.html

在最后一句中它说:它只会发射与发射最少项目的源 Observable 发射的项目数量一样多的项目。

这是否意味着data Observable 在不到 1 秒的时间内发出所有项目并在打印每个 Observable 的前两个项目之前结束?因为每个 Observable 本身都是异步的?

我需要清楚地了解正在发生的事情,以及是否有其他方法可以处理类似情况。有人吗?

【问题讨论】:

标签: java multithreading reactive-programming rx-java


【解决方案1】:

Observable.interval 在幕后使用Scheduler。它将从另一个线程发出。同时,主线程已经完成了所有的编写,将退出。大概您在 main 方法中有此代码,这就是您的程序退出的原因。

在实际系统中,这应该不是问题(除非您的实际系统是包含此代码的 main 方法)。

在一个示例程序中,您可以通过从标准输入读取一个字节来导致主线程阻塞。像这样的:

Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> d + " " + t)
        .subscribe(System.out::println);

System.in.read();

【讨论】:

  • 另外,您可能想看看非静态Observable.zipWith。更流畅。
  • 所以这与异步无关,只是计时器在不同的线程上运行,并且 zip(With) 运算符在它们都发出第一个项目之前不会发出..?
  • 是的 - zip 产生一个 Observable 在两个压缩的 Observable 都发出一些东西之前不会发出任何东西。
【解决方案2】:

基本上你的主程序在可观察对象有机会发出任何东西之前就退出了,这就是你看不到任何输出的原因。修复它的方法是以某种方式阻止,直到Observable 发出所有项目,这是使用CountDownLatch 的一种方法:

CountDownLatch latch = new CountDownLatch(1);
Observable<String> data = Observable.just("one", "two", "three", "four", "five");
Observable.zip(data, Observable.interval(1, TimeUnit.SECONDS), (d, t) -> {
    return d + " " + t;
}).finallyDo(latch::countDown).forEach(System.out::println);

latch.await(10, TimeUnit.SECONDS);

【讨论】:

  • 这很好。它与 toBlocking() 有何不同?
  • 好吧,两者都做同样的事情,在某种意义上,两者都会阻塞,直到 Observable 有机会发射,但以不同的方式做。第一种方法是调用toBlocking,您将创建一个完全不同的类型BlockingObservable,因此底层行为发生了变化。在第二种方式中,您使用 Observable 本身,但它有机会通过使用闩锁来发射项目。这纯粹是为了测试,对于一个真实的系统,你不应该使用任何一种方法
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-05-03
  • 1970-01-01
  • 2012-03-18
  • 2021-10-31
相关资源
最近更新 更多