【问题标题】:RxJava delay for each item of list emitted发出的每个列表项的 RxJava 延迟
【发布时间】:2015-10-22 21:38:34
【问题描述】:

我正在努力实现一些我认为在 Rx 中相当简单的东西。

我有一个项目列表,我想让每个项目延迟发出。

似乎 Rx delay() 运算符只是将所有项目的发射移动了指定的延迟,而不是每个单独的项目。

这是一些测试代码。它对列表中的项目进行分组。然后,每个组都应在发出之前应用延迟。

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .delay(50, TimeUnit.MILLISECONDS)
    .doOnNext(item -> {
        System.out.println(System.currentTimeMillis() - timeNow);
        System.out.println(item);
        System.out.println(" ");
    }).toList().toBlocking().first();

结果是:

154ms
[5]

155ms
[2]

155ms
[1]

155ms
[3]

155ms
[4]

但我希望看到的是这样的:

174ms
[5]

230ms
[2]

285ms
[1]

345ms
[3]

399ms
[4]

我做错了什么?

【问题讨论】:

  • 想知道为什么没有任何答案实际上回答了这个问题。为什么这不起作用,有什么问题?
  • “我正在努力实现一些我认为在 Rx 中相当简单的东西” 似乎是对每个 Rx 问题的介绍。 :)

标签: java rx-java delay


【解决方案1】:

执行此操作的最简单方法似乎是使用concatMap 并将每个项目包装在延迟的 Obserable 中。

long startTime = System.currentTimeMillis();
Observable.range(1, 5)
        .concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
        .doOnNext(i-> System.out.println(
                "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
        .toCompletable().await();

打印:

Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms

【讨论】:

  • 我更喜欢这种方法,因为我毫不怀疑我应该关闭一些东西。使用 zip+interval 我想我需要手动停止间隔发射。
  • 即使在上游速度很慢的情况下也能发挥魅力。
  • 当我们知道我们想要多少结果时,这似乎会很好。当我们不知道有多少时,有没有办法做到这一点?
  • concatMap 步骤应该适用于任何类型的 observable、bounded unbounded、sized 和 unsized。给出的示例的大小只是为了匹配问题和演示的容易程度。
  • toCompletable 已弃用。
【解决方案2】:

一种方法是使用 zip 将您的 observable 与 Interval observable 组合以延迟输出。

Observable.zip(Observable.range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g -> g.toList()),
    Observable.interval(50, TimeUnit.MILLISECONDS),
    (obs, timer) -> obs)
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

【讨论】:

  • 谢谢!我认为延迟运算符不打算以我想要的方式使用。这个解决方案有效:)
  • 如果你 source observable 以低于 50 毫秒的速度发射项目,这将不起作用。 Observable.interval() 将每 50 毫秒发出一个项目。如果您的分组列表中没有匹配的项目,zip() 运算符将缓冲这些。然后,当发出一个组时,zip 将立即将其与可观察间隔中的一个项目结合起来,并将其立即发送到您的doOnNext()
  • @kjones 同意——这对慢速生产者来说是行不通的,只会延迟一个同时有物品可用的生产者。但最初的问题是 - “我有一个项目列表,我希望延迟发出每个项目。”这听起来像from,每次发射之间都有延迟。
  • @iagreen - 你是对的。我只是假设 Observable.range(1,5) 是一个假设的可观察源。由于我所做的几乎所有事情都可以在任何随机时间产生项目,因此我也倾向于将此要求投射到其他人身上。
  • Observable messageObservable = Observable .interval(1, TimeUnit.SECONDS) .map(i -> others.get(i.intValue()) ) .take(others.size());这更简单。
【解决方案3】:

只是分享一种简单的方法来以间隔发出集合中的每个项目:

Observable.just(1,2,3,4,5)
    .zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
    .subscribe(System.out::println);

每个项目将每 500 毫秒发出一次

