【发布时间】:2022-01-19 06:56:54
【问题描述】:
我有一个 gui 事件制作者。我想从中获取最新的排放,并在不同的线程上处理它。在处理排放时,我需要放弃 gui 生产者排放。处理完排放后,我想从 gui 生产者那里获取最新的排放。
我试图使用 onBackpressureLatest() 溢出策略,但队列大小是我的问题。使用默认的 256 队列大小,我得到了预期的行为,但必须处理 255 次排放,这对我来说毫无用处。将队列大小减少到 16 会留下 15 个无用的排放量。我想我会在队列大小 = 1 时实现预期的行为,但 16 是最小值。
我附上上一段中描述的代码。
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.concurrent.TimeUnit;
public class App {
public static void main(String[] args) throws InterruptedException {
System.setProperty("reactor.bufferSize.small", "16");
guiEventsProducer()
.onBackpressureLatest()
.log()
.publishOn(Schedulers.single())
.subscribe(next -> {
System.out.println("Processing " + next);
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Processing " + next + " done");
});
TimeUnit.SECONDS.sleep(60);
}
private static Flux<Integer> guiEventsProducer() {
return Flux.range(0, 10000);
}
}
【问题讨论】:
标签: reactive-programming project-reactor