【问题标题】:perform operation after each delay is complete in RxJava在 RxJava 中每个延迟完成后执行操作
【发布时间】:2022-02-02 08:29:51
【问题描述】:

我正在尝试在以特定顺序发射项目时模拟延迟

这里我试图模拟问题

List<Integer> integers = new ArrayList<>();
        integers.add(1);
        integers.add(2);
        integers.add(3);
        integers.add(4);
      

Disposable d = Observable
                    .just(integers)
                    .flatMap(integers1 -> {
                        return Observable
                                .zip(Observable.just(1L).concatWith(Observable.interval(10, 5, TimeUnit.SECONDS)),
                                     Observable.fromIterable(integers1), (aLong, integer1) -> {
                                            return new Pair<Long, Integer>(aLong, integer1);
                                        })
                                .flatMap(longIntegerPair -> {
                                    System.out.println("DATA " + longIntegerPair.getValue());
                                    return Observable.just(longIntegerPair.getValue());

                                })
                                .toList()
                                .toObservable();


                    })
                    .subscribe(integers1 -> {
                        System.out.println("END");
                    }, throwable -> {
                        System.out.println("Error " + throwable.getMessage());

                    });

上面代码的输出是

DATA 1 
wait for 10 seconds
DATA 2
wait for 5
DATA 3
wait for 5
DATA 4
wait for 5 min

我期望的是在每个阶段结束 10 秒或 5 秒的延迟后执行操作,但确定我应该在电流中的何处注入该部分。

DATA 1 
wait for 10 seconds
[perform operation]

DATA 2
wait for 5
[perform operation]

DATA 3
wait for 5
[perform operation]

DATA 4
wait for 5 min
[perform operation]

【问题讨论】:

  • 您可以使用重复运算符。 See this 更多问题。
  • 有很多答案不知道你到底指的是什么
  • 在打印数据的地方执行操作怎么样?
  • @akarnokd 该操作必须在数据 1 之后执行,因为这是一个模拟,我实际上是在打印数据 1 的地方执行一些操作

标签: java android rx-java rx-java2 rx-java3


【解决方案1】:

使用concatMap 使序列有序并使用delay 延迟数据处理以获得您的打印输出模式:

Observable
.zip(Observable.just(-1L).concatWith(Observable.interval(10, 5, TimeUnit.SECONDS)),
     Observable.range(1, 5), 
     (aLong, integer1) -> {
          return new Pair<Long, Integer>(aLong, integer1);
     }
)
.concatMap(longIntegerPair -> {
       System.out.println("DATA " + longIntegerPair.getValue());
       return Observable.just(longIntegerPair.getValue())
               .delay(longIntegerPair.getKey() < 0 ? 10 : 5, TimeUnit.SECONDS)
               .flatMap(value -> {
                   System.out.println("[performing operation] " + value);
                   return Observable.just(value);
              });
})
.toList()
.toObservable()
.blockingSubscribe();

【讨论】:

    猜你喜欢
    • 2013-02-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-05-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多