【讨论】:

    【解决方案4】:

    对于 kotlin 用户,我为 'zip with interval' 方法编写了一个扩展函数

    import io.reactivex.Observable
    import io.reactivex.functions.BiFunction
    import java.util.concurrent.TimeUnit
    
    fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
        Observable.zip(
            this, 
            Observable.interval(interval, timeUnit), 
            BiFunction { item, _ -> item }
        )
    

    它的工作方式相同,但这使它可以重复使用。示例:

    Observable.range(1, 5)
        .delayEach(1, TimeUnit.SECONDS)
    

    【讨论】:

      【解决方案5】:

      我认为这正是您所需要的。看看:

      long startTime = System.currentTimeMillis();
      Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
                      .timestamp(TimeUnit.MILLISECONDS)
                      .subscribe(emitTime -> {
                          System.out.println(emitTime.time() - startTime);
                      });
      

      【讨论】:

        【解决方案6】:

        在发射的每个项目之间引入延迟很有用:

        List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
        
        Observable.fromIterable(letters)
                        .concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
                                .take(1)
                                .map(second -> item))
                        .subscribe(System.out::println);
        

        https://github.com/ReactiveX/RxJava/issues/3505 的更多好选择

        【讨论】:

          【解决方案7】:

          您可以实现custom rx operator,例如MinRegularIntervalDelayOperator,然后将其与lift 函数一起使用

          Observable.range(1, 5)
              .groupBy(n -> n % 5)
              .flatMap(g -> g.toList())
              .lift(new MinRegularIntervalDelayOperator<Integer>(50L))
              .doOnNext(item -> {
                System.out.println(System.currentTimeMillis() - timeNow);
                System.out.println(item);
                System.out.println(" ");
              }).toList().toBlocking().first();
          

          【讨论】:

            【解决方案8】:
            Observable.just("A", "B", "C", "D", "E", "F")
                .flatMap { item -> Thread.sleep(2000)
                    Observable.just( item ) }
                .subscribe { println( it ) }
            

            【讨论】:

              【解决方案9】:

              要延迟每个组,您可以更改 flatMap() 以返回延迟发出组的 Observable。

              Observable
                      .range(1, 5)
                      .groupBy(n -> n % 5)
                      .flatMap(g ->
                              Observable
                                      .timer(50, TimeUnit.MILLISECONDS)
                                      .flatMap(t -> g.toList())
                      )
                      .doOnNext(item -> {
                          System.out.println(System.currentTimeMillis() - timeNow);
                          System.out.println(item);
                          System.out.println(" ");
                      }).toList().toBlocking().first();
              

              【讨论】:

              • 不幸的是,这不起作用。打印的时间戳对于所有项目都是相同的。
              • 那是因为你 source observable range(1, 5) 几乎同时发出所有项目。如果您的源在随机时间发射项目,您会看到每个组延迟 50 毫秒。如果说您的源发出 1 个项目,然后延迟 500 毫秒,然后再发出 4 个项目,则您选择作为正确答案的答案不会做您想做的事情。在这种情况下,最后 4 项将具有几乎相同的时间戳。
              • 是的,都在 doOnNext 中同时执行
              【解决方案10】:

              一种不太干净的方法是使用 .delay(Func1) 运算符通过迭代来更改延迟。

              Observable.range(1, 5)
                          .delay(n -> n*50)
                          .groupBy(n -> n % 5)
                          .flatMap(g -> g.toList())
                          .doOnNext(item -> {
                              System.out.println(System.currentTimeMillis() - timeNow);
                              System.out.println(item);
                              System.out.println(" ");
                          }).toList().toBlocking().first();
              

              【讨论】:

                【解决方案11】:

                还有其他方法可以使用 concatMap 来实现,因为 concatMap 返回可观察的源项目。所以我们可以在那个 observable 上添加延迟。

                这是我尝试过的。

                Observable.range(1, 5)
                          .groupBy(n -> n % 5)
                          .concatMap(integerIntegerGroupedObservable ->
                          integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
                          .doOnNext(item -> {
                                    System.out.println(System.currentTimeMillis() - timeNow);
                                    System.out.println(item);
                                    System.out.println(" ");
                                }).toList().toBlocking().first(); 
                

                【讨论】:

                  【解决方案12】:

                  你可以使用

                     Observable.interval(1, TimeUnit.SECONDS)
                              .map(new Function<Long, Integer>() {
                                  @Override
                                  public Integer apply(Long aLong) throws Exception {
                                      return aLong.intValue() + 1;
                                  }
                              })
                              .startWith(0)
                              .take(listInput.size())
                              .subscribe(new Consumer<Integer>() {
                                  @Override
                                  public void accept(Integer index) throws Exception {
                                      Log.d(TAG, "---index of your list --" + index);
                                  }
                              });
                  

                  上面的这段代码不重复值(索引)。 “我确定”

                  【讨论】:

                    【解决方案13】:

                    本文建议的两种方法的 Swift 扩展。

                    连接

                    import RxSwift
                    
                    extension Observable {
                        public func delayEach(_ dueTime: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
                            return self.concatMap { Observable.just($0).delay(dueTime, scheduler: scheduler) }
                        }
                    }
                    

                    压缩

                    import RxSwift
                    
                    extension Observable {
                        public func delayEach(_ period: RxSwift.RxTimeInterval, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Element> {
                            return Observable.zip(
                                Observable<Int>.interval(period, scheduler: scheduler),
                                self
                            ) { $1 }
                        }
                    }
                    

                    用法

                    Observable.range(start: 1, count: 5)
                        .delayEach(.seconds(1), scheduler: MainScheduler.instance)
                    

                    我个人偏爱 concat 方法,因为当上游以低于延迟间隔的速率发出项目时,它也可以按预期工作。

                    是的,原来的帖子是针对 RxJava 的,但 Google 也为您提供 RxSwift 查询。

                    【讨论】:

                      【解决方案14】:

                      我想你想要这个:

                      Observable.range(1, 5)
                                  .delay(50, TimeUnit.MILLISECONDS)
                                  .groupBy(n -> n % 5)
                                  .flatMap(g -> g.toList())
                                  .doOnNext(item -> {
                                      System.out.println(System.currentTimeMillis() - timeNow);
                                      System.out.println(item);
                                      System.out.println(" ");
                                  }).toList().toBlocking().first();
                      

                      这样它会延迟进入组的数字,而不是将减少的列表延迟 5 秒。

                      【讨论】:

                      • 不幸的是,这仍然不能满足我的需要。 doOnNext 方法仍然对所有项目同时执行。
                      • 使用 zip 运算符!将你的 observable 与区间 observable 结合起来!
                      【解决方案15】:

                      您可以使用 flatMap、maxConcurrent 和 delay() 在发射的项目之间添加延迟

                      这是一个示例 - 发出 0..4 延迟

                      @Test
                      fun testEmitWithDelays() {
                          val DELAY = 500L
                          val COUNT = 5
                      
                          val latch = CountDownLatch(1)
                          val startMoment = System.currentTimeMillis()
                          var endMoment : Long = 0
                      
                          Observable
                              .range(0, COUNT)
                              .flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
                              .subscribe(
                                      { println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
                                      {},
                                      {
                                          endMoment = System.currentTimeMillis()
                                          latch.countDown()
                                      })
                      
                          latch.await()
                      
                          assertTrue { endMoment - startMoment >= DELAY * COUNT }
                      }
                      
                      ... value: 0, 540
                      ... value: 1, 1042
                      ... value: 2, 1544
                      ... value: 3, 2045
                      ... value: 4, 2547
                      

                      【讨论】:

                      • maxConcurrent = 1 的 flatMap 和 concatMap 不一样吗?反正他们不会交错..
                      • @LookForAngular 当然,flatMap (..., maxConcurrent=1) 的行为类似于 concatMap。
                      【解决方案16】:

                      您应该能够通过使用Timer 运算符来实现此目的。我尝试使用delay,但无法获得所需的输出。请注意在 flatmap 运算符中完成的嵌套操作。

                          Observable.range(1,5)
                                  .flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
                                              .map(y -> x))
                                  // attach timestamp
                                  .timestamp()
                                  .subscribe(timedIntegers ->
                                          Log.i(TAG, "Timed String: "
                                                  + timedIntegers.value()
                                                  + " "
                                                  + timedIntegers.time()));
                      

                      【讨论】:

                        猜你喜欢
                        • 1970-01-01
                        • 2015-03-26
                        • 1970-01-01
                        • 2016-10-26
                        • 1970-01-01
                        • 1970-01-01
                        • 1970-01-01
                        • 2019-10-29
                        • 1970-01-01
                        相关资源
                        最近更新 更多