【问题标题】:Time Delay between Each Task Rxjava Android每个任务 Rxjava Android 之间的时间延迟
【发布时间】:2016-11-20 01:19:15
【问题描述】:

我必须重复安排相同的observableN 次数,每个M 之间有M 秒延迟:

O1____1sec____O2____1sec____O3____1sec____O4(完成)

注意开始和结束 observable 之间没有延迟,

Observable<Precious> result = <~> // hidden for brevity (it's just a long time consuming observable that can take suppose up to 10 seconds or more)

Observable.timer(M,TimeUnit.SECONDS).compose(x -> result).take(N).subscribe();

问题这里是result observable 正在执行昂贵的网络调用,它会在计时器到期后自行超时,还是我们必须告诉它这样做,如果是的话怎么办?

【问题讨论】:

  • 我不清楚你的问题,但你是否担心你的第一个 observable 的发射速度快于你的第二个 observable 的消耗速度?
  • 另外,compose(x -&gt; result) 会扔掉x;就好像你写了x.take(N).subscribe()。你需要重新订阅 observable 吗?超时?你看过retry吗?如果这似乎不相关,您能否用更多上下文扩展您的问题?
  • 抱歉回复晚了,@RickSanchez 我很担心你提到的同样的原因,@Tassos,我不想要它的价值,我只想要它的及时调用行为,你可以说打电话给while(N != 0){dosomething(); N--; Thread.sleep(M);}
  • @TassosBassoukos 重试将导致我猜每次调用都调用 onComplete ?
  • @Mr.Z - 不,只有一个 onComplete 将被传递到下游。

标签: android multithreading rx-java rx-android


【解决方案1】:

你可以使用 concatMap 的组合来连接 observables,使用 Delay 来延迟每一个的发射

/**
 * Another elegant solution it would be to create an observable with the list of items, and then use
 * concatMap to pass all items from the first observable to the second, then this second observable
 * can be created used delay operator afterwards.
 */
@Test
public void delayObservablesWithConcatMap() {
    Observable.from(Arrays.asList(Observable.just(1), Observable.just(2),Observable.just(3)))
            .concatMap(s -> s.delay(100, TimeUnit.MILLISECONDS))
            .subscribe(n -> System.out.println(n + " just emitted..."),
                    e -> {
                    },
                    () -> System.out.println("Everybody emitt!"));
    new TestSubscriber().awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);

}

您可以在此处查看更多延迟示例https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/utils/ObservableDelay.java

【讨论】:

    【解决方案2】:

    要从可观察的source 中隔开项目,以便每 N 秒发出一次,请使用以下模式:

    source.zipWith(Observable.interval(1, TimeUnit.SECONDS), (src, dummy) -> src)
    

    这里需要注意的是,如果您的源 observable 花费的时间比间隔时间长,那么这些项目就会排队。

    现在我重新阅读了您的问题和说明,我认为您需要的是:

    Observable.interval(1, TimeUnit.SECONDS)
    .switchMap(dummy -> result)
    

    这将每 1 秒取消订阅并重新订阅 result observable。仅当您的 result observable 在取消订阅时取消网络调用时,它才会起作用。

    【讨论】:

      猜你喜欢
      • 2018-09-28
      • 1970-01-01
      • 2020-12-12
      • 2022-11-29
      • 2011-09-04
      • 2011-09-08
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多