【问题标题】:Reactor Flux and asynchronous processingReactor Flux 和异步处理
【发布时间】:2018-07-04 06:04:40
【问题描述】:

我正在尝试学习 Reactor,但我遇到了很多麻烦。我想做一个非常简单的概念证明,我模拟调用慢速流服务 1 次或更多次。如果您使用反应器并流式传输响应,则调用者不必等待所有结果。

所以我创建了一个非常简单的控制器,但它的行为不像我预期的那样。当延迟在我的 flatMap“内部”(在我调用的方法内部)时,在一切完成之前不会返回响应。但是当我在 flatMap 之后添加延迟时,数据会流式传输。

为什么这段代码会产生 JSON 流

@GetMapping(value = "/test", produces = { MediaType.APPLICATION_STREAM_JSON_VALUE })
Flux<HashMap<String, Object>> customerCards(@PathVariable String customerId) {
    Integer count = service.getCount(customerId);

    return Flux.range(1, count).
            flatMap(k -> service.doRestCall(k)).delayElements(Duration.ofMillis(5000));

}

但这不是

@GetMapping(value = "/test2", produces = { MediaType.APPLICATION_STREAM_JSON_VALUE })
Flux<HashMap<String, Object>> customerCards(@PathVariable String customerId) {
    Integer count = service.getCount(customerId);

    return Flux.range(1, count).
            flatMap(k -> service.doRestCallWithDelay(k));

}

它认为我缺少反应堆 API 的一些非常基本的东西。在那张纸条上。谁能指出一本关于反应堆的好书或教程?我似乎找不到什么好东西来学习这个。

谢谢

【问题讨论】:

    标签: project-reactor reactor


    【解决方案1】:

    flatMap 中的代码在主线程(即控制器运行的线程)上运行。结果整个过程被阻塞并且该方法不会立即返回。请记住,Reactor 不会强加特定的线程模型。

    相反,根据文档,在delayElements 方法signals are delayed and continue on the parallel default Scheduler 中。这意味着主线程没有被阻塞并立即返回。

    这里有两个对应的例子:

    阻塞代码:

    Flux.range(1, 500)
        .map(i -> {
                //blocking code
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " - Item : " + i);
                return i;
        })
       .subscribe();
        
        System.out.println("main completed");
    

    结果:

    main - Item : 1
    main - Item : 2
    main - Item : 3
    ...
    main - Item : 500
    main completed
    

    非阻塞代码:

    Flux.range(1, 500)
        .delayElements(Duration.ofSeconds(1))
        .subscribe(i -> {
            System.out.println(Thread.currentThread().getName() + " - Item : " + i);
        });
        
    System.out.println("main Completed");
        
    //sleep main thread in order to be able to print the println of the flux
    try {
        Thread.sleep(30000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    

    结果:

    main Completed
    parallel-1 - Item : 1
    parallel-2 - Item : 2
    parallel-3 - Item : 3
    parallel-4 - Item : 4
    ...
    

    【讨论】:

      【解决方案2】:

      这里是项目反应堆reference guide “delayElements”方法仅将通量元素延迟给定的持续时间,请参阅javadoc for more details 如果您需要更多帮助,我认为您应该发布有关“service.doRestCallWithDelay(k)”和“service.doRestCall(k)”方法的详细信息。

      【讨论】:

        猜你喜欢
        • 2015-03-21
        • 2015-04-15
        • 2015-02-22
        • 2015-07-17
        • 2020-10-31
        • 2021-07-14
        • 1970-01-01
        • 1970-01-01
        • 2020-06-26
        相关资源
        最近更新 更多