【发布时间】:2019-03-25 03:48:06
【问题描述】:
我有一个带有一些值的流:
Flux<Integer> stream = getStream();
我试图实现每秒 N 项的功能
stream.bufferTimeout(MAX_SIZE_TWO, _1_SECOND).subscribe(val => {
System.out.println(val);
});
我正在尝试找到接近我预期结果的运算符。
预期结果:
time: 15:00:00, stream_next_value: 1, output: {1}
time: 15:00:00, stream_next_value: 2, output: {2}
time: 15:00:00, stream_next_value: 3, no output => buffer
time: 15:00:00, stream_next_value: 4, no output => buffer
time: 15:00:00, stream_next_value: 5, no output => buffer
time: 15:00:01, stream_no_next_value, output: {3,4}
time: 15:00:01, stream_next_value: 6, no output => buffer
time: 15:00:02, stream_no_next_value, output: {5,6}
但看起来缓冲区运算符的重载版本不支持这种行为。
如何使用缓冲区操作符实现预期行为?
【问题讨论】:
-
想在这里提供帮助,但我错过了一些信息。你能解释一下你的流的性质和频率是什么吗?理想情况下,您能否为输入流(例如
interval运算符)提供一个最小的、可复制的示例,并获得您期望的结果?
标签: rxjs rx-java2 project-reactor