【问题标题】:Spring WebFlux consumer to sinkSpring WebFlux 消费者下沉
【发布时间】:2020-08-17 21:25:29
【问题描述】:

这是一个简单的spring boot应用:

@SpringBootApplication
@RestController
public class ReactiveApplication {

    static Flux<String> fluxString;
    static volatile Queue<String> queue = new ConcurrentLinkedQueueProxy();

    private static class ConcurrentLinkedQueueProxy extends ConcurrentLinkedQueue<String> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean add(String e) {
            synchronized (this) {
                notify();
            }
            return super.add(e);
        }

        @Override
        public String poll() {
            synchronized (this) {
                if(isEmpty()) {
                    try {
                        wait();
                    } catch (InterruptedException ex) {}
                }
            }
            return super.peek() == null ? "" : super.poll();
        }
    }

    static Consumer<String> consumer = str -> queue.add(str);

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(ReactiveApplication.class, args);
    }

    static {
        for(int i = 0; i < 10; i++)
            queue.add("testData " + i + " ");
    }

    @GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<String> home() {

        Scheduler sch = Schedulers.newParallel("parallel-sch", 1);
        List<String> list = new ArrayList<>(queue);
        queue.removeAll(queue);

        fluxString = Flux.<String>create(sink -> {
            sink.onRequest(n -> {
                for(int i = 0; i < n; i++) {
                    sink.next(queue.poll());
                }
            }).onCancel(() -> sch.dispose());
        }).log().subscribeOn(sch).mergeWith(Flux.<String>fromIterable(list));

        return fluxString;

    }

    @GetMapping("/add")
    public String add( @RequestParam String s) {
        consumer.accept(s);
        return s;
    }

}

所以基本上这个应用程序创建了一个字符串流。访问/ 将获取所有存在的字符串队列,然后合并从/add 资源添加的任何内容(忽略“安全方法必须是幂等的”事情)。

我觉得奇怪的是,当我将 public static void main(...) 移动到第 1 行时,应用程序开始出现异常,向 /add 添加新值没有任何效果。我认为一定有一些有趣的事情正在发生使应用程序行为不端。有什么解释吗?

【问题讨论】:

  • 我从未见过有人试图在一个类中编写应用程序。为什么不只是遵循指南和建议,并将您的控制器放在单独的类中。因为很明显,如果您不遵循一般准则,您会得到奇怪的行为。
  • @ThomasAndolf 我只是在玩这些东西。原来“静态变量由序列初始化”是答案,它与混合的东西无关。无论如何,如果你能帮助我,我有一个新问题。如果两个客户端订阅了/,则使用/add添加的字符串以循环方式分发。现在,如果客户端断开连接,仍然需要对 /add 进行 2 次调用才能出现在订阅者 / 端点上。这个问题线性增加。看起来sch.dispose() 什么都不做。有什么想法吗?我也通过分离关注点对其进行了测试。
  • onCancel 仅在来自调用客户端的取消信号时触发。断开连接不是取消信号,projectreactor.io/docs/core/release/reference/…
  • @ThomasAndolf 在阅读参考资料后我也有同样的疑问,但onCancel 中的快速系统输出清除了它。取消事件实际上是生成的。即使在验证之后,我也尝试了.subscribeOn(sch).doFinally(signal -&gt; {...),但没有任何好处。查看日志后我发现Schedulers.newParallel 创建的线程即使在scr.dispose() 之后也没有被终止。但我不知道如何获取该线程的句柄或操作它。我已经阅读了文档、来源和参考,但我迷路了。
  • 是否有任何理由说明您为什么需要在其单独的调度程序上使用它?将执行置于调度上的一个常见原因是阻塞调用期间的并行性。 forkjoinpool 中有一定数量的线程,我们不想占用这些线程来阻塞调用。但是,一旦您将某些内容放在调度程序上,我就会相信它“独立”

标签: java multithreading spring-boot spring-webflux project-reactor


【解决方案1】:

我最终使用了这个效果很好:

@SpringBootApplication
@RestController
public class ReactiveApplication {

    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
    private static Consumer<String> consumer = str -> {
        try { queue.put(str); }
        catch (InterruptedException e) {}
    };
    static {
        for (int i = 0; i < 10; i++) queue.add("testData " + i + " ");
    }

    public static void main(String[] args) {
        SpringApplication.run(ReactiveApplication.class, args);
    }

    @GetMapping(value = "/", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<String> home() {

        final Scheduler sch = Schedulers.newSingle("async-flux");

        return Flux.<String>generate(sink -> {
            try { sink.next(queue.take()); }
            catch (InterruptedException e) { }
        }).log().subscribeOn(sch);

    }

    @GetMapping("/add")
    public String add(@RequestParam String s) {
        consumer.accept(s);
        return s;
    }

}

【讨论】:

    猜你喜欢
    • 2019-12-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-26
    • 1970-01-01
    • 2020-08-11
    • 2019-11-21
    • 1970-01-01
    相关资源
    最近更新 更多