【问题标题】:Returning Mono response from subscribe of Mono.fromCallable从 Mono.fromCallable 的订阅返回 Mono 响应
【发布时间】:2022-10-01 15:30:27
【问题描述】:

我想要完成的是返回一个简单的单声道响应。 我在方法detailsHandler.fetchDetailsValue 中调用不同的后端API 由于这是一个同步阻塞调用,我按照文档中的建议将它包装在 Mono.fromCallable 中。

但是我在编译时遇到了这个错误-

error: local variables referenced from a lambda expression must be final or effectively final

实际上,在 .subscribe lambda 内部,我试图分配给在 lambda 外部声明的 Response 对象。由于我需要在订阅时分配从 fetchDetailsValue 方法返回的对象,我该如何返回这个响应对象?

如果下面有错误,请纠正我并建议如何解决这个问题。感谢任何输入。谢谢!

以下是示例代码 -

        @Override
        public Mono<Response> getDetails(Mono<RequestDO> requestDO) {
        
        return requestDO.flatMap(
                    request -> {
            Response response = new Response();                 
            Mono<List<Object>> optionalMono = Mono.fromCallable(() -> {                     
                                return detailsHandler.fetchDetailsValue(request);
                            });
            optionalMono. subscribeOn(Schedulers.boundedElastic())
                            .subscribe(result -> { 
                                     Cat1 cat1Object = null;
                                     Cat2 cat2Object = null;
                                     for(Object obj : result)  {                                     
                                       if (obj instanceof Cat1) {
                                           cat1Object = (Cat1) obj;
                                           response.addResponseObj(cat1Object); // error: local variables referenced from a lambda expression must be final or effectively final                                                
                                       }
                                       if (obj instanceof Cat2) {
                                           cat2Object = (Cat2) obj;
                                           response.addResponseObj(cat2Object); // error: local variables referenced from a lambda expression must be final or effectively final
                                       }
                             }        
            
            });
         return Mono.just(response);
         });                     
         }

当我尝试在 subscribe 方法中声明该 Response 对象并尝试在收到值时返回时。但得到错误 - Void methods cannot return a value

下面是代码 -

   @Override
            public Mono<Response> getDetails(Mono<RequestDO> requestDO) {
            
            return requestDO.flatMap(
                        request -> {                        
                Mono<List<Object>> optionalMono = Mono.fromCallable(() -> {                     
                                    return detailsHandler.fetchDetailsValue(request);
                                });
                optionalMono. subscribeOn(Schedulers.boundedElastic())
                                .subscribe(result -> { 
                                         Response response = new Response(); // Added this inside subscribe lambda. But now getting - Void methods cannot return a value
                                         Cat1 cat1Object = null;
                                         Cat2 cat2Object = null;
                                         for(Object obj : result)  {                                     
                                           if (obj instanceof Cat1) {
                                               cat1Object = (Cat1) obj;
                                               response.addResponseObj(cat1Object);                 
                                           }
                                           if (obj instanceof Cat2) {
                                               cat2Object = (Cat2) obj;
                                               response.addResponseObj(cat2Object); 
                                           }
                                 }        
                return Mono.just(response); // Added this inside subscribe lambda. But now getting - Void methods cannot return a value
                });
             
             });                     
             }

更新:

当我像下面这样尝试时,我遇到了错误。如果我做错了什么,请纠正。

    public Mono<Response> getDetails(Mono<RequestDO> requestDO) {
    
        return requestDO
                .flatMap(request -> Mono.fromCallable(() -> detailsHandler.fetchDetailsValue(request)))
                .map(result -> {
                    Response response = new Response();
                    for (Object obj : result) {
                        if (obj instanceof Cat1) {
                            response.addResponseObj((Cat1) obj);
                        }
                        if (obj instanceof Cat2) {
                            response.addResponseObj((Cat2) obj);
                        }
                    }
                    return response;
                })
                .map(result1 -> {
                    Response response = resultnew;
                    requestDO.flatMap(request -> Mono.fromCallable(() -> detailsHandler.fetchAdditionalValue(request, response)))
                .map(result2 -> {
                    return result2;
                });
        }

    标签: reactive-programming spring-webflux project-reactor


    【解决方案1】:

    你不应该在你的 Reactor 管道中调用subscribe。订阅应该被认为是一个终端操作,它在未来一个未知的时间异步启动管道,并且应该只用于连接到系统的其他部分。

    你想要的是使用一个简单的同步函数将你的List&lt;Object&gt; 转换为一个新的Responsemap 运算符就是为此而设计的:

    public Mono<Response> getDetails(Mono<RequestDO> requestDO) {
    
        return requestDO
                .flatMap(request -> Mono.fromCallable(() -> detailsHandler.fetchDetailsValue(request)))
                .map(result -> {
                    Response response = new Response();
                    for (Object obj : result) {
                        if (obj instanceof Cat1) {
                            response.addResponseObj((Cat1) obj);
                        }
                        if (obj instanceof Cat2) {
                            response.addResponseObj((Cat2) obj);
                        }
                    }
                    return response;
                });
    }
    

    更新

    对于您更新的问题,您希望同时使用请求和响应来调用另一个 Mono。您可以通过首先将地图拉到 flatMap 中,然后向其中添加另一个 flatMap 来做到这一点:

    public Mono<Response> getDetails(Mono<RequestDO> requestDO) {
    
        return requestDO
                .flatMap(request -> Mono.fromCallable(() -> detailsHandler.fetchDetailsValue(request))
                        .map(result -> {
                            Response response = new Response();
                            for (Object obj : result) {
                                if (obj instanceof Cat1) {
                                    response.addResponseObj((Cat1) obj);
                                }
                                if (obj instanceof Cat2) {
                                    response.addResponseObj((Cat2) obj);
                                }
                            }
                            return response;
                        })
                        .flatMap(response -> Mono.fromCallable(() -> detailsHandler.fetchAdditionalValue(request, response))));
    }
    

    【讨论】:

    • 如果我需要将从上述地图收到的响应与请求一起传递到另一个链(例如类似于 detailsHandler.fetchDetailsValue,我想调用另一个方法 detailsHandler.fetchAdditionalValue(request, response)。修改后的响应来自这个方法就是我要返回的,请问这个怎么做?
    • 请在我的帖子中查看我上面的更新部分。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多