【问题标题】:How to stop expensive calculation in Spring Flux如何在 Spring Flux 中停止昂贵的计算
【发布时间】:2020-02-15 09:38:31
【问题描述】:

我使用 Spring 反应式作为服务器来进行昂贵的生成并在 Flux 中一一返回结果。如果请求被取消(例如约束和太紧),这具有停止生成的优点。我的代码如下所示:

    public Flux<Entity> generate(int nbrOfEntitiesToGenerate, Constaints constraints) {
        return Flux.range(0, nbrOfEntitiesToGenerate)
            .map(x -> Generator.expensiveGeneration(constraints)
//            .subscribeOn(Schedulers.parallel())
            ;
    }

这只是我想要的一半,我不会在取消时拨打下一个电话expensiveGeneration,但不会停止当前运行昂贵的生成,如果约束太紧,可能永远不会完成。请问我该怎么做。

另外一个问题,如果你知道的话,我怎样才能在并行中生成 x 个实体以最大限度地利用我的线程(当然不是一次启动所有代)。

提前致谢。

【问题讨论】:

  • 我是 Rx 编程新手。我不确定我是否在这里得到了你的所有想法。如果你想停止运行昂贵的生成方法,那么如果它运行很长时间,你可以使用 Timer 来停止它。如果要停止向流发出下一个值,可以在订阅对象上调用 Disposable.dispose()
  • 我不希望这样,我想继续计算/生成直到用户选择停止,当他想要停止消耗资源时
  • @bubbles 我猜是这样......
  • 我的理解是您可以使用ThreadPoolExecutor 或类似Scheduler 的东西。如果你这样做了,那么你应该能够使用普通的 java Thread 中断机制来中断 Thread
  • spring-webflux 使用 Reator。您可以在另一个事件中调用 Disposable.dispose()。例如:您创建一个将发出停止事件的发布者,然后当发出停止事件时,您通过调用 Disposable.dispose() 订阅它

标签: rx-java spring-webflux project-reactor spring-reactive


【解决方案1】:

ExecutorService 创建Scheduler 很简单,但如果要取消,则需要保存Future&lt;?&gt; 可调用对象。我将Generator 更改为保留它并包装cancel 方法,该方法在Flux 处理doOnCancel 时被调用。

public class FluxPlay {
    public static void main(String[] args) {
        new FluxPlay().run();
    }
    private void run() {
        Flux<LocalDateTime> f = generate(10);
        Disposable d = f.subscribeOn(Schedulers.single()).subscribe(System.out::println);
        try {
            Thread.sleep(4500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        d.dispose();
    }

    private Flux<LocalDateTime> generate(int nbrOfEntitiesToGenerate) {
        Generator generator = new Generator();
        return Flux.range(0, nbrOfEntitiesToGenerate)
        .map(x -> generator.expensiveGeneration())
        .doOnCancel(generator::cancel)
        .doFinally(generator::shutdown)
        .publishOn(Schedulers.fromExecutor(generator::submit));
    }
}

和:

public class Generator {
    Future<?> f;
    ExecutorService es = Executors.newSingleThreadExecutor();
    public void submit(Runnable command) {
      f = es.submit(command);
    }
    public void cancel() {
        f.cancel(true);
    }
    public void shutdown(SignalType st) {
        es.shutdown();
    }
    public LocalDateTime expensiveGeneration() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            System.out.println("Interrupted");
        }
        return LocalDateTime.now();
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-12-04
    • 2016-08-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多