【问题标题】:Buffer N values per second RxJava, Project Reactor每秒缓冲 N 个值 RxJava,Project Reactor
【发布时间】:2019-03-25 03:48:06
【问题描述】:

我有一个带有一些值的流:

Flux<Integer> stream = getStream();

我试图实现每秒 N 项的功能

stream.bufferTimeout(MAX_SIZE_TWO, _1_SECOND).subscribe(val => {
  System.out.println(val);
});

我正在尝试找到接近我预期结果的运算符。

预期结果:

time: 15:00:00, stream_next_value: 1, output: {1}
time: 15:00:00, stream_next_value: 2, output: {2}
time: 15:00:00, stream_next_value: 3, no output => buffer
time: 15:00:00, stream_next_value: 4, no output => buffer
time: 15:00:00, stream_next_value: 5, no output => buffer
time: 15:00:01, stream_no_next_value, output: {3,4}
time: 15:00:01, stream_next_value: 6, no output => buffer
time: 15:00:02, stream_no_next_value, output: {5,6}

但看起来缓冲区运算符的重载版本不支持这种行为。

如何使用缓冲区操作符实现预期行为?

【问题讨论】:

  • 想在这里提供帮助,但我错过了一些信息。你能解释一下你的流的性质和频率是什么吗?理想情况下,您能否为输入流(例如interval 运算符)提供一个最小的、可复制的示例,并获得您期望的结果?

标签: rxjs rx-java2 project-reactor


【解决方案1】:

也许你可以这样做:

Flowable<Long> stream = Flowable.generate(() -> 0L, (next, emitter) -> {
        emitter.onNext(next);
        return next + 1;
});

// Flowable<Long> stream = Flowable.interval(100, MILLISECONDS);
//                                 .onBackpressureDrop(); // to make it works otherwise get a MissingBackPressureException

stream.buffer(2)
      .zipWith(Flowable.interval(1, SECONDS), (first, second) -> first)
      .flatMap(Flowable::fromIterable)
      .subscribe(s -> LOGGER.info("received: " + s),
                 Throwable::printStackTrace);

注意 stream 必须遵守背压,否则您必须添加 onBackpressureXXX() 运算符(例如,如果流是 interval() 就会出现这种情况(参见注释代码))。 你会得到这样的输出:

14:39:59.538 | INFO  | RxComputationThreadPool-1 | received: 0
14:39:59.540 | INFO  | RxComputationThreadPool-1 | received: 1
14:40:00.527 | INFO  | RxComputationThreadPool-1 | received: 2
14:40:00.528 | INFO  | RxComputationThreadPool-1 | received: 3
14:40:01.528 | INFO  | RxComputationThreadPool-1 | received: 4
14:40:01.528 | INFO  | RxComputationThreadPool-1 | received: 5
14:40:02.528 | INFO  | RxComputationThreadPool-1 | received: 6
14:40:02.528 | INFO  | RxComputationThreadPool-1 | received: 7
14:40:03.528 | INFO  | RxComputationThreadPool-1 | received: 8
14:40:03.528 | INFO  | RxComputationThreadPool-1 | received: 9

【讨论】:

    猜你喜欢
    • 2022-01-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-10-09
    相关资源
    最近更新 更多