【问题标题】:Latest overflow strategy with size 1 or any alternatives大小为 1 或任何替代方案的最新溢出策略
【发布时间】:2022-01-19 06:56:54
【问题描述】:

我有一个 gui 事件制作者。我想从中获取最新的排放,并在不同的线程上处理它。在处理排放时,我需要放弃 gui 生产者排放。处理完排放后,我想从 gui 生产者那里获取最新的排放。

我试图使用 onBackpressureLatest() 溢出策略,但队列大小是我的问题。使用默认的 256 队列大小,我得到了预期的行为,但必须处理 255 次排放,这对我来说毫无用处。将队列大小减少到 16 会留下 15 个无用的排放量。我想我会在队列大小 = 1 时实现预期的行为,但 16 是最小值。

我附上上一段中描述的代码。

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.util.concurrent.TimeUnit;

public class App {

    public static void main(String[] args) throws InterruptedException {
        System.setProperty("reactor.bufferSize.small", "16");

        guiEventsProducer()
                .onBackpressureLatest()
                .log()
                .publishOn(Schedulers.single())
                .subscribe(next -> {
                    System.out.println("Processing " + next);
                    try {
                        TimeUnit.MILLISECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Processing " + next  + " done");
                });

        TimeUnit.SECONDS.sleep(60);
    }

    private static Flux<Integer> guiEventsProducer() {
        return Flux.range(0, 10000);
    }
}

【问题讨论】:

    标签: reactive-programming project-reactor


    【解决方案1】:
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    
    import java.util.concurrent.TimeUnit;
    
    public class App {
    
        public static void main(String[] args) throws InterruptedException {
    
            Flux.range(1, 10000)
                    .log()
                    .onBackpressureLatest()
                    .log()
                    .flatMap(next -> Mono.just(next).subscribeOn(Schedulers.single()), 1, 1)
                    .subscribe(integer -> {
                        System.out.println("[" + Thread.currentThread().getName() + "] result=" + integer);
                        try {
                            TimeUnit.MILLISECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
    
            TimeUnit.SECONDS.sleep(60);
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-10-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-15
      相关资源
      最近更新 更